Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def compute_interval(dataframe: DataFrame, exchange_interval=False):
"""
calculates the interval of the given dataframe for us
:param dataframe:
:param exchange_interval: should we convert the result to an exchange interval or just a number
:return:
"""
res_interval = int((dataframe['date'] - dataframe['date'].shift()).min().total_seconds() // 60)
if exchange_interval:
# convert to our allowed ticker values
from technical.exchange import TICKER_INTERVAL_MINUTES
converted = list(TICKER_INTERVAL_MINUTES.keys())[
list(TICKER_INTERVAL_MINUTES.values()).index(exchange_interval)]
if len(converted) > 0:
return converted
else:
raise Exception(
f"sorry, your interval of {res_interval} is not "
f"supported in {TICKER_INTERVAL_MINUTES}")
return res_interval
def compute_interval(dataframe: DataFrame, exchange_interval=False):
"""
calculates the interval of the given dataframe for us
:param dataframe:
:param exchange_interval: should we convert the result to an exchange interval or just a number
:return:
"""
res_interval = int((dataframe['date'] - dataframe['date'].shift()).min().total_seconds() // 60)
if exchange_interval:
# convert to our allowed ticker values
from technical.exchange import TICKER_INTERVAL_MINUTES
converted = list(TICKER_INTERVAL_MINUTES.keys())[
list(TICKER_INTERVAL_MINUTES.values()).index(exchange_interval)]
if len(converted) > 0:
return converted
else:
raise Exception(
f"sorry, your interval of {res_interval} is not "
f"supported in {TICKER_INTERVAL_MINUTES}")
return res_interval
:param dataframe:
:param exchange_interval: should we convert the result to an exchange interval or just a number
:return:
"""
res_interval = int((dataframe['date'] - dataframe['date'].shift()).min().total_seconds() // 60)
if exchange_interval:
# convert to our allowed ticker values
from technical.exchange import TICKER_INTERVAL_MINUTES
converted = list(TICKER_INTERVAL_MINUTES.keys())[
list(TICKER_INTERVAL_MINUTES.values()).index(exchange_interval)]
if len(converted) > 0:
return converted
else:
raise Exception(
f"sorry, your interval of {res_interval} is not "
f"supported in {TICKER_INTERVAL_MINUTES}")
return res_interval
def test_resampled_merge(testdata_1m_btc):
resampled = resample_to_interval(testdata_1m_btc, 5)
merged = resampled_merge(testdata_1m_btc, resampled)
assert (len(merged) == len(testdata_1m_btc))
assert "resample_5_open" in merged
assert "resample_5_close" in merged
assert "resample_5_low" in merged
assert "resample_5_high" in merged
assert "resample_5_date" not in merged
assert "resample_5_volume" not in merged
# Verify the assignment goes to the correct candle
# If resampling to 5m, then the resampled value needs to be on the 5m candle.
assert sum(merged.loc[merged['date'] == '2017-11-14 22:54:00', 'resample_5_close'].isna()) == 1
assert sum(merged.loc[merged['date'] == '2017-11-14 22:55:00', 'resample_5_close'].isna()) == 0
assert sum(merged.loc[merged['date'] == '2017-11-14 22:56:00', 'resample_5_close'].isna()) == 1
def test_resampled_merge_contains_indicator(testdata_1m_btc):
resampled = resample_to_interval(testdata_1m_btc, 5)
resampled['cmf'] = chaikin_money_flow(resampled, 5)
merged = resampled_merge(testdata_1m_btc, resampled)
print(merged)
assert "resample_5_cmf" in merged
def test_resample_to_interval(testdata_1m_btc):
result = resample_to_interval(testdata_1m_btc, 5)
# should be roughly a factor 5
assert (len(testdata_1m_btc) / len(result) > 4.5)
assert (len(testdata_1m_btc) / len(result) < 5.5)
def test_historical_data():
days = datetime.datetime.today() - datetime.timedelta(days=7)
print(days)
data = historical_data(
"USDT", "BNB", "1d", days.timestamp())
assert len(data) == 7
def test_historical_data_ploniex_long():
""" this one is awesome since you can download years worth of data"""
days = datetime.datetime.today() - datetime.timedelta(days=365)
data = historical_data(
"BTC", "ETC", "1d", days.timestamp(), "poloniex")
assert len(data) == 365
def test_historical_data_ploniex():
""" this one is awesome since you can download years worth of data"""
days = datetime.datetime.today() - datetime.timedelta(days=90)
data = historical_data(
"BTC", "ETH", "1d", days.timestamp(), "poloniex")
assert len(data) == 90
def test_resampled_merge(testdata_1m_btc):
resampled = resample_to_interval(testdata_1m_btc, 5)
merged = resampled_merge(testdata_1m_btc, resampled)
assert (len(merged) == len(testdata_1m_btc))
assert "resample_5_open" in merged
assert "resample_5_close" in merged
assert "resample_5_low" in merged
assert "resample_5_high" in merged
assert "resample_5_date" not in merged
assert "resample_5_volume" not in merged
# Verify the assignment goes to the correct candle
# If resampling to 5m, then the resampled value needs to be on the 5m candle.
assert sum(merged.loc[merged['date'] == '2017-11-14 22:54:00', 'resample_5_close'].isna()) == 1
assert sum(merged.loc[merged['date'] == '2017-11-14 22:55:00', 'resample_5_close'].isna()) == 0
assert sum(merged.loc[merged['date'] == '2017-11-14 22:56:00', 'resample_5_close'].isna()) == 1