Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if data_type == TRADES:
for entry in self.data:
ts = int(Decimal(entry["timestamp"]) * 1000000000)
while ts in used_ts:
ts += 1
used_ts.add(ts)
if 'id' in entry:
agg.append(f'{data_type}-{exchange},pair={pair},exchange={exchange} side="{entry["side"]}",id="{entry["id"]}",amount={entry["amount"]},price={entry["price"]},timestamp={entry["timestamp"]},receipt_timestamp={entry["receipt_timestamp"]} {ts}')
else:
agg.append(f'{data_type}-{exchange},pair={pair},exchange={exchange} side="{entry["side"]}",amount={entry["amount"]},price={entry["price"]},timestamp={entry["timestamp"]},receipt_timestamp={entry["receipt_timestamp"]} {ts}')
elif data_type == TICKER:
for entry in self.data:
ts = int(Decimal(entry["timestamp"]) * 1000000000)
agg.append(f'{data_type}-{exchange},pair={pair},exchange={exchange} bid={entry["bid"]},ask={entry["ask"]},timestamp={entry["timestamp"]},receipt_timestamp={entry["receipt_timestamp"]} {ts}')
elif data_type == L2_BOOK:
for entry in self.data:
ts = int(Decimal(entry["timestamp"]) * 1000000000)
while ts in used_ts:
ts += 1
used_ts.add(ts)
agg.append(f'{data_type}-{exchange},pair={pair},exchange={exchange},delta={entry["delta"]} side="{entry["side"]}",timestamp={entry["timestamp"]},receipt_timestamp={entry["receipt_timestamp"]},price={entry["price"]},amount={entry["size"]} {ts}')
elif data_type == L3_BOOK:
for entry in self.data:
ts = int(Decimal(entry["timestamp"]) * 1000000000)
while ts in used_ts:
ts += 1
used_ts.add(ts)
agg.append(f'{data_type}-{exchange},pair={pair},exchange={exchange},delta={entry["delta"]} side="{entry["side"]}",id="{entry["order_id"]}",timestamp={entry["timestamp"]},receipt_timestamp={entry["receipt_timestamp"]},price="{entry["price"]}",amount="{entry["size"]}" {ts}')
ts += 1
from cryptofeed.defines import L2_BOOK, L3_BOOK, TRADES, TICKER, VOLUME, FUNDING, UNSUPPORTED
from cryptofeed.standards import pair_std_to_exchange
LOG = logging.getLogger('feedhandler')
_feed_to_exchange_map = {
L2_BOOK: {
BITFINEX: 'book-P0-F0-100',
POLONIEX: UNSUPPORTED,
HITBTC: 'subscribeOrderbook',
COINBASE: 'level2',
BITMEX: 'orderBook10',
BITSTAMP: 'order_book',
KRAKEN: L2_BOOK,
BINANCE: 'depth20'
},
L3_BOOK: {
BITFINEX: 'book-R0-F0-100',
BITSTAMP: UNSUPPORTED,
HITBTC: UNSUPPORTED,
COINBASE: 'full',
BITMEX: 'orderBookL2',
POLONIEX: UNSUPPORTED, # supported by specifying a trading pair as the channel,
KRAKEN: UNSUPPORTED,
BINANCE: UNSUPPORTED
},
TRADES: {
POLONIEX: UNSUPPORTED,
HITBTC: 'subscribeTrades',
BITSTAMP: 'live_trades',
async def _snapshot(self, msg: dict, timestamp: float):
    """Replace the stored L2 book for the message's pair and publish it.

    Args:
        msg: Snapshot message. ``msg['symbol']`` is translated with
            ``pair_exchange_to_std``; ``msg[BID]`` and ``msg[ASK]`` are
            iterated as entries carrying ``'price'`` and ``'size'``.
        timestamp: Forwarded to ``book_callback`` as both of its trailing
            positional arguments.
    """
    std_pair = pair_exchange_to_std(msg['symbol'])
    # Start from empty containers so levels from any earlier book are dropped.
    book = {ASK: sd(), BID: sd()}
    self.l2_book[std_pair] = book
    for book_side in (BID, ASK):
        levels = book[book_side]
        for level in msg[book_side]:
            # Convert price first, then size, before storing the level.
            level_price = Decimal(level['price'])
            level_size = Decimal(level['size'])
            levels[level_price] = level_size
    await self.book_callback(book, L2_BOOK, std_pair, True, None, timestamp, timestamp)
price = Decimal(price)
amount = Decimal(amount)
side = BUY if update[2] == 1 else SELL
if self.__do_callback(TRADES, pair):
await self.callback(TRADES, feed=self.id,
pair=pair,
side=side,
amount=amount,
price=price,
timestamp=float(server_ts),
order_id=order_id,
receipt_timestamp=timestamp)
else:
LOG.warning("%s: Unexpected message received: %s", self.id, msg)
if self.__do_callback(L2_BOOK, pair):
await self.book_callback(self.l2_book[pair], L2_BOOK, pair, forced, delta, timestamp, timestamp)
def main():
    """
    Run a FeedHandler with a Coinbase BTC-USD feed wired to the Mongo callbacks.

    Periods are not allowed in the keys of mongo documents, so the bids and
    asks dictionaries are converted to BSON. They have to be decoded after
    being read back.
    """
    handler = FeedHandler()
    # One callback per data type; the two book callbacks share a collection.
    mongo_callbacks = {
        TRADES: TradeMongo('coinbase', collection='trades'),
        L2_BOOK: BookMongo('coinbase', collection='l2_book'),
        BOOK_DELTA: BookDeltaMongo('coinbase', collection='l2_book'),
    }
    coinbase_feed = Coinbase(
        max_depth=10,
        channels=[L2_BOOK],
        pairs=['BTC-USD'],
        callbacks=mongo_callbacks,
    )
    handler.add_feed(coinbase_feed)
    handler.run()
update = {
BID: sd({
Decimal(unit['bp']): Decimal(unit['bs'])
for unit in msg['obu'] if unit['bp'] > 0
}),
ASK: sd({
Decimal(unit['ap']): Decimal(unit['as'])
for unit in msg['obu'] if unit['ap'] > 0
})
}
if not forced:
self.previous_book[pair] = self.l2_book[pair]
self.l2_book[pair] = update
await self.book_callback(self.l2_book[pair], L2_BOOK, pair, forced, False, orderbook_timestamp, timestamp)
async def _snapshot(self, msg: dict, timestamp: float):
    """Replace the stored L2 book for the message's pair and publish it.

    Args:
        msg: Snapshot message. ``msg['symbol']`` is translated with
            ``pair_exchange_to_std``; ``msg[BID]`` and ``msg[ASK]`` are
            iterated as entries carrying ``'price'`` and ``'size'``.
        timestamp: Forwarded to ``book_callback`` as both of its trailing
            positional arguments.
    """
    std_pair = pair_exchange_to_std(msg['symbol'])
    # Start from empty containers so levels from any earlier book are dropped.
    book = {ASK: sd(), BID: sd()}
    self.l2_book[std_pair] = book
    for book_side in (BID, ASK):
        levels = book[book_side]
        for level in msg[book_side]:
            # Convert price first, then size, before storing the level.
            level_price = Decimal(level['price'])
            level_size = Decimal(level['size'])
            levels[level_price] = level_size
    await self.book_callback(book, L2_BOOK, std_pair, True, None, timestamp, timestamp)