Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if days:
print("ignoring from data and fetching {}".format(days))
from_date = (datetime.datetime.today() - datetime.timedelta(days=days)).timestamp()
# generate exchange
from technical.exchange import _create_exchange, historical_data
ccxt_api = _create_exchange(ccxt_api)
pair = pair.upper().split("/")
stake = pair[1]
asset = pair[0]
# get newest data from the internal store for this pair
latest_time = OHLCV.session.query(func.max(OHLCV.timestamp)).filter(
OHLCV.exchange == ccxt_api.name,
OHLCV.pair == "{}/{}".format(asset.upper(), stake.upper()),
OHLCV.interval == interval
).one()[0]
if force:
print("forcing database refresh and downloading all data!")
latest_time = None
# add additional data on top
if latest_time is None:
# store data for all
for row in historical_data(stake, asset, interval, from_date, ccxt_api):
o = OHLCV(
id="{}-{}-{}/{}:{}".format(ccxt_api.name, interval, asset.upper(), stake.upper(), row[0]),
exchange=ccxt_api.name,