Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def collect_data_file(exchange, symbol):
    """Collect data for ``symbol`` on ``exchange`` and report the outcome.

    A ``DataCollector`` is built from a copy of the bot configuration and its
    ``execute_with_specific_target`` call is run through the bot's
    ``run_in_main_asyncio_loop``.

    Returns:
        A ``(success, message)`` tuple. On success the message is the
        collector result followed by ``" saved"``; on any ``Exception`` the
        collector is stopped and the message describes the failure.
    """
    collector = DataCollector(copy(get_bot().get_config()), False)
    try:
        outcome = get_bot().run_in_main_asyncio_loop(
            collector.execute_with_specific_target(exchange, symbol)
        )
    except Exception as e:
        # Stop the collector before reporting the failure to the caller.
        collector.stop()
        outcome = f"data collector error: {e}"
        return False, f"Can't collect data for {symbol} on {exchange} ({outcome})"
    return True, f"{outcome} saved"
import csv
import time
import ccxt
from backtesting.collector.data_collector import DataCollectorParser, ExchangeDataCollector, DataCollector
from config.cst import *
# from zipline.data.bundles import register
# from zipline.data.bundles.csvdir import csvdir_equities
from trading import Exchange
class ZiplineDataCollector(DataCollector):
def __init__(self, config):
    """Create the collector; all setup is delegated to the parent class."""
    super().__init__(config)
def create_exchange_data_collectors(self):
available_exchanges = ccxt.exchanges
for exchange_class_string in self.config[CONFIG_EXCHANGES]:
if exchange_class_string in available_exchanges:
exchange_type = getattr(ccxt, exchange_class_string)
exchange_inst = Exchange(self.config, exchange_type)
exchange_data_collector = ZiplineExchangeDataCollector(self.config, exchange_inst)
exchange_data_collector.start()
self.exchange_data_collectors_threads.append(exchange_data_collector)
else:
def data_collector(config, catch=True):
    """Create a ``DataCollector`` for ``config`` and run it to completion.

    Args:
        config: Configuration passed straight to ``DataCollector``.
        catch: When true, an exception raised while creating or running the
            collector is re-raised after cleanup. When false, the exception
            is swallowed. NOTE: the flag reads inverted relative to its name;
            this behaviour is kept for backward compatibility.

    Raises:
        Exception: Whatever the collector raised, only when ``catch`` is true.
    """
    data_collector_inst = None
    try:
        data_collector_inst = DataCollector(config)
        asyncio.run(data_collector_inst.start())
    except Exception:
        # The constructor itself may have failed, in which case there is no
        # instance to stop; calling stop() on None would mask the real error.
        if data_collector_inst is not None:
            data_collector_inst.stop()
        if catch:
            # Bare raise keeps the original exception and traceback intact.
            raise