Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test__init_http_client(mock_trs_config):
    """Check that ``_init_http_client`` returns a configured ``RequestsClient``.

    The client built from the ``'mock_trs'`` options must carry an
    authenticator whose ``host`` and ``api_key`` equal the ``'host'`` and
    ``'auth'`` entries of those options.
    """
    opts = mock_trs_config['mock_trs']
    client = _init_http_client(opts=opts)

    assert isinstance(client, RequestsClient)

    authenticator = client.authenticator
    assert authenticator.host == opts['host']
    assert authenticator.api_key == opts['auth']
def make_swagger_spec(self):
# Build a Spec from self.origin_spec / self.server_url, fetching through a
# new RequestsClient.
# NOTE(review): the Spec.from_dict(...) call is cut off in this excerpt, so
# its remaining arguments are not described here.
http_client = RequestsClient()
# If the caller provided an existing requests session,
# then use it here.
if self.session:
http_client.session = self.session
return Spec.from_dict(
spec_dict=self.origin_spec,
origin_url=self.server_url,
http_client=http_client,
# TODO expose these configuration options to the
# TODO caller; hardcoded for now. caller could make
# TODO updates to the self.swagger_spec value
def construct_swagger_clients(hostname, token):
    """Create the Swagger specs client and docs client for ``hostname``.

    Both clients share a single ``RequestsClient``. Its session has
    certificate verification switched off and its headers replaced by a
    single bearer ``Authorization`` header.

    NOTE(review): ``verify = False`` disables TLS certificate checks, and
    assigning ``session.headers`` discards any headers already on the
    session -- confirm both are intended.

    :param hostname: base URL; ``SWAGGER_SPECS_PREFIX`` and
        ``SWAGGER_DOCS_PREFIX`` are appended to it.
    :param token: value placed after ``Bearer`` in the Authorization header.
    :returns: tuple ``(specs_client, docs_client)``.
    """
    shared_client = RequestsClient()
    session = shared_client.session
    session.verify = False
    session.headers = {'Authorization': 'Bearer %s' % token}

    specs_config = {'validate_responses': False, 'also_return_response': False}
    specs_client = SwaggerClient.from_url(
        hostname + SWAGGER_SPECS_PREFIX,
        http_client=shared_client,
        config=specs_config,
    )
    docs_client = SwaggerDocsClient.from_url(
        hostname + SWAGGER_DOCS_PREFIX,
        http_client=shared_client,
    )
    return specs_client, docs_client
is_secure_transport,
InsecureTransportError
)
import requests
from requests_oauthlib import (
OAuth2Session,
TokenUpdated
)
from .logger import LOG
from .exceptions import AetherAPIException
# Don't force https for oauth requests.
# This is a module-level assignment into os.environ, so the setting applies
# to the whole process, not only to clients created by this module.
# NOTE(review): presumably meant for non-TLS development servers -- confirm
# before relying on it in production.
os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
class OauthClient(RequestsClient):
    """``RequestsClient`` that sends requests through the authenticator's session.
    """

    def __init__(self, authenticator, ssl_verify=True, ssl_cert=None):
        """
        :param authenticator: Object exposing a ``session`` attribute. That
            session becomes this client's session and the object itself is
            stored as ``self.authenticator``.
        :param ssl_verify: Set to False to disable SSL certificate validation.
            Provide the path to a CA bundle if you need to use a custom one.
        :param ssl_cert: Provide a client-side certificate to use. Either a
            sequence of strings pointing to the certificate (1) and the private
            key (2), or a string pointing to the combined certificate and key.
        """
        # Run the base initializer first so that every attribute
        # RequestsClient sets up exists on this instance (it also stores
        # ssl_verify and ssl_cert); then swap in the authenticator's session.
        super(OauthClient, self).__init__(
            ssl_verify=ssl_verify, ssl_cert=ssl_cert
        )
        self.session = authenticator.session
        self.authenticator = authenticator
