Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if start and end:
data = [[key, self.conn.xrange(key, min=start, max=end)]]
else:
data = self.conn.xread({key: '0-0' if key not in self.last_id else self.last_id[key]})
if len(data) == 0 or len(data[0][1]) == 0:
return []
LOG.info("%s: Read %d messages from Redis", key, len(data[0][1]))
ret = []
for update_id, update in data[0][1]:
if dtype in {L2_BOOK, L3_BOOK}:
update = json.loads(update['data'])
update = book_flatten(update, update['timestamp'], update['receipt_timestamp'], update['delta'])
for u in update:
for k in ('size', 'amount', 'price', 'timestamp', 'receipt_timestamp'):
if k in u:
u[k] = float(u[k])
ret.extend(update)
elif dtype in {TRADES, TICKER, OPEN_INTEREST}:
for k in ('size', 'amount', 'price', 'timestamp', 'receipt_timestamp', 'bid', 'ask', 'open_interest'):
if k in update:
update[k] = float(update[k])
ret.append(update)
elif dtype == FUNDING:
for k in update:
try:
update[k] = float(update[k])
except ValueError:
# ignore strings
def read(self, exchange, dtype, pair):
    """
    Consume pending Kafka messages for one feed and return the decoded updates.

    The lookup key is built as ``{dtype}-{exchange}-{pair}`` and handed to
    ``self._conn``. Each usable message is decoded from UTF-8 JSON. For
    book data types the decoded update is passed through ``book_flatten``
    and the result is extended into the output; for trades, ticker, funding
    and open interest the decoded update is appended unchanged. Any other
    dtype contributes nothing to the result.

    The most recent usable message is stored in ``self.ids[key]``.

    Args:
        exchange: exchange component of the key.
        dtype: data type component of the key; also selects how each
            decoded update is handled.
        pair: trading pair component of the key.

    Returns:
        list of decoded updates, empty when nothing usable was consumed.
    """
    key = f'{dtype}-{exchange}-{pair}'
    data = self._conn(key).consume(1000000, timeout=0.5)
    LOG.info("%s: Read %d messages from Kafka", key, len(data))
    ret = []
    for message in data:
        # NOTE(review): assumes confluent-kafka style Message objects, where
        # consume() may return error/event entries alongside real payloads.
        # Those must not be decoded as JSON nor remembered in self.ids.
        error = message.error()
        if error is not None:
            LOG.warning("%s: Skipping Kafka error message: %s", key, error)
            continue
        self.ids[key] = message
        update = json.loads(message.value().decode('utf8'))
        if dtype in {L2_BOOK, L3_BOOK}:
            update = book_flatten(update, update['timestamp'], update['delta'])
            ret.extend(update)
        elif dtype in {TRADES, TICKER, FUNDING, OPEN_INTEREST}:
            ret.append(update)
    return ret