Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def trades(self, symbol: str, start=None, end=None, retry=None, retry_wait=10):
    """Yield every item produced by ``self._get_trades_hist`` for ``symbol``.

    ``symbol`` is first converted with ``pair_std_to_exchange`` using this
    object's ``ID``; ``start``, ``end``, ``retry`` and ``retry_wait`` are
    forwarded to ``_get_trades_hist`` unchanged.
    """
    exchange_symbol = pair_std_to_exchange(symbol, self.ID)
    yield from self._get_trades_hist(exchange_symbol, start, end, retry, retry_wait)
# Poll the REST "/public/Trades" endpoint repeatedly while start_date < end_date.
# NOTE(review): this snippet is cut off after the 504/520 branch; the code that
# consumes a successful response and advances start_date is not visible here,
# and `freq` is not used in the visible portion.
def _historical_trades(self, symbol, start_date, end_date, retry, retry_wait, freq='6H'):
# The symbol lookup uses the key self.ID + 'REST' rather than plain self.ID.
symbol = pair_std_to_exchange(symbol, self.ID + 'REST')
# helper performs one GET; it is wrapped by request_retry, which receives
# self.ID and the caller's retry / retry_wait values.
@request_retry(self.ID, retry, retry_wait)
def helper(start_date):
endpoint = f"{self.api}/public/Trades?pair={symbol}&since={start_date}"
return requests.get(endpoint)
# Both bounds are scaled by 1e9 before being compared and sent as `since`.
# Presumably this converts epoch seconds to nanoseconds - TODO confirm
# against what API._timestamp returns and what the endpoint expects.
start_date = API._timestamp(start_date).timestamp() * 1000000000
end_date = API._timestamp(end_date).timestamp() * 1000000000
while start_date < end_date:
r = helper(start_date)
if r.status_code == 504 or r.status_code == 520:
# cloudflare gateway timeout or other error:
# wait 60 seconds, then retry the same start_date
time.sleep(60)
continue
def trades(self, symbol: str, start=None, end=None, retry=None, retry_wait=10):
    """Generator over whatever ``self._get_trades`` produces for ``symbol``.

    The standard pair name is translated via ``pair_std_to_exchange`` with
    ``self.ID`` before the call; the remaining arguments are passed through
    as given.
    """
    translated = pair_std_to_exchange(symbol, self.ID)
    results = self._get_trades(translated, start, end, retry, retry_wait)
    yield from results
# Request the order book for `symbol` from the "get_order_book" endpoint,
# retrying on HTTP 429 and 500 responses.
# NOTE(review): this snippet is cut off at `if retry == 0:`; the handling of
# exhausted retries and of a successful response is not visible here.
def _book(self, symbol: str, retry=0, retry_wait=0):
ret = {}
symbol = pair_std_to_exchange(symbol, self.ID)
# Result skeleton: one entry keyed by the exchange-format symbol, holding
# a fresh sd() container for each of BID and ASK.
ret[symbol] = {BID: sd(), ASK: sd()}
# helper performs one GET with depth=10000; it is wrapped by request_retry.
@request_retry(self.ID, retry, retry_wait)
def helper():
return requests.get(f"{self.api}get_order_book?depth=10000&instrument_name={symbol}")
while True:
r = helper()
if r.status_code == 429:
# Sleep for the number of seconds given in the Retry-After response
# header, then issue the request again.
sleep(int(r.headers['Retry-After']))
continue
elif r.status_code == 500:
# Log the failing URL and response body, then wait retry_wait seconds.
LOG.warning("%s: 500 for URL %s - %s", self.ID, r.url, r.text)
sleep(retry_wait)
if retry == 0:
def feed_to_exchange(exchange, feed):
    """Look up the exchange-specific value for ``feed`` on ``exchange``.

    For POLONIEX, a ``feed`` that is not a key of ``_feed_to_exchange_map``
    is passed to ``pair_std_to_exchange`` and that result is returned.
    Otherwise the value stored at ``_feed_to_exchange_map[feed][exchange]``
    is returned.

    Raises:
        ValueError: if the mapped value equals ``UNSUPPORTED`` (the same
            message is logged at error level first).
    """
    if exchange == POLONIEX and feed not in _feed_to_exchange_map:
        return pair_std_to_exchange(feed, POLONIEX)

    mapped = _feed_to_exchange_map[feed][exchange]
    if mapped != UNSUPPORTED:
        return mapped

    message = "{} is not supported on {}".format(feed, exchange)
    LOG.error(message)
    raise ValueError(message)
# Set up the feed, with a dedicated path for the case where exactly one pair
# is supplied.
# NOTE(review): indentation was lost in this snippet, so it is unclear which
# of the statements after `else:` belong to the else branch, and the snippet
# may be cut off.
def __init__(self, pairs=None, channels=None, callbacks=None, **kwargs):
self.channels = None
if pairs and len(pairs) == 1:
self.pair = pairs[0]
# The parent initializer is deliberately called with pairs=None and
# channels=None; only callbacks and **kwargs are forwarded.
super().__init__('wss://bitmax.io/api/public/', pairs=None, channels=None, callbacks=callbacks, **kwargs)
# Append the exchange-format symbol to the address, with '/' replaced by '-'.
# NOTE(review): this reads self.id (lower case) - confirm the attribute name.
self.address += pair_std_to_exchange(self.pair, self.id).replace('/', '-')
self.pairs = pairs
else:
# No parent initializer call is visible on this path; attributes are
# assigned directly.
self.pairs = pairs
self.config = kwargs.get('config', None)
self.callbacks = callbacks
# Prepare query parameters and a normalizer for trade records for `symbol`.
# NOTE(review): this snippet is cut off after the nested helper; the request
# that uses `params`, the use of `end_ts`, and the point where trades are
# returned or yielded are not visible here.
def trades(self, symbol: str, start=None, end=None, retry=None, retry_wait=10):
sym = pair_std_to_exchange(symbol, self.ID)
params = {'limit_trades': 500}
if start:
# pd.Timestamp(...).timestamp() is epoch seconds; * 1000 gives milliseconds.
params['since'] = int(pd.Timestamp(start).timestamp() * 1000)
if end:
# Upper bound in epoch milliseconds (not consumed in the visible code).
end_ts = int(pd.Timestamp(end).timestamp() * 1000)
# Convert one raw trade dict into a dict with the keys listed below.
def _trade_normalize(trade):
return {
'feed': self.ID,
'order_id': trade['tid'],
# 'pair' carries the exchange-format symbol, not the caller's `symbol`.
'pair': sym,
'side': trade['type'],
'amount': Decimal(trade['amount']),
'price': Decimal(trade['price']),
# Divided by 1000.0 - presumably milliseconds to seconds, going by
# the 'timestampms' key name; confirm against the exchange API.
'timestamp': trade['timestampms'] / 1000.0
}
# Fetch "/v1/book/<sym>" through self._get and return the bids and asks as
# price -> amount mappings.
# NOTE(review): this snippet is cut off before the closing brace of the
# returned dict.
def l2_book(self, symbol: str, retry=None, retry_wait=0):
sym = pair_std_to_exchange(symbol, self.ID)
data = self._get(f"/v1/book/{sym}", retry, retry_wait)
return {
# Each side is an sd() built from {Decimal(price): Decimal(amount)}, taken
# from the 'price' and 'amount' fields of every entry in data['bids'] /
# data['asks'].
BID: sd({
Decimal(u['price']): Decimal(u['amount'])
for u in data['bids']
}),
ASK: sd({
Decimal(u['price']): Decimal(u['amount'])
for u in data['asks']
})
def trades(self, symbol: str, start=None, end=None, retry=None, retry_wait=10):
    """Yield, in order, each item that ``self._get_trades_hist`` produces.

    The pair name is converted with ``pair_std_to_exchange(symbol, self.ID)``
    before the call; ``start``, ``end``, ``retry`` and ``retry_wait`` are
    handed over unchanged.
    """
    native_symbol = pair_std_to_exchange(symbol, self.ID)
    history = self._get_trades_hist(native_symbol, start, end, retry, retry_wait)
    yield from history