Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): this is an excerpt of a larger routine. The enclosing function
# header and the handler for the `try` below are not part of this excerpt, and
# the original indentation has been lost.
# One aggregation pass: read cached data for every configured
# exchange / data type / pair, push it to storage, then clear it from the cache.
start, end = None, None
try:
# Timestamp used below to measure how long the whole pass takes.
aggregation_start = time.time()
if time_partition:
# Anchor the interval on "now" unless a previous `end` is set, in which case
# it is anchored `interval + 1` seconds after that previous end.
interval_start = aggregation_start
if end:
interval_start = end + timedelta(seconds=interval + 1)
start, end = get_time_interval(interval_start, base_interval, multiplier=multiplier)
# Nothing is read or written unless the config has a non-empty 'exchanges' entry.
if 'exchanges' in self.config and self.config.exchanges:
for exchange in self.config.exchanges:
for dtype in self.config.exchanges[exchange]:
# Skip over the retries arg in the config if present.
if dtype in {'retries', 'channel_timeouts'}:
continue
# The data-type entry is iterated directly, unless it contains a 'symbols'
# key, in which case the pairs are taken from that member.
for pair in self.config.exchanges[exchange][dtype] if 'symbols' not in self.config.exchanges[exchange][dtype] else self.config.exchanges[exchange][dtype]['symbols']:
# A new Storage object is created for every pair.
store = Storage(self.config)
LOG.info('Reading %s-%s-%s', exchange, dtype, pair)
# start/end stay None when time_partition is falsy.
data = cache.read(exchange, dtype, pair, start=start, end=end)
if len(data) == 0:
LOG.info('No data for %s-%s-%s', exchange, dtype, pair)
continue
# Order matters: the cache entry is deleted only after aggregate and write
# have been called.
store.aggregate(data)
store.write(exchange, dtype, pair, time.time())
cache.delete(exchange, dtype, pair)
LOG.info('Write Complete %s-%s-%s', exchange, dtype, pair)
# Time spent on this pass, and what is left of the configured interval.
total = time.time() - aggregation_start
wait = interval - total
if wait <= 0:
# The pass took at least a full interval: warn and use a 0.5 wait instead.
LOG.warning("Storage operations currently take %.1f seconds, longer than the interval of %d", total, interval)
wait = 0.5
# Backfill worker for one exchange: walks every pair listed under
# self.config.backfill[exchange].
# NOTE(review): this excerpt is cut off. The handler for the `try` below and the
# rest of the per-pair body are not shown, and the original indentation has
# been lost.
def _worker(self, exchange):
# `r` is not used in the visible part of the method; presumably the
# truncated remainder uses it - verify against the full source.
r = Rest()
storage = Storage(self.config)
for pair in self.config.backfill[exchange]:
try:
# Configured backfill start for this pair.
start = self.config.backfill[exchange][pair].start
# Poll storage every 10 seconds until every value returned by
# get_start_date for this pair's trades is truthy.
while True:
end = storage.get_start_date(exchange, 'trades', pair)
if all(e for e in end):
break
time.sleep(10)
# Each returned value is read as seconds and moved back by one microsecond.
ends = list(map(lambda x: Timestamp(x, unit='s') - Timedelta(microseconds=1), end))
# If any of these is at or before the configured start, log and move on to the next pair.
if any(e <= Timestamp(start) for e in ends):
LOG.info("Data in storage is earlier than backfill start date for %s - %s", exchange, pair)
continue
def __init__(self, config):
    """Keep the config and build one backend per configured storage entry.

    ``config.storage`` may be a single entry or a list of entries. Either
    way ``self.s`` ends up as a list holding the result of
    ``Storage.__init_helper`` for each entry, in order.
    """
    self.config = config
    # Normalise to a list so both shapes go through the same construction path.
    entries = config.storage if isinstance(config.storage, list) else [config.storage]
    self.s = [Storage.__init_helper(entry, config) for entry in entries]