"""This module provides the views for the portal."""
import os
from base64 import b64decode
from django.shortcuts import render, redirect
from django.template import RequestContext
try:
from django.core.urlresolvers import reverse
except ImportError:
from django.urls import reverse
from bravado.requests_client import RequestsClient
from bravado.client import SwaggerClient
from bravado.swagger_model import load_file
# Module-level HTTP client; index() below sets the API key on this same object.
http_client = RequestsClient()
# Certificate verification is switched off on the client's session.
http_client.session.verify = False # TODO remove when SSL enabled
# Load the API spec from 'apispec.json' once, when the module is imported.
spec_dict = load_file('apispec.json', http_client=http_client)
def index(request):
# Portal index view.
# NOTE(review): only the beginning of the view is visible in this excerpt;
# nothing is returned within the lines shown.
ctx = RequestContext(request)
# E-mail and API key are read from request.META
# (presumably headers injected by an upstream gateway -- confirm).
ctx['email'] = request.META.get('HTTP_X_CONSUMER_USERNAME')
api_key = request.META.get('HTTP_X_CONSUMER_CUSTOM_ID')
if api_key and ctx['email']:
# The base64-decoded custom id is sent as the 'x-api-key' header for the
# host named by the MPCONTRIBS_API_HOST environment variable.
# NOTE(review): http_client is the module-level client, so the key is set
# on shared state -- confirm this is safe when requests are concurrent.
http_client.set_api_key(
os.environ['MPCONTRIBS_API_HOST'], b64decode(api_key),
param_in='header', param_name='x-api-key'
)
# Build a Swagger client from the preloaded spec; response validation is off.
client = SwaggerClient.from_spec(
spec_dict, http_client=http_client,
config={'validate_responses': False}
)
def from_url(cls, spec_url, http_client=None):
    """Build an instance of ``cls`` from the spec found at ``spec_url``.

    NOTE(review): the first parameter is ``cls``, so this is presumably
    decorated with ``@classmethod`` outside the visible lines -- confirm.

    :param spec_url: location handed to ``Loader.load_spec``.
    :param http_client: client used to fetch the spec; a new
        ``RequestsClient`` is created when this is falsy.
    :returns: ``cls`` constructed from the loaded spec.
    """
    client = http_client if http_client else RequestsClient()
    loaded_docs = Loader(client).load_spec(spec_url)
    return cls(loaded_docs)
# NOTE(review): this is the tail of a function whose signature is outside the
# excerpt; parsed, cluster, system_paasta_config, spec_dict and http_res are
# bound earlier.
# sometimes we want the status code
requests_client = PaastaRequestsClient(
scheme=parsed.scheme, cluster=cluster, system_paasta_config=system_paasta_config
)
if http_res:
# Caller wants the HTTP response too: enable also_return_response.
config = {"also_return_response": True}
c = SwaggerClient.from_spec(
spec_dict=spec_dict, config=config, http_client=requests_client
)
else:
# No config passed: from_spec runs with its defaults.
c = SwaggerClient.from_spec(spec_dict=spec_dict, http_client=requests_client)
# Hand back the client wrapped in AuthClientDecorator for this cluster.
return paasta_tools.api.auth_decorator.AuthClientDecorator(c, cluster_name=cluster)
class PaastaRequestsClient(RequestsClient):
# RequestsClient subclass set up from the PaaSTA system configuration.
# NOTE(review): only the beginning of __init__ is visible in this excerpt.
def __init__(
self, scheme: str, cluster: str, system_paasta_config: SystemPaastaConfig
) -> None:
# Certificate handling only applies to https.
if scheme == "https":
ca_cert_path = None
if system_paasta_config.get_enable_client_cert_auth():
# The vault cluster config maps the cluster name to the "ecosystem"
# that names the certificate files under ~/.paasta/pki.
ecosystem = system_paasta_config.get_vault_cluster_config()[cluster]
paasta_dir = os.path.expanduser("~/.paasta/pki")
# Call renew_issue_cert when any of the .crt, .key or _ca.crt files
# for this ecosystem is missing.
if (
not os.path.isfile(f"{paasta_dir}/{ecosystem}.crt")
or not os.path.isfile(f"{paasta_dir}/{ecosystem}.key")
or not os.path.isfile(f"{paasta_dir}/{ecosystem}_ca.crt")
):
renew_issue_cert(
system_paasta_config=system_paasta_config, cluster=cluster
)
def __init__(self, api_token=None, proxies=None):
    """Set up credentials, the HTTP client, swagger clients and authenticator.

    :param api_token: passed to ``Credentials``.
    :param proxies: applied to the HTTP session through
        ``update_session_proxies`` and forwarded to the authenticator.
    """
    self.credentials = Credentials(api_token)
    creds = self.credentials

    # Any non-empty NEPTUNE_ALLOW_SELF_SIGNED_CERTIFICATE value switches
    # certificate verification off and silences urllib3 warnings.
    allow_self_signed = bool(os.getenv("NEPTUNE_ALLOW_SELF_SIGNED_CERTIFICATE"))
    if allow_self_signed:
        urllib3.disable_warnings()
    ssl_verify = not allow_self_signed

    self._http_client = RequestsClient(ssl_verify=ssl_verify)
    update_session_proxies(self._http_client.session, proxies)

    # Prefer the explicitly configured API URL; otherwise use the address
    # the token came from.
    config_api_url = creds.api_url_opt or creds.token_origin_address
    self._verify_host_resolution(config_api_url, creds.token_origin_address)

    swagger_url = '{}/api/backend/swagger.json'.format(config_api_url)
    backend_client = self._get_swagger_client(swagger_url)
    self._client_config = self._create_client_config(creds.api_token, backend_client)
    self._set_swagger_clients(self._client_config, config_api_url, backend_client)

    # The authenticator is created only after the client config exists and
    # is then attached to the HTTP client built above.
    self.authenticator = self._create_authenticator(creds.api_token, ssl_verify, proxies)
    self._http_client.authenticator = self.authenticator

    # This is not a top-level import because of circular dependencies
    from neptune import __version__
    self.client_lib_version = __version__
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import requests
from bravado.requests_client import Authenticator, RequestsClient
from requests.auth import HTTPBasicAuth
from urllib.parse import urlparse
class BasicRealmClient(RequestsClient):
    """``RequestsClient`` constructed with a caller-supplied authenticator.
    """

    def __init__(self, authenticator, ssl_verify=True, ssl_cert=None):
        """
        :param authenticator: Stored as ``self.authenticator``.
        :param ssl_verify: Set to False to disable SSL certificate validation.
            Provide the path to a CA bundle if you need to use a custom one.
        :param ssl_cert: Provide a client-side certificate to use. Either a
            sequence of strings pointing to the certificate (1) and the private
            key (2), or a string pointing to the combined certificate and key.
        """
        # Let the base initializer create the session and store ssl_verify /
        # ssl_cert, so that every attribute RequestsClient sets up exists on
        # this instance; only the authenticator is specific to this class.
        super().__init__(ssl_verify=ssl_verify, ssl_cert=ssl_cert)
        self.authenticator = authenticator
# NOTE(review): excerpt from a longer example script; res, http_response, pp,
# config, HOST and SPEC_URI are bound above this point.
print('\n---A basic Trade GET:---')
pp.pprint(res)
print('\n---Response details:---')
print("Status Code: %d, headers: %s" % (http_response.status_code, http_response.headers))
#
# Authenticated calls
#
# To authenticate, you must generate an API key.
# Do so at https://testnet.bitmex.com/app/apiKeys
# Both values are left empty in this example; fill them in before running.
API_KEY = ''
API_SECRET = ''
# Attach an APIKeyAuthenticator for HOST to a new RequestsClient and build a
# second Swagger client that uses it.
request_client = RequestsClient()
request_client.authenticator = APIKeyAuthenticator(HOST, API_KEY, API_SECRET)
bitMEXAuthenticated = SwaggerClient.from_url(
SPEC_URI,
config=config,
http_client=request_client)
# Basic authenticated call
print('\n---A basic Position GET:---')
print('The following call requires an API key. If one is not set, it will throw an Unauthorized error.')
# .result() is unpacked into the result and the HTTP response.
res, http_response = bitMEXAuthenticated.Position.Position_get(filter=json.dumps({'symbol': 'XBTUSD'})).result()
pp.pprint(res)