Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def _l2_book(self, msg: dict, timestamp: float):
    """
    Apply an incremental L2 book message to the stored book and publish it.

    msg: decoded message. Reads msg['channel'] (the pair is the text after
         the last '_') and msg['data'] with the keys 'microtimestamp',
         'timestamp' and one '<side>s' list of (price, size) entries per side.
    timestamp: receipt timestamp, forwarded unchanged to the book callback.

    While the pair has an entry in self.last_update_id, messages whose
    data['timestamp'] is lower than that entry are dropped. The first
    message that is not lower removes the entry and is published with
    forced=True.
    """
    data = msg['data']
    chan = msg['channel']
    ts = int(data['microtimestamp'])
    pair = pair_exchange_to_std(chan.split('_')[-1])
    forced = False
    delta = {BID: [], ASK: []}

    if pair in self.last_update_id:
        if data['timestamp'] < self.last_update_id[pair]:
            # older than the stored marker for this pair: ignore it
            return
        forced = True
        del self.last_update_id[pair]

    for side in (BID, ASK):
        for update in data[side + 's']:
            price = Decimal(update[0])
            size = Decimal(update[1])

            if size == 0:
                # a zero size removes the level; a removal is only reported
                # when the level was actually present in the book
                if price in self.l2_book[pair][side]:
                    del self.l2_book[pair][side][price]
                    delta[side].append((price, size))
            else:
                self.l2_book[pair][side][price] = size
                delta[side].append((price, size))

    await self.book_callback(self.l2_book[pair], L2_BOOK, pair, forced, delta, timestamp_normalize(self.id, ts), timestamp)
def book_flatten(book: dict, timestamp: float, receipt_timestamp: float, delta: str) -> list:
    """
    takes book and returns a list of dict, where each element in the list
    is a dictionary with a single row of book data.

    book: mapping with one entry per side (BID and ASK). Each side maps a
          price either to a size (L2) or to a dict of order_id -> size (L3).
    timestamp, receipt_timestamp, delta: copied unchanged into every row.

    eg.
    L2:
    [{'side': str, 'price': float, 'size': float, 'timestamp': float,
      'receipt_timestamp': float, 'delta': ...}, {...}, ...]

    L3:
    [{'side': str, 'price': float, 'size': float, 'order_id': str,
      'timestamp': float, 'receipt_timestamp': float, 'delta': ...}, {...}, ...]
    """
    ret = []
    for side in (BID, ASK):
        for price, data in book[side].items():
            if isinstance(data, dict):
                # L3 book: one row per order resting at this price
                for order_id, size in data.items():
                    ret.append({'side': side, 'price': price, 'size': size, 'order_id': order_id, 'timestamp': timestamp, 'receipt_timestamp': receipt_timestamp, 'delta': delta})
            else:
                # L2 book: the value is the aggregate size at this price
                ret.append({'side': side, 'price': price, 'size': data, 'timestamp': timestamp, 'receipt_timestamp': receipt_timestamp, 'delta': delta})
    return ret
async def _book(self, msg: dict, timestamp: float):
# Order book handler: a message with msg['action'] == 'partial' replaces the
# stored book of every instrument in msg['data']; any other action is applied
# as an incremental update.
# NOTE(review): this excerpt has lost its indentation and stops partway
# through the update branch, so the level removal/insert logic and the
# update callback are not visible here.
if msg['action'] == 'partial':
# snapshot
for update in msg['data']:
pair = pair_exchange_to_std(update['instrument_id'])
# rebuild both sides from scratch; each level is unpacked as
# (price, amount, ...) and any trailing fields are ignored
self.l2_book[pair] = {
BID: sd({
Decimal(price): Decimal(amount) for price, amount, *_ in update['bids']
}),
ASK: sd({
Decimal(price): Decimal(amount) for price, amount, *_ in update['asks']
})
}
# True / None are presumably the "forced" flag and an absent delta -- confirm
# against the book_callback signature
await self.book_callback(self.l2_book[pair], L2_BOOK, pair, True, None, timestamp_normalize(self.id, update['timestamp']), timestamp)
else:
# update
for update in msg['data']:
delta = {BID: [], ASK: []}
pair = pair_exchange_to_std(update['instrument_id'])
for side in ('bids', 'asks'):
# map the payload key onto the book-side constant
s = BID if side == 'bids' else ASK
for price, amount, *_ in update[side]:
price = Decimal(price)
amount = Decimal(amount)
if amount == 0:
if price in self.l2_book[pair][s]:
"""
# NOTE(review): the header of this function is outside the excerpt, the
# indentation has been lost, and the update branch is cut off after the
# zero-amount case.
forced = False
delta = {BID: [], ASK: []}
if msg[0] == 'AE':
# snapshot
forced = True
pair = pair_exchange_to_std(msg[2])
ts = msg[3]
# 'asks' and 'bids' may each sit in msg[4] or msg[5]; take whichever
# element actually carries the key
asks = msg[4]['asks'] if 'asks' in msg[4] else msg[5]['asks']
bids = msg[5]['bids'] if 'bids' in msg[5] else msg[4]['bids']
# replace the stored book for this pair with the snapshot levels
self.l2_book[pair] = {
BID: sd({
Decimal(price): Decimal(amount)
for price, amount in bids
}),
ASK: sd({
Decimal(price): Decimal(amount)
for price, amount in asks
})
}
else:
# Update
# a single price level: timestamp, symbol, side, price and amount are read
# from msg[2] .. msg[6]
ts = msg[2]
pair = pair_exchange_to_std(msg[3])
# anything other than the literal 'ASK' is treated as a bid
side = ASK if msg[4] == 'ASK' else BID
price = Decimal(msg[5])
amount = Decimal(msg[6])
if amount == 0:
# a zero amount removes the level when it is present in the book
if price in self.l2_book[pair][side]:
del self.l2_book[pair][side][price]
delta[side].append((price, 0))
# NOTE(review): the header of this function is outside the excerpt (`pair`
# and `msg` are assigned there), the indentation has been lost, and the
# update branch is cut off right after `if count > 0:`.
delta = {BID: [], ASK: []}
forced = False
if isinstance(msg[1], list):
# a list of lists is handled as a snapshot; a flat list as a single update
if isinstance(msg[1][0], list):
# snapshot so clear book
self.l2_book[pair] = {BID: sd(), ASK: sd()}
for update in msg[1]:
# each row unpacks to three values; the middle one is not used here
price, _, amount = update
price = Decimal(price)
amount = Decimal(amount)
# the sign of amount selects the side: positive -> BID, otherwise ASK,
# and the ASK size is stored as its absolute value
if amount > 0:
side = BID
else:
side = ASK
amount = abs(amount)
self.l2_book[pair][side][price] = amount
forced = True
else:
# book update
price, count, amount = msg[1]
price = Decimal(price)
amount = Decimal(amount)
# same sign convention as in the snapshot branch
if amount > 0:
side = BID
else:
side = ASK
amount = abs(amount)
if count > 0:
def _book(self, symbol: str, l3=False, retry=0, retry_wait=0):
# Requests a book for `symbol` with a GET against {self.api}/book/... and
# prepares an empty BID/ASK result structure for it.
# NOTE(review): this excerpt has lost its indentation and is cut off inside
# the `while True` loop, so response parsing and the return value are not
# visible here.
ret = {}
sym = symbol
funding = False
# a symbol without '-' is flagged as funding and sent with an 'f' prefix;
# the result stays keyed by the original, unprefixed symbol
if '-' not in symbol:
ret[symbol] = {BID: sd(), ASK: sd()}
symbol = f"f{symbol}"
funding = True
else:
# otherwise the symbol is converted first and the result is keyed by the
# converted form
symbol = pair_std_to_exchange(symbol, self.ID)
ret[symbol] = {BID: sd(), ASK: sd()}
sym = symbol
# the request is wrapped in request_retry, configured with retry / retry_wait
@request_retry(self.ID, retry, retry_wait)
def helper():
# 'R0' is requested when l3 is True, 'P0' otherwise
precision = 'R0' if l3 is True else 'P0'
return requests.get(f"{self.api}/book/{symbol}/{precision}?len=100")
while True:
r = helper()
if r.status_code == 429:
},
...
],
"asks": [
{
"price": 11739.0,
"qty": 47410.0
},
...
],
"tickSize": null
}
"""
# NOTE(review): the function header is outside this excerpt; `pair`, `msg`
# and `timestamp` are presumably bound there -- confirm.
# Replace the stored book for `pair` with the levels in msg['bids'] and
# msg['asks'], reading each level from its 'price' and 'qty' fields.
self.l2_book[pair] = {
BID: sd({Decimal(update['price']): Decimal(update['qty']) for update in msg['bids']}),
ASK: sd({Decimal(update['price']): Decimal(update['qty']) for update in msg['asks']})
}
# the same `timestamp` value is passed for both trailing timestamp arguments
await self.book_callback(self.l2_book[pair], L2_BOOK, pair, True, None, timestamp, timestamp)
def _reset(self):
    """
    Reinitialise the book-tracking state held on the instance.

    Clears the partial_received flags and the order_id mapping, then gives
    every pair in self.pairs an empty BID/ASK book and an empty order_id
    table.
    """
    self.partial_received = defaultdict(bool)
    self.order_id = {}
    for pair in self.pairs:
        self.l2_book[pair] = {BID: sd(), ASK: sd()}
        self.order_id[pair] = defaultdict(dict)
async def book(feed, pair, book, timestamp):
    """
    Print a one-line summary of a book update: timestamp, feed, pair and the
    number of price levels on the BID and ASK sides of `book`.
    """
    print(f'Timestamp: {timestamp} Feed: {feed} Pair: {pair} Book Bid Size is {len(book[BID])} Ask Size is {len(book[ASK])}')
def book_delta(former: dict, latter: dict, book_type=L2_BOOK) -> dict:
    """
    Compute the per-side changes that turn the `former` L2 book into `latter`.

    former, latter: books with one price -> size mapping per side (BID, ASK).
    book_type: only L2_BOOK is supported.

    Returns {BID: [...], ASK: [...]} where each list holds (price, size)
    tuples: size 0 for prices that exist only in `former`, and the size from
    `latter` for prices that are new or whose size differs.

    Raises ValueError for any book_type other than L2_BOOK.
    """
    if book_type != L2_BOOK:
        raise ValueError("Not supported for L3 Books")

    ret = {BID: [], ASK: []}
    for side in (BID, ASK):
        fkeys = set(former[side])
        lkeys = set(latter[side])

        # levels that disappeared
        for price in fkeys - lkeys:
            ret[side].append((price, 0))

        # levels that were added
        for price in lkeys - fkeys:
            ret[side].append((price, latter[side][price]))

        # levels present in both books whose size changed
        for price in lkeys.intersection(fkeys):
            if former[side][price] != latter[side][price]:
                ret[side].append((price, latter[side][price]))
    return ret