Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
import time, hashlib, hmac
from bravado.client import SwaggerClient
from bravado.requests_client import RequestsClient, Authenticator
from bravado.swagger_model import Loader
# Swagger spec formats to exclude. This helps to avoid warnings in your console.
EXCLUDE_SWG_FORMATS = ['JSON', 'guid']
class APIKeyAuthenticator(Authenticator):
def __init__(self, host, api_key, api_secret):
super(APIKeyAuthenticator, self).__init__(host)
self.api_key = api_key
self.api_secret = api_secret
def matches(self, url):
    """Return whether this authenticator applies to ``url``.

    :param url: URL of the outgoing request.
    :returns: ``False`` when ``'swagger.json'`` occurs anywhere in ``url``,
        ``True`` for every other URL.
    """
    return 'swagger.json' not in url
def apply(self, r):
# 5s grace period in case of clock skew
expires = int(time.time() * 1000)
r.headers['api-expires'] = str(expires)
r.headers['api-key'] = self.api_key
"""
split = urlparse.urlsplit(url)
return self.host == split.hostname
def apply(self, request):
    # type: (requests.Request) -> requests.Request
    """Apply authentication to a request.

    This version always raises; the message names the class of ``self``.

    :param request: Request to add authentication information to.
    :raises NotImplementedError: always.
    """
    # Interpolate the class name into the message. Passing it as a second
    # constructor argument leaves the "%s" placeholder unformatted.
    raise NotImplementedError(
        u"%s: Method not implemented" % self.__class__.__name__)
# noinspection PyDocstring
class ApiKeyAuthenticator(Authenticator):
    """?api_key authenticator.

    This authenticator adds an API key via query parameter or header.

    :param host: Host to authenticate for.
    :param api_key: API key.
    :param param_name: Query parameter specifying the API key.
    :param param_in: How to send the API key. Can be 'query' or 'header'.
    """

    def __init__(
        self,
        host,  # type: str
        api_key,  # type: typing.Text
        param_name=u'api_key',  # type: typing.Text
        param_in=u'query',  # type: typing.Text
    ):
        # type: (...) -> None
        super(ApiKeyAuthenticator, self).__init__(host)
        self.param_name = param_name
        self.param_in = param_in
        self.api_key = api_key

    def apply(self, request):
        # type: (requests.Request) -> requests.Request
        """Add the API key to ``request`` and return the same request.

        :param request: Request to add the API key to.
        :returns: The ``request`` object that was passed in.
        """
        if self.param_in == 'header':
            # setdefault keeps a header value that is already present on the
            # request instead of overwriting it.
            request.headers.setdefault(self.param_name, self.api_key)
        else:
            # Any value other than 'header' sends the key as a query parameter.
            request.params[self.param_name] = self.api_key
        return request
class BasicAuthenticator(Authenticator):
"""HTTP Basic authenticator.
:param host: Host to authenticate for.
:param username: Username.
:param password: Password
"""
def __init__(
self,
host, # type: str
username, # type: typing.Union[bytes, str]
password, # type: typing.Union[bytes, str]
):
# type: (...) -> None
super(BasicAuthenticator, self).__init__(host)
self.auth = requests.auth.HTTPBasicAuth(username, password)
headers=r.headers)
return r
@with_api_exceptions_handler
def refresh_token_if_needed(self):
    """Refresh the token when fewer than 30 seconds remain before it expires.

    Compares ``self.token_expires_at`` with the current ``time.time()`` and
    calls ``self._refresh_token()`` when the difference is below 30 (which
    includes the case where the expiry time has already passed).
    """
    seconds_left = self.token_expires_at - time.time()
    if seconds_left < 30:
        self._refresh_token()
def _refresh_token(self):
    """Refresh the session token and record the new expiry time.

    Calls ``self.session.refresh_token`` with the session's
    ``auto_refresh_url``. If the session then holds a token with a non-``None``
    ``access_token`` entry, that access token is decoded and its ``exp`` claim
    is stored in ``self.token_expires_at``; otherwise ``token_expires_at`` is
    left unchanged.
    """
    self.session.refresh_token(self.session.auto_refresh_url)

    token = self.session.token
    if token is None:
        return
    access_token = token.get('access_token')
    if access_token is None:
        return

    # The token is decoded with verification disabled; only the ``exp`` claim
    # is read from it.
    # NOTE(review): newer PyJWT releases may not accept ``verify=False`` --
    # confirm against the pinned PyJWT version before upgrading.
    decoded_json_token = jwt.decode(access_token, verify=False)
    self.token_expires_at = decoded_json_token.get(u'exp')
class NeptuneAuthenticator(Authenticator):
def __init__(self, auth_tokens, ssl_verify, proxies):
super(NeptuneAuthenticator, self).__init__(host='')
decoded_json_token = jwt.decode(auth_tokens.accessToken, verify=False)
expires_at = decoded_json_token.get(u'exp')
client_name = decoded_json_token.get(u'azp')
refresh_url = u'{realm_url}/protocol/openid-connect/token'.format(realm_url=decoded_json_token.get(u'iss'))
token = {
u'access_token': auth_tokens.accessToken,
u'refresh_token': auth_tokens.refreshToken,
u'expires_in': expires_at - time.time()
}
session = OAuth2Session(
client_id=client_name,
token=token,
auto_refresh_url=refresh_url,
import urllib.parse
import time
import hashlib
import hmac
from bravado.requests_client import Authenticator
class APIKeyAuthenticator(Authenticator):
"""?api_key authenticator.
This authenticator adds BitMEX API key support via header.
:param host: Host to authenticate for.
:param api_key: API key.
:param api_secret: API secret.
"""
def __init__(self, host, api_key, api_secret):
super(APIKeyAuthenticator, self).__init__(host)
self.api_key = api_key
self.api_secret = api_secret
# Forces this to apply to all requests.
def matches(self, url):
if "swagger.json" in url:
return False
def __init__(self, host, api_key, param_name=u'api_key', param_in=u'query'):
    """Store the API key and how it should be sent.

    :param host: Passed to the base class initializer.
    :param api_key: API key value.
    :param param_name: Name of the header or query parameter carrying the key.
    :param param_in: Where the key is sent; stored as given on ``self.param_in``.
    """
    super(ApiKeyAuthenticator, self).__init__(host)
    self.param_name = param_name
    self.param_in = param_in
    self.api_key = api_key
def apply(self, request):
    """Add the API key to ``request`` and return the same request.

    When ``self.param_in`` equals ``'header'`` the key is written to
    ``request.headers`` (replacing any existing value under that name);
    for any other value it is written to ``request.params``.

    :param request: Request to add the API key to.
    :returns: The ``request`` object that was passed in.
    """
    if self.param_in == 'header':
        request.headers[self.param_name] = self.api_key
    else:
        request.params[self.param_name] = self.api_key
    return request
class BasicAuthenticator(Authenticator):
    """HTTP Basic authenticator.

    :param host: Host to authenticate for.
    :param username: Username.
    :param password: Password
    """

    def __init__(self, host, username, password):
        super(BasicAuthenticator, self).__init__(host)
        # Build the auth object once; apply() attaches it to each request.
        self.auth = requests.auth.HTTPBasicAuth(username, password)

    def apply(self, request):
        """Attach the stored basic-auth object to ``request`` and return it.

        :param request: Request to authenticate.
        :returns: The ``request`` object that was passed in.
        """
        request.auth = self.auth
        return request
import datetime
import urllib
from bravado.client import SwaggerClient
from bravado.requests_client import RequestsClient
from bravado.requests_client import Authenticator
#from BitMEXAPIKeyAuthenticator import APIKeyAuthenticator
#from offcial-http BitMEXAPIKeyAuthenticator import APIKeyAuthenticator
import json
import pprint
def local2utc(local_st):
    """Convert a naive local-time ``datetime`` to a naive UTC ``datetime``.

    ``local_st`` is interpreted in the process's local time zone by
    ``time.mktime``. Because the conversion goes through ``timetuple()``,
    sub-second precision is dropped.

    :param local_st: ``datetime`` expressed in local time.
    :returns: Naive ``datetime`` holding the corresponding UTC wall-clock time.
    """
    # mktime() yields seconds since the Unix epoch (a float), not a struct.
    epoch_seconds = time.mktime(local_st.timetuple())
    # Epoch + offset gives the same naive UTC value that
    # datetime.utcfromtimestamp() produced, without relying on that
    # deprecated constructor.
    return datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=epoch_seconds)
class APIKeyAuthenticator(Authenticator):
"""?api_key authenticator.
This authenticator adds BitMEX API key support via header.
:param host: Host to authenticate for.
:param api_key: API key.
:param api_secret: API secret.
"""
def __init__(self, host, api_key, api_secret):
super(APIKeyAuthenticator, self).__init__(host)
self.api_key = api_key
self.api_secret = api_secret
# Forces this to apply to all requests.
def matches(self, url):
if "swagger.json" in url:
return False
import urllib.parse
import time
import hashlib
import hmac
from bravado.requests_client import Authenticator
class APIKeyAuthenticator(Authenticator):
"""?api_key authenticator.
This authenticator adds Bybit API key support via header.
:param host: Host to authenticate for.
:param api_key: API key.
:param api_secret: API secret.
"""
def __init__(self, host, api_key, api_secret):
super(APIKeyAuthenticator, self).__init__(host)
self.api_key = api_key
self.api_secret = api_secret
# Forces this to apply to all requests.
def matches(self, url):
if "swagger.json" in url:
return False
try:
return self._add_token(r)
except TokenExpiredError:
self.session.refresh_token(self.session.auto_refresh_url)
return self._add_token(r)
def _add_token(self, r):
    """Add the session's token to request ``r`` in place and return ``r``.

    Delegates to ``self.session._client.add_token`` and writes the returned
    URL, headers and body back onto ``r``.

    :param r: Request exposing ``url``, ``method``, ``body`` and ``headers``.
    :returns: The same ``r`` object, updated.
    """
    # pylint: disable=protected-access
    r.url, r.headers, r.body = self.session._client.add_token(
        r.url,
        http_method=r.method,
        body=r.body,
        headers=r.headers,
    )
    return r
class NeptuneAuthenticator(Authenticator):
def __init__(self, auth_tokens):
super(NeptuneAuthenticator, self).__init__(host='')
decoded_json_token = jwt.decode(auth_tokens.accessToken, verify=False)
expires_at = decoded_json_token.get(u'exp')
client_name = decoded_json_token.get(u'azp')
refresh_url = u'{realm_url}/protocol/openid-connect/token'.format(realm_url=decoded_json_token.get(u'iss'))
token = {
u'access_token': auth_tokens.accessToken,
u'refresh_token': auth_tokens.refreshToken,
u'expires_in': expires_at - time.time()
}
self.auth = NeptuneAuth(
OAuth2Session(
client_id=client_name,
token=token,
:param ssl_cert: Provide a client-side certificate to use. Either a
sequence of strings pointing to the certificate (1) and the private
key (2), or a string pointing to the combined certificate and key.
"""
self.session = authenticator.session
self.authenticator = authenticator
self.ssl_verify = ssl_verify
self.ssl_cert = ssl_cert
def authenticated_request(self, request_params):
    # type: (typing.Mapping[str, typing.Any]) -> requests.Request
    """Build a request from ``request_params`` and apply authentication.

    The request is created by ``self.session.request`` with ``execute=False``
    plus the given parameters, then passed through
    ``self.apply_authentication``.

    :param request_params: Keyword arguments forwarded to
        ``self.session.request``.
    :returns: Whatever ``self.apply_authentication`` returns for the request.
    """
    unauthenticated = self.session.request(execute=False, **request_params)
    return self.apply_authentication(unauthenticated)
class OauthAuthenticator(Authenticator):
def __init__(
self,
server,
realm,
user=None,
pw=None,
keycloak_url=None,
offline_token=None,
endpoint_name='kernel'
):
self.realm = realm
self.endpoint_name = endpoint_name
self.host = urlparse(server).netloc
self.session = get_session(
server, realm, user, pw, keycloak_url, offline_token)