Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def main():
    """Attach three Deribit feeds to a single FeedHandler and start it.

    The first feed is described by a per-channel ``config`` mapping, the
    second subscribes to the L2 book for BTC-PERPETUAL, and the third asks
    for the ticker of two further instruments. The callback targets
    (``oi``, ``funding``, ``ticker``, ``trade``, ``book``) are defined
    outside this function.
    """
    handler = FeedHandler()

    # Deribit can't handle 400+ simultaneous requests, so when every
    # instrument is needed the subscriptions have to be spread over
    # several separate feeds.
    config = {
        TRADES: ["BTC-PERPETUAL"],
        TICKER: ['ETH-PERPETUAL'],
        FUNDING: ['ETH-PERPETUAL'],
        OPEN_INTEREST: ['ETH-PERPETUAL'],
    }
    config_callbacks = {
        OPEN_INTEREST: oi,
        FUNDING: funding,
        TICKER: TickerCallback(ticker),
        TRADES: TradeCallback(trade),
    }
    config_feed = Deribit(config=config, callbacks=config_callbacks)
    handler.add_feed(config_feed)

    # Order book for the BTC perpetual.
    book_feed = Deribit(
        pairs=['BTC-PERPETUAL'],
        channels=[L2_BOOK],
        callbacks={L2_BOOK: BookCallback(book)},
    )
    handler.add_feed(book_feed)

    # Ticker for two additional instruments.
    ticker_feed = Deribit(
        pairs=['BTC-26JUN20', 'BTC-25SEP20-11000-P'],
        channels=[TICKER],
        callbacks={TICKER: TickerCallback(ticker)},
    )
    handler.add_feed(ticker_feed)

    handler.run()
def main():
    """Register Redis-backed feeds for several exchanges and run the handler.

    Every callback is one of the ``*Redis`` classes, each constructed with
    its default arguments. Feeds are added in the order Bitmex, Bitfinex,
    Coinbase (trades), Coinbase (L2 book, ``max_depth=10``), Gemini.
    """
    handler = FeedHandler()

    # Bitmex: trades, funding and open interest for XBTUSD.
    bitmex_feed = Bitmex(
        channels=[TRADES, FUNDING, OPEN_INTEREST],
        pairs=['XBTUSD'],
        callbacks={
            TRADES: TradeRedis(),
            FUNDING: FundingRedis(),
            OPEN_INTEREST: OpenInterestRedis(),
        },
    )
    handler.add_feed(bitmex_feed)

    # Trades for BTC-USD on Bitfinex and Coinbase.
    bitfinex_feed = Bitfinex(
        channels=[TRADES],
        pairs=['BTC-USD'],
        callbacks={TRADES: TradeRedis()},
    )
    handler.add_feed(bitfinex_feed)

    coinbase_trades = Coinbase(
        channels=[TRADES],
        pairs=['BTC-USD'],
        callbacks={TRADES: TradeRedis()},
    )
    handler.add_feed(coinbase_trades)

    # Coinbase L2 book, passed max_depth=10.
    coinbase_book = Coinbase(
        max_depth=10,
        channels=[L2_BOOK],
        pairs=['BTC-USD'],
        callbacks={L2_BOOK: BookRedis()},
    )
    handler.add_feed(coinbase_book)

    # Gemini is given no explicit channel list, only a TRADES callback.
    gemini_feed = Gemini(pairs=['BTC-USD'], callbacks={TRADES: TradeRedis()})
    handler.add_feed(gemini_feed)

    handler.run()
next_rate_timestamp=timestamp_normalize(self.id, msg['next_funding_rate_time']))
else:
await self.callback(FUNDING,
feed=self.id,
pair=pair,
timestamp=timestamp_normalize(self.id, msg['time']),
receipt_timestamp=timestamp,
tag=msg['tag'],
premium=msg['premium'],
maturity_timestamp=timestamp_normalize(self.id, msg['maturityTime']))
oi = msg['openInterest']
if pair in self.open_interest and oi == self.open_interest[pair]:
return
self.open_interest[pair] = oi
await self.callback(OPEN_INTEREST,
feed=self.id,
pair=pair,
open_interest=msg['openInterest'],
timestamp=timestamp_normalize(self.id, msg['time']),
receipt_timestamp=timestamp
)
async def subscribe(self, websocket):
self.websocket = websocket
self.__reset()
for chan in self.channels if self.channels else self.config:
if chan == FUNDING:
asyncio.create_task(self._funding(self.pairs if self.pairs else self.config[chan]))
continue
if chan == OPEN_INTEREST:
asyncio.create_task(self._open_interest(self.pairs if self.pairs else self.config[chan]))
continue
for pair in self.pairs if self.pairs else self.config[chan]:
await websocket.send(json.dumps(
{
"channel": chan,
"market": pair,
"op": "subscribe"
}
agg.append(f'{data_type}-{exchange},pair={pair},exchange={exchange},delta={entry["delta"]} side="{entry["side"]}",timestamp={entry["timestamp"]},receipt_timestamp={entry["receipt_timestamp"]},price={entry["price"]},amount={entry["size"]} {ts}')
elif data_type == L3_BOOK:
for entry in self.data:
ts = int(Decimal(entry["timestamp"]) * 1000000000)
while ts in used_ts:
ts += 1
used_ts.add(ts)
agg.append(f'{data_type}-{exchange},pair={pair},exchange={exchange},delta={entry["delta"]} side="{entry["side"]}",id="{entry["order_id"]}",timestamp={entry["timestamp"]},receipt_timestamp={entry["receipt_timestamp"]},price="{entry["price"]}",amount="{entry["size"]}" {ts}')
ts += 1
elif data_type == FUNDING:
for entry in self.data:
formatted = [f"{key}={value}" for key, value in entry.items() if isinstance(value, float)]
formatted = ','.join(formatted + [f'{key}="{value}"' for key, value in entry.items() if not isinstance(value, float)])
agg.append(f'{data_type}-{exchange},pair={pair},exchange={exchange} {formatted}')
elif data_type == OPEN_INTEREST:
for entry in self.data:
ts = int(Decimal(entry["timestamp"]) * 1000000000)
agg.append(f'{data_type}-{exchange},pair={pair},exchange={exchange} open_interest={entry["open_interest"]},timestamp={entry["timestamp"]},receipt_timestamp={entry["receipt_timestamp"]} {ts}')
# https://v2.docs.influxdata.com/v2.0/write-data/best-practices/optimize-writes/
# Tuning docs indicate 5k is the ideal chunk size for batch writes
for c in chunk(agg, 5000):
c = '\n'.join(c)
r = requests.post(self.addr, data=c)
r.raise_for_status()
self.data = None
# NOTE(review): fragment - the enclosing function header and the original
# indentation are not visible here. The statements below reshape the
# DataFrame `df` according to `data_type` and append it to an Arctic
# chunk-store library named after `exchange`.
df = df.drop(['timestamp'], axis=1)
if data_type == TRADES:
if 'id' in df:
# Trade ids are stored as strings.
df['id'] = df['id'].astype(str)
# Copy 'amount' into 'size' before 'amount' is dropped on the next line.
df['size'] = df.amount
df = df.drop(['pair', 'feed', 'amount'], axis=1)
# chunk_size values are passed straight to append() below; presumably
# pandas-style frequency codes ('H' hourly, 'D' daily, 'T' minutely) - TODO confirm.
chunk_size = 'H'
elif data_type == TICKER:
df = df.drop(['pair', 'feed'], axis=1)
chunk_size = 'D'
elif data_type in { L2_BOOK, L3_BOOK }:
chunk_size = 'T'
elif data_type == FUNDING:
chunk_size = 'D'
elif data_type == OPEN_INTEREST:
df = df.drop(['pair', 'feed'], axis=1)
chunk_size = 'D'
# NOTE(review): no else branch is visible; an unlisted data_type would leave
# chunk_size unassigned unless it is set before this fragment - verify.
df.set_index('date', inplace=True)
# All timestamps are in UTC
# tz_localize(None) strips the timezone information from the index.
df.index = df.index.tz_localize(None)
# Create the per-exchange library the first time this exchange is seen.
if exchange not in self.con.list_libraries():
self.con.initialize_library(exchange, lib_type=StorageEngines.arctic.CHUNK_STORE)
# The symbol written to is "<data_type>-<pair>".
self.con[exchange].append(f"{data_type}-{pair}", df, upsert=True, chunk_size=chunk_size)
# NOTE(review): fragment - the enclosing coroutine header and the original
# indentation are not visible here. For each entry in msg['data'] it fires
# a `callback_type` callback with best bid/ask and, when the entry carries
# an 'open_interest' field, an OPEN_INTEREST callback.
for update in msg['data']:
pair = update['instrument_id']
update_timestamp = timestamp_normalize(self.id, update['timestamp'])
# Bid/ask are converted to Decimal before being handed to the callback.
await self.callback(callback_type, feed=self.id,
pair=pair,
bid=Decimal(update['best_bid']),
ask=Decimal(update['best_ask']),
timestamp=update_timestamp,
receipt_timestamp=timestamp)
if 'open_interest' in update:
oi = update['open_interest']
# Skip the OPEN_INTEREST callback when the value equals the one last
# recorded for this pair.
if pair in self.open_interest and oi == self.open_interest[pair]:
continue
# Record the new value so an identical follow-up update is suppressed.
self.open_interest[pair] = oi
await self.callback(OPEN_INTEREST, feed=self.id, pair=pair, open_interest=oi, timestamp=update_timestamp, receipt_timestamp=timestamp)
# NOTE(review): fragment - the enclosing method header, the loop that binds
# `chan` / `channel`, and the original indentation are not visible here.
# Adds the exchange-specific names of the pairs configured for `channel`
# to self.config[chan].
self.config[chan].update([pair_std_to_exchange(pair, self.id) for pair in config[channel]])
if pairs:
# Translate each given pair with pair_std_to_exchange for this feed id.
self.pairs = [pair_std_to_exchange(pair, self.id) for pair in pairs]
if channels:
# Translate channel names and de-duplicate them; going through set()
# means the resulting list order is not guaranteed.
self.channels = list(set([feed_to_exchange(self.id, chan) for chan in channels]))
self.l3_book = {}
self.l2_book = {}
# Every listed callback type starts out as a Callback(None) placeholder.
self.callbacks = {TRADES: Callback(None),
TICKER: Callback(None),
L2_BOOK: Callback(None),
L3_BOOK: Callback(None),
VOLUME: Callback(None),
FUNDING: Callback(None),
OPEN_INTEREST: Callback(None),
LIQUIDATIONS: Callback(None)}
if callbacks:
# Caller-supplied callbacks replace the placeholders; registering a
# BOOK_DELTA callback also switches self.do_deltas on.
for cb_type, cb_func in callbacks.items():
self.callbacks[cb_type] = cb_func
if cb_type == BOOK_DELTA:
self.do_deltas = True
# Normalize so that every entry in self.callbacks is a list.
for key, callback in self.callbacks.items():
if not isinstance(callback, list):
self.callbacks[key] = [callback]
# NOTE(review): fragment - the enclosing coroutine header and the original
# indentation are not visible here. Loops forever over `pairs`, fetches
# https://ftx.com/api/futures/<pair>/stats for each one and fires an
# OPEN_INTEREST callback whenever the reported value differs from the one
# cached in self.open_interest.
rate_limiter = 1 # don't fetch too many pairs too fast
async with aiohttp.ClientSession() as session:
while True:
for pair in pairs:
# OI only for perp and futures, so check for / in pair name indicating spot
if '/' in pair:
continue
end_point = f"https://ftx.com/api/futures/{pair}/stats"
async with session.get(end_point) as response:
data = await response.text()
# Floats in the JSON body are parsed as Decimal.
data = json.loads(data, parse_float=Decimal)
# Responses without a 'result' key are ignored.
if 'result' in data:
oi = data['result']['openInterest']
if oi != self.open_interest.get(pair, None):
# No exchange timestamp is passed (timestamp=None); only the
# local receipt time from time() is supplied.
await self.callback(OPEN_INTEREST,
feed=self.id,
pair=pair,
open_interest=oi,
timestamp=None,
receipt_timestamp=time()
)
self.open_interest[pair] = oi
# Presumably one pause per pair, with the 60-second sleep below
# separating full passes - nesting is not recoverable here, TODO confirm.
await asyncio.sleep(rate_limiter)
wait_time = 60
await asyncio.sleep(wait_time)
ask=Decimal(msg["params"]["data"]['best_ask_price']),
timestamp=ts,
receipt_timestamp=timestamp)
if "current_funding" in msg["params"]["data"] and "funding_8h" in msg["params"]["data"]:
await self.callback(FUNDING, feed=self.id,
pair=pair,
timestamp=ts,
receipt_timestamp=timestamp,
rate=msg["params"]["data"]["current_funding"],
rate_8h=msg["params"]["data"]["funding_8h"])
oi = msg['params']['data']['open_interest']
if pair in self.open_interest and oi == self.open_interest[pair]:
return
self.open_interest[pair] = oi
await self.callback(OPEN_INTEREST,
feed=self.id,
pair=pair,
open_interest=oi,
timestamp=ts,
receipt_timestamp=timestamp
)