Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import logging
import pandas as pd
import requests
from cryptofeed.rest.api import API, request_retry
from cryptofeed.defines import DERIBIT, SELL, BUY, BID, ASK
from cryptofeed.standards import pair_std_to_exchange, timestamp_normalize
from sortedcontainers import SortedDict as sd
REQUEST_LIMIT = 1000
RATE_LIMIT_SLEEP = 0.2
LOG = logging.getLogger('rest')
class Deribit(API):
ID = DERIBIT
api = "https://www.deribit.com/api/v2/public/"
def trades(self, symbol: str, start=None, end=None, retry=None, retry_wait=10):
    """
    Yield trade data for a symbol.

    symbol: pair in standardized form; converted with pair_std_to_exchange
            using this class's ID before the request is made
    start, end: optional bounds, passed through to _get_trades unchanged
    retry, retry_wait: passed through to _get_trades unchanged

    Everything produced by _get_trades is yielded to the caller as-is.
    """
    instrument = pair_std_to_exchange(symbol, self.ID)
    # Delegate iteration directly to the underlying generator.
    yield from self._get_trades(instrument, start, end, retry, retry_wait)
def _get_trades(self, instrument, start_date, end_date, retry, retry_wait):
start = None
end = None
if start_date:
if not end_date:
end_date = pd.Timestamp.utcnow()
start = API._timestamp(start_date)
def _historical_trades(self, symbol, start_date, end_date, retry, retry_wait, freq='6H'):
symbol = pair_std_to_exchange(symbol, self.ID + 'REST')
@request_retry(self.ID, retry, retry_wait)
def helper(start_date):
endpoint = f"{self.api}/public/Trades?pair={symbol}&since={start_date}"
return requests.get(endpoint)
start_date = API._timestamp(start_date).timestamp() * 1000000000
end_date = API._timestamp(end_date).timestamp() * 1000000000
while start_date < end_date:
r = helper(start_date)
if r.status_code == 504 or r.status_code == 520:
# cloudflare gateway timeout or other error
time.sleep(60)
continue
elif r.status_code != 200:
self._handle_error(r, LOG)
else:
time.sleep(RATE_LIMIT_SLEEP)
data = r.json()
if 'error' in data and data['error']:
def _get(self, ep, symbol, start_date, end_date, retry, retry_wait, freq='6H'):
dates = [None]
if start_date:
if not end_date:
end_date = pd.Timestamp.utcnow()
dates = pd.interval_range(API._timestamp(start_date), API._timestamp(end_date), freq=freq).tolist()
if len(dates) == 0:
dates.append(pd.Interval(left=API._timestamp(start_date), right=API._timestamp(end_date)))
elif dates[-1].right < API._timestamp(end_date):
dates.append(pd.Interval(dates[-1].right, API._timestamp(end_date)))
@request_retry(self.ID, retry, retry_wait)
def helper(start, start_date, end_date):
if start_date and end_date:
endpoint = f'/api/v1/{ep}?symbol={symbol}&count={API_MAX}&reverse=false&start={start}&startTime={start_date}&endTime={end_date}'
else:
endpoint = f'/api/v1/{ep}?symbol={symbol}&reverse=true'
header = {}
if self.key_id and self.key_secret:
header = self._generate_signature("GET", endpoint)
header['Accept'] = 'application/json'
return requests.get('{}{}'.format(self.api, endpoint), headers=header)
for interval in dates:
start = 0
if interval is not None:
def _get_trades_hist(self, symbol, start_date, end_date, retry, retry_wait):
last = []
start = None
end = None
if start_date:
if not end_date:
end_date = pd.Timestamp.utcnow()
start = API._timestamp(start_date)
end = API._timestamp(end_date) - pd.Timedelta(nanoseconds=1)
start = int(start.timestamp() * 1000)
end = int(end.timestamp() * 1000)
@request_retry(self.ID, retry, retry_wait)
def helper(start, end):
if start and end:
return requests.get(f"{self.api}trades/{symbol}/hist?limit={REQUEST_LIMIT}&start={start}&end={end}&sort=1")
else:
return requests.get(f"{self.api}trades/{symbol}/hist")
while True:
r = helper(start, end)
if r.status_code == 429:
sleep(int(r.headers['Retry-After']))
import pandas as pd
from cryptofeed.rest.api import API, request_retry
from cryptofeed.defines import BITMEX, SELL, BUY, BID, ASK
from cryptofeed.standards import timestamp_normalize
S3_ENDPOINT = 'https://s3-eu-west-1.amazonaws.com/public.bitmex.com/data/{}/{}.csv.gz'
RATE_LIMIT_SLEEP = 2
API_MAX = 500
API_REFRESH = 300
LOG = logging.getLogger('rest')
class Bitmex(API):
ID = BITMEX
api = 'https://www.bitmex.com'
def _generate_signature(self, verb: str, url: str, data='') -> dict:
"""
verb: GET/POST/PUT
url: api endpoint
data: body (if present)
"""
expires = int(round(time.time()) + 30)
parsedURL = urlparse(url)
path = parsedURL.path
if parsedURL.query:
path = path + '?' + parsedURL.query
def _get_trades(self, instrument, start_date, end_date, retry, retry_wait):
start = None
end = None
if start_date:
if not end_date:
end_date = pd.Timestamp.utcnow()
start = API._timestamp(start_date)
end = API._timestamp(end_date) - pd.Timedelta(nanoseconds=1)
start = int(start.timestamp() * 1000)
end = int(end.timestamp() * 1000)
@request_retry(self.ID, retry, retry_wait)
def helper(start, end):
if start and end:
return requests.get(f"{self.api}get_last_trades_by_instrument_and_time?&start_timestamp={start}&end_timestamp={end}&instrument_name={instrument}&include_old=true&count={REQUEST_LIMIT}")
else:
return requests.get(f"{self.api}get_last_trades_by_instrument_and_time/")
while True:
r = helper(start, end)
if r.status_code == 429:
import logging
from decimal import Decimal
import pandas as pd
from sortedcontainers.sorteddict import SortedDict as sd
from cryptofeed.rest.api import API, request_retry
from cryptofeed.defines import KRAKEN, SELL, BUY, BID, ASK, CANCELLED, OPEN, FILLED, MARKET, LIMIT
from cryptofeed.standards import pair_std_to_exchange, normalize_trading_options, pair_exchange_to_std
LOG = logging.getLogger('rest')
RATE_LIMIT_SLEEP = 1
class Kraken(API):
ID = KRAKEN
api = "https://api.kraken.com/0"
@staticmethod
def _fix_currencies(currency: str):
cur_map = {
'XXBT': 'BTC',
'XXDG': 'DOGE',
'XXLM': 'XLM',
'XXMR': 'XMR',
'XXRP': 'XRP',
'ZUSD': 'USD',
'ZCAD': 'CAD',
'ZGBP': 'GBP',
'ZJPY': 'JPY'
from sortedcontainers.sorteddict import SortedDict as sd
import pandas as pd
from cryptofeed.rest.api import API, request_retry
from cryptofeed.defines import GEMINI, BID, ASK, CANCELLED, FILLED, OPEN, PARTIAL, BUY, SELL, LIMIT
from cryptofeed.standards import pair_std_to_exchange, pair_exchange_to_std, normalize_trading_options
LOG = logging.getLogger('rest')
RATE_LIMIT_SLEEP = 0.5
# https://docs.gemini.com/rest-api/#introduction
# For public API entry points, we limit requests to 120 requests per minute, and recommend that you do not exceed 1 request per second.
# For private API entry points, we limit requests to 600 requests per minute, and recommend that you not exceed 5 requests per second.
class Gemini(API):
ID = GEMINI
api = "https://api.gemini.com"
sandbox_api = "https://api.sandbox.gemini.com"
@staticmethod
def _order_status(data):
status = PARTIAL
if data['is_cancelled']:
status = CANCELLED
elif Decimal(data['remaining_amount']) == 0:
status = FILLED
elif Decimal(data['executed_amount']) == 0:
status = OPEN
price = Decimal(data['price']) if Decimal(data['avg_execution_price']) == 0 else Decimal(data['avg_execution_price'])
def trade_history(self, symbol: str, start=None, end=None):
    """
    Request trades from the /v1/mytrades endpoint and return them as a list of dicts.

    symbol: pair in standardized form; converted with pair_std_to_exchange
    start: optional; when truthy it is converted with API._timestamp and sent
           as the 'timestamp' request parameter
    end: accepted for interface compatibility but not used by this method

    Each returned dict carries the keys price, amount, timestamp, side,
    fee_currency, fee_amount, trade_id and order_id.
    """
    request = {
        'symbol': pair_std_to_exchange(symbol, self.ID),
        'limit_trades': 500
    }
    if start:
        request['timestamp'] = API._timestamp(start).timestamp()

    history = []
    for fill in self._post("/v1/mytrades", request):
        entry = {
            # price and amount are converted to Decimal; fee_amount is passed through untouched
            'price': Decimal(fill['price']),
            'amount': Decimal(fill['amount']),
            # timestampms is divided by 1000 (presumably milliseconds to seconds)
            'timestamp': fill['timestampms'] / 1000,
            # anything other than 'buy' (case-insensitive) is reported as SELL
            'side': SELL if fill['type'].lower() != 'buy' else BUY,
            'fee_currency': fill['fee_currency'],
            'fee_amount': fill['fee_amount'],
            'trade_id': fill['tid'],
            'order_id': fill['order_id']
        }
        history.append(entry)
    return history