Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def get_symbol_prices(self, symbol, time_frame, limit=None, return_list=True):
    """Feed the currently readable backtesting candles of ``symbol`` to its symbol data.

    Each call with a time frame increments the matching counter in
    ``self.time_frame_get_times[symbol]``. On the first read of a time frame
    every extracted candle is handed to ``update_symbol_candles``; on later
    reads only the last extracted candle is.

    ``limit`` and ``return_list`` are accepted but not used by this body, and
    the method does not return a value.

    Raises:
        BacktestingEndedException: re-raised unchanged when it is raised inside
            the ``try`` body, after the read counter was incremented once more.
    """
    # NOTE(review): the indentation of this snippet was lost; the nesting below
    # (symbol data update inside the ``time_frame is not None`` guard) is a
    # reconstruction -- confirm against the upstream file.
    self._ensure_available_data(symbol)
    try:
        candles = self._extract_data_with_limit(symbol, time_frame)
        if time_frame is not None:
            self.time_frame_get_times[symbol][time_frame.value] += 1
            # if it's at least the second iteration: only use the last candle, otherwise use all
            if self.time_frame_get_times[symbol][time_frame.value] > 1:
                candles = candles[-1]
            self.get_symbol_data(symbol).update_symbol_candles(time_frame, candles)
    except BacktestingEndedException:
        # increment candles to compute profitability on the last one at the end of the simulation
        # Same None guard as the try body: without it, a call with
        # time_frame=None would fail with AttributeError here and hide the
        # BacktestingEndedException from the caller.
        if time_frame is not None:
            self.time_frame_get_times[symbol][time_frame.value] += 1
        # Bare raise re-raises the active exception as-is.
        raise
def _extract_from_indexes(array, max_index, symbol, factor=1):
# always stop before the last candle to be able to read data from it during the n-1 evaluation
max_limit = len(array) - 1
max_index *= factor
if max_index > max_limit:
raise BacktestingEndedException(symbol)
else:
return array[:max_index]
async def _refresh_backtesting_time_frame_data(self, time_frame, symbol, producer: OHLCVUpdaterProducer):
    """Send ``True`` to ``producer`` when the exchange reports an update is due.

    The exchange returned by ``self.exchange.get_exchange()`` is asked whether
    ``time_frame`` / ``symbol`` should be updated. If a
    ``BacktestingEndedException`` is raised while doing so, it is logged at
    info level, ``self.keep_running`` is set to ``False`` and the exchange's
    ``end_backtesting(symbol)`` is awaited; the exception is not re-raised.
    """
    try:
        update_due = self.exchange.get_exchange().should_update_data(time_frame, symbol)
        if update_due:
            await producer.send(True)  # TODO
    except BacktestingEndedException as backtesting_end:
        self.logger.info(backtesting_end)
        self.keep_running = False
        await self.exchange.get_exchange().end_backtesting(symbol)
async def _refresh_backtesting_time_frame_data(self, time_frame, symbol):
try:
if self.exchange.get_exchange().should_update_data(time_frame, symbol):
await self._refresh_data(time_frame, symbol)
except BacktestingEndedException as e:
self.logger.info(e)
self.keep_running = False
await self.exchange.get_exchange().end_backtesting(symbol)