Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
}
expected_calendar_offset = {
'1': 0, # Starts on 6-01, 1st trading day of month.
'2': 15, # Starts on 6-22, 16th trading day of month.
'3': 1, # Starts on 6-02, 2nd trading day of month.
'4': 0, # Starts on 6-01, 1st trading day of month.
'5': 9, # Starts on 6-12, 10th trading day of month.
'6': 10, # Starts on 6-15, 11th trading day of month.
}
self.assertEqual(result.attrs['first_row'], expected_first_row)
self.assertEqual(result.attrs['last_row'], expected_last_row)
self.assertEqual(
result.attrs['calendar_offset'],
expected_calendar_offset,
)
cal = get_calendar(result.attrs['calendar_name'])
first_session = Timestamp(result.attrs['start_session_ns'], tz='UTC')
end_session = Timestamp(result.attrs['end_session_ns'], tz='UTC')
sessions = cal.sessions_in_range(first_session, end_session)
assert_index_equal(
self.sessions,
sessions
)
def init_class_fixtures(cls):
    """Initialise the class-level fixtures for ``TestDateUtils``.

    Runs the parent class's ``init_class_fixtures`` first, then stores the
    'NYSE' calendar on the class as ``cls.calendar``.
    """
    super(TestDateUtils, cls).init_class_fixtures()
    nyse_calendar = get_calendar('NYSE')
    cls.calendar = nyse_calendar
def setUpClass(cls):
    """Create the ``BeforeClose``/``AfterOpen`` rules shared by all subtests.

    Both rules are built with ``hours=1, minutes=5`` and have their ``cal``
    attribute set to the calendar named by ``cls.CALENDAR_STRING``.
    """
    # AfterOpen and BeforeClose cache some internal state for performance
    # reasons, but that cache must not affect purity: the same input has to
    # give the same output whether a rule is run 1 or N times. To be able
    # to check that, a single before_close and a single after_open are
    # shared across subtests.
    cls.before_close = BeforeClose(hours=1, minutes=5)
    cls.after_open = AfterOpen(hours=1, minutes=5)

    # ``None`` marks that this is the base class.
    cls.class_ = None

    shared_calendar = get_calendar(cls.CALENDAR_STRING)
    for rule in (cls.before_close, cls.after_open):
        rule.cal = shared_calendar
def gen_calendars(start, stop, critical_dates):
    """
    Yield calendars to use as inputs, each wrapped in a 1-tuple.

    For every item produced by ``powerset(critical_dates)``, yields the UTC
    date range from ``start`` to ``stop`` with those dates dropped. Finally
    yields the NYSE calendar's ``all_days`` sliced to ``start``..``stop``.
    """
    full_range = pd.date_range(start, stop, tz='utc')
    for subset in powerset(critical_dates):
        remaining = full_range.drop(list(subset))
        # Have to yield tuples, so wrap the index in a 1-tuple.
        yield (remaining,)

    # Also test with the trading calendar.
    nyse_days = get_calendar("NYSE").all_days
    window = nyse_days.slice_indexer(start, stop)
    yield (nyse_days[window],)
environ,
bundle_timestamp,
)
prefix, connstr = re.split(
r'sqlite:///',
str(bundle_data.asset_finder.engine.url),
maxsplit=1,
)
if prefix:
raise ValueError(
"invalid url %r, must begin with 'sqlite:///'" %
str(bundle_data.asset_finder.engine.url),
)
open_calendar = get_calendar('OPEN')
env = TradingEnvironment(
load=partial(load_crypto_market_data, environ=environ),
bm_symbol='USDT_BTC',
trading_calendar=open_calendar,
asset_db_path=connstr,
environ=environ,
)
first_trading_day = bundle_data.minute_bar_reader.first_trading_day
data = DataPortal(
env.asset_finder,
open_calendar,
first_trading_day=first_trading_day,
minute_reader=bundle_data.minute_bar_reader,
def sessions(self):
    """Return the sessions described by the table's attributes.

    If the table's attrs contain a 'calendar' entry (old format), it is
    converted directly into a UTC ``DatetimeIndex``. Otherwise the sessions
    are computed from the calendar named by 'calendar_name', between the
    'start_session_ns' and 'end_session_ns' timestamps.
    """
    table_attrs = self._table.attrs
    if 'calendar' in table_attrs.attrs:
        # Backwards compatibility with old formats; slated for removal.
        return DatetimeIndex(table_attrs['calendar'], tz='UTC')

    calendar = get_calendar(table_attrs['calendar_name'])
    first_session = Timestamp(table_attrs['start_session_ns'], tz='UTC')
    last_session = Timestamp(table_attrs['end_session_ns'], tz='UTC')
    return calendar.sessions_in_range(first_session, last_session)
version = raw_data['version']
except KeyError:
# The version key was first written with version 1; assume version 0
# if the key is missing.
version = 0
default_ohlc_ratio = raw_data['ohlc_ratio']
if version >= 1:
minutes_per_day = raw_data['minutes_per_day']
else:
# version 0 always assumed US equities.
minutes_per_day = US_EQUITIES_MINUTES_PER_DAY
if version >= 2:
calendar = get_calendar(raw_data['calendar_name'])
start_session = pd.Timestamp(
raw_data['start_session'], tz='UTC')
end_session = pd.Timestamp(raw_data['end_session'], tz='UTC')
else:
# No calendar info included in older versions, so
# default to NYSE.
calendar = get_calendar('NYSE')
start_session = pd.Timestamp(
raw_data['first_trading_day'], tz='UTC')
end_session = calendar.minute_to_session_label(
pd.Timestamp(
raw_data['market_closes'][-1], unit='m', tz='UTC')
)
if version >= 3:
warnings.warn(
'passing metadata to TradingAlgorithm is deprecated; please'
' write this data into the asset db before passing it to the'
' trading environment',
DeprecationWarning,
stacklevel=1,
)
self.trading_environment.write_data(
equities=kwargs.pop('equities_metadata', None),
futures=kwargs.pop('futures_metadata', None),
)
# If a trading calendar has been provided, pop it. Otherwise, use the 'OPEN' calendar.
self.trading_calendar = kwargs.pop(
'trading_calendar',
get_calendar('OPEN')
)
self.sim_params = kwargs.pop('sim_params', None)
if self.sim_params is None:
self.sim_params = create_simulation_parameters(
start=kwargs.pop('start', None),
end=kwargs.pop('end', None),
trading_calendar=self.trading_calendar,
)
self.perf_tracker = None
# Pull in the environment's new AssetFinder for quick reference
self.asset_finder = self.trading_environment.asset_finder
# Initialize Pipeline API data.
self.init_engine(
exchanges = dict()
for name in exchange_list:
if auth_aliases is not None and name in auth_aliases:
auth_alias = auth_aliases[name]
else:
auth_alias = None
exchanges[name] = get_exchange(
exchange_name=name,
quote_currency=quote_currency,
must_authenticate=(live and not simulate_orders),
skip_init=True,
auth_alias=auth_alias,
)
open_calendar = get_calendar('OPEN')
env = TradingEnvironment(
load=partial(
load_crypto_market_data,
environ=environ,
start_dt=start,
end_dt=end
),
environ=environ,
exchange_tz='UTC',
asset_db_path=None # We don't need an asset db, we have exchanges
)
env.asset_finder = ExchangeAssetFinder(exchanges=exchanges)
def choose_loader(column):
bound_cols = TradingPairPricing.columns
if version >= 1:
minutes_per_day = raw_data['minutes_per_day']
else:
# version 0 always assumed US equities.
minutes_per_day = US_EQUITIES_MINUTES_PER_DAY
if version >= 2:
calendar = get_calendar(raw_data['calendar_name'])
start_session = pd.Timestamp(
raw_data['start_session'], tz='UTC')
end_session = pd.Timestamp(raw_data['end_session'], tz='UTC')
else:
# No calendar info included in older versions, so
# default to NYSE.
calendar = get_calendar('NYSE')
start_session = pd.Timestamp(
raw_data['first_trading_day'], tz='UTC')
end_session = calendar.minute_to_session_label(
pd.Timestamp(
raw_data['market_closes'][-1], unit='m', tz='UTC')
)
if version >= 3:
ohlc_ratios_per_sid = raw_data['ohlc_ratios_per_sid']
if ohlc_ratios_per_sid is not None:
ohlc_ratios_per_sid = keymap(int, ohlc_ratios_per_sid)
else:
ohlc_ratios_per_sid = None
return cls(