Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
associated with this software.
'''
import json
import logging
from cryptofeed.defines import L2_BOOK, L3_BOOK, TRADES, TICKER, FUNDING, OPEN_INTEREST
from cryptostore.engines import StorageEngines
from cryptostore.aggregator.cache import Cache
from cryptostore.aggregator.util import book_flatten
LOG = logging.getLogger('cryptostore')
class Kafka(Cache):
def __init__(self, ip, port, flush=False):
self.conn = {}
self.ip = ip
self.port = port
self.ids = {}
if flush:
kafka = StorageEngines['confluent_kafka.admin']
ac = kafka.admin.AdminClient({'bootstrap.servers': f"{ip}:{port}"})
topics = list(ac.list_topics().topics.keys())
for topic, status in ac.delete_topics(topics).items():
try:
status.result()
LOG.info("Topic %s deleted", topic)
except Exception as e:
LOG.warning("Failed to delete topic %s: %s", topic, e)
import logging
from collections import defaultdict
import json
import time
from cryptofeed.defines import TRADES, L2_BOOK, L3_BOOK, TICKER, FUNDING, OPEN_INTEREST
from cryptostore.aggregator.util import book_flatten
from cryptostore.aggregator.cache import Cache
from cryptostore.engines import StorageEngines
LOG = logging.getLogger('cryptostore')
class Redis(Cache):
def __init__(self, ip=None, port=None, socket=None, del_after_read=True, flush=False, retention=None):
self.del_after_read = del_after_read
self.retention = retention
self.last_id = {}
self.ids = defaultdict(list)
if ip and port and socket:
raise ValueError("Cannot specify ip/port and socket for Redis")
self.conn = StorageEngines.redis.Redis(ip, port, unix_socket_path=socket, decode_responses=True)
if flush:
LOG.info('Flushing cache')
self.conn.flushall()
def read(self, exchange, dtype, pair, start=None, end=None):
key = f'{dtype}-{exchange}-{pair}'