Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def _funding(self, msg: dict, pair: str, timestamp: float):
if msg['tag'] == 'perpetual':
await self.callback(FUNDING,
feed=self.id,
pair=pair,
timestamp=timestamp_normalize(self.id, msg['time']),
receipt_timestamp=timestamp,
tag=msg['tag'],
rate=msg['funding_rate'],
rate_prediction=msg.get('funding_rate_prediction', None),
relative_rate=msg['relative_funding_rate'],
relative_rate_prediction=msg.get('relative_funding_rate_prediction', None),
next_rate_timestamp=timestamp_normalize(self.id, msg['next_funding_rate_time']))
else:
await self.callback(FUNDING,
feed=self.id,
pair=pair,
timestamp=timestamp_normalize(self.id, msg['time']),
receipt_timestamp=timestamp,
def main():
    """Register Bitmex, Bitfinex and Coinbase feeds with Arctic callbacks and run.

    Three feeds are added to a single ``FeedHandler``:

    * Bitmex ``XBTUSD`` -- trades and funding
    * Bitfinex ``BTC-USD`` -- trades
    * Coinbase ``BTC-USD`` -- ticker

    Every callback is an Arctic backend object constructed with
    ``'cryptofeed-test'``.  ``run()`` is called last, once all feeds are
    registered.
    """
    handler = FeedHandler()

    bitmex_callbacks = {
        TRADES: TradeArctic('cryptofeed-test'),
        FUNDING: FundingArctic('cryptofeed-test'),
    }
    bitmex_feed = Bitmex(
        channels=[TRADES, FUNDING],
        pairs=['XBTUSD'],
        callbacks=bitmex_callbacks,
    )
    handler.add_feed(bitmex_feed)

    bitfinex_feed = Bitfinex(
        channels=[TRADES],
        pairs=['BTC-USD'],
        callbacks={TRADES: TradeArctic('cryptofeed-test')},
    )
    handler.add_feed(bitfinex_feed)

    coinbase_feed = Coinbase(
        channels=[TICKER],
        pairs=['BTC-USD'],
        callbacks={TICKER: TickerArctic('cryptofeed-test')},
    )
    handler.add_feed(coinbase_feed)

    handler.run()
if book_up:
cb[BOOK_DELTA] = [book_up(key=L3_BOOK, **kwargs)]
elif callback_type == OPEN_INTEREST:
cb[OPEN_INTEREST] = [oi_cb(**kwargs)]
if 'pass_through' in self.config:
if self.config['pass_through']['type'] == 'zmq':
from cryptofeed.backends.zmq import TradeZMQ, BookDeltaZMQ, BookZMQ, FundingZMQ, OpenInterestZMQ, TickerZMQ
import zmq
host = self.config['pass_through']['host']
port = self.config['pass_through']['port']
if callback_type == TRADES:
cb[TRADES].append(TradeZMQ(host=host, port=port))
elif callback_type == FUNDING:
cb[FUNDING].append(FundingZMQ(host=host, port=port))
elif callback_type == L2_BOOK:
cb[L2_BOOK].append(BookZMQ(host=host, port=port))
elif callback_type == L3_BOOK:
cb[L3_BOOK].append(BookZMQ(host=host, port=port))
elif callback_type == OPEN_INTEREST:
cb[OPEN_INTEREST].append(OpenInterestZMQ(host=host, port=port))
elif callback_type == TICKER:
cb[TICKER].append(TickerZMQ(host=host, port=port))
if BOOK_DELTA in cb:
cb[BOOK_DELTA].append(BookDeltaZMQ(host=host, port=port))
fh.add_feed(self.exchange, timeout=timeout, max_depth=depth, book_interval=window, config={callback_type: self.exchange_config[callback_type]}, callbacks=cb)
fh.run()
df['receipt_timestamp'] = pd.to_datetime(df['receipt_timestamp'], unit='s')
df = df.drop(['timestamp'], axis=1)
if data_type == TRADES:
if 'id' in df:
df['id'] = df['id'].astype(str)
df['size'] = df.amount
df = df.drop(['pair', 'feed', 'amount'], axis=1)
chunk_size = 'H'
elif data_type == TICKER:
df = df.drop(['pair', 'feed'], axis=1)
chunk_size = 'D'
elif data_type in { L2_BOOK, L3_BOOK }:
chunk_size = 'T'
elif data_type == FUNDING:
chunk_size = 'D'
elif data_type == OPEN_INTEREST:
df = df.drop(['pair', 'feed'], axis=1)
chunk_size = 'D'
df.set_index('date', inplace=True)
# All timestamps are in UTC
df.index = df.index.tz_localize(None)
if exchange not in self.con.list_libraries():
self.con.initialize_library(exchange, lib_type=StorageEngines.arctic.CHUNK_STORE)
self.con[exchange].append(f"{data_type}-{pair}", df, upsert=True, chunk_size=chunk_size)
BINANCE: 'trade'
},
TICKER: {
POLONIEX: 1002,
HITBTC: 'subscribeTicker',
BITFINEX: 'ticker',
BITSTAMP: UNSUPPORTED,
COINBASE: 'ticker',
BITMEX: UNSUPPORTED,
KRAKEN: TICKER,
BINANCE: 'ticker'
},
VOLUME: {
POLONIEX: 1003
},
FUNDING: {
BITMEX: 'funding',
BITFINEX: 'trades'
}
}
def feed_to_exchange(exchange, feed):
    """Translate a normalized feed name into the exchange-specific channel.

    Args:
        exchange: exchange identifier used as the inner key of
            ``_feed_to_exchange_map``.
        feed: normalized feed/channel name used as the outer key of
            ``_feed_to_exchange_map``.

    Returns:
        The value stored in ``_feed_to_exchange_map[feed][exchange]``.  For
        ``POLONIEX`` only, a ``feed`` that is not a key of the map is instead
        passed to ``pair_std_to_exchange`` and that result is returned.

    Raises:
        ValueError: if the feed is marked ``UNSUPPORTED`` for the exchange,
            or if the map has no entry for the feed/exchange combination.
    """
    # Poloniex special case: anything that is not a known feed name is
    # converted as a trading pair.
    # NOTE(review): presumably Poloniex subscribes per pair -- confirm.
    if exchange == POLONIEX and feed not in _feed_to_exchange_map:
        return pair_std_to_exchange(feed, POLONIEX)

    # Several feeds list only a few exchanges, so a missing entry is treated
    # as UNSUPPORTED rather than surfacing as a bare KeyError.
    ret = _feed_to_exchange_map.get(feed, {}).get(exchange, UNSUPPORTED)
    if ret == UNSUPPORTED:
        message = "{} is not supported on {}".format(feed, exchange)
        LOG.error(message)
        raise ValueError(message)
    return ret
async def subscribe(self, websocket):
self.websocket = websocket
self.__reset()
for chan in self.channels if self.channels else self.config:
if chan == FUNDING:
asyncio.create_task(self._funding(self.pairs if self.pairs else self.config[chan]))
continue
if chan == OPEN_INTEREST:
asyncio.create_task(self._open_interest(self.pairs if self.pairs else self.config[chan]))
continue
for pair in self.pairs if self.pairs else self.config[chan]:
await websocket.send(json.dumps(
{
"channel": chan,
"market": pair,
"op": "subscribe"
}
chan = feed_to_exchange(self.id, channel)
self.config[chan].update([pair_std_to_exchange(pair, self.id) for pair in config[channel]])
if pairs:
self.pairs = [pair_std_to_exchange(pair, self.id) for pair in pairs]
if channels:
self.channels = list(set([feed_to_exchange(self.id, chan) for chan in channels]))
self.l3_book = {}
self.l2_book = {}
self.callbacks = {TRADES: Callback(None),
TICKER: Callback(None),
L2_BOOK: Callback(None),
L3_BOOK: Callback(None),
VOLUME: Callback(None),
FUNDING: Callback(None),
OPEN_INTEREST: Callback(None),
LIQUIDATIONS: Callback(None)}
if callbacks:
for cb_type, cb_func in callbacks.items():
self.callbacks[cb_type] = cb_func
if cb_type == BOOK_DELTA:
self.do_deltas = True
for key, callback in self.callbacks.items():
if not isinstance(callback, list):
self.callbacks[key] = [callback]
def main():
    """Subscribe to the FTX funding channel for two pairs and run the handler.

    A single FTX feed covering ``BTC-PERP`` and ``THETA-PERP`` is added to a
    ``FeedHandler``.  Its ``FUNDING`` callback is a ``FundingCallback``
    wrapping the ``funding`` name defined elsewhere in this file.
    """
    handler = FeedHandler()
    ftx_feed = FTX(
        pairs=['BTC-PERP', 'THETA-PERP'],
        channels=[FUNDING],
        callbacks={FUNDING: FundingCallback(funding)},
    )
    handler.add_feed(ftx_feed)
    handler.run()