Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@parameter_space(
    rule_offset=(0, 1, 2, 3, 4),
    start_offset=(0, 1, 2, 3, 4),
    type=('week_start', 'week_end'),
)
def test_edge_cases_for_TradingDayOfWeek(
    self, rule_offset, start_offset, type
):
    """
    Check TradingDayOfWeek trigger dates around a midweek holiday.

    Monday 01/20 is a holiday, so the trigger date for that week must be
    adjusted appropriately, or thrown out when the week does not contain
    enough trading days.  Also covers starting the simulation on a day
    where that week's trigger has already been missed: the trigger must
    then be recalculated for the next week.
    """
@parameter_space(seed=range(5))
def test_MACD_window_length_generation(self, seed):
    """
    Check the window length of a MACD signal built from random periods.

    The three periods are drawn from a ``RandomState`` seeded with
    ``seed``; each lower bound is one more than the previous draw.  The
    resulting term's ``window_length`` must equal
    ``slow_period + signal_period - 1``.
    """
    rng = RandomState(seed)

    # Draw order is part of the test's determinism: signal, fast, slow.
    signal = rng.randint(1, 90)
    fast = rng.randint(signal + 1, signal + 100)
    slow = rng.randint(fast + 1, fast + 100)

    macd_signal = MovingAverageConvergenceDivergenceSignal(
        signal_period=signal,
        fast_period=fast,
        slow_period=slow,
    )

    expected_window_length = slow + signal - 1
    assert_equal(macd_signal.window_length, expected_window_length)
@parameter_space(x=x_args, y=y_args)
def test_yx(self, y, x):
    """
    Record the ``(y, x)`` pair this test was invoked with.

    Used to ensure that product is called with args in the order that
    they appear in the function's parameter list (``y`` before ``x``).
    """
    invocation = (y, x)
    self.yx_invocations.append(invocation)
@parameter_space(returns_length=[2, 3], regression_length=[3, 4])
def test_factor_regression_method(self, returns_length, regression_length):
    """
    Check that `Factor.linear_regression` agrees with the built-in factor
    `RollingLinearRegressionOfReturns`.
    """
    # Fixture state bound to locals up front.
    # NOTE(review): `start_date`, `end_date`, `run_pipeline` and
    # `regression` are not referenced again in the lines visible here --
    # confirm whether the remainder of this test was cut off.
    run_pipeline = self.run_pipeline
    start_date = self.pipeline_start_date
    end_date = self.pipeline_end_date
    my_asset = self.my_asset

    returns = Returns(window_length=returns_length, inputs=[self.col])
    returns_slice = returns[my_asset]

    # Regression term built through the factor method, targeting the
    # `my_asset` slice of `returns`.
    regression = returns.linear_regression(
        target=returns_slice,
        regression_length=regression_length,
    )
@parameter_space(my_asset_column=[0, 1, 2], window_length_=[1, 2, 3])
def test_slice(self, my_asset_column, window_length_):
"""
Test that slices can be created by indexing into a term, and that they
have the correct shape when used as inputs.
"""
sids = self.sids
# Resolve the asset whose sid sits at position `my_asset_column` of self.sids.
my_asset = self.asset_finder.retrieve_asset(self.sids[my_asset_column])
returns = Returns(window_length=2, inputs=[self.col])
# Indexing the term by an asset produces the slice under test.
returns_slice = returns[my_asset]
class UsesSlicedInput(CustomFactor):
# The parameter is spelled `window_length_`, presumably so that it does not
# collide with the class attribute assigned here -- confirm.
window_length = window_length_
# Both the full term and its single-asset slice are supplied as inputs.
inputs = [returns, returns_slice]
# NOTE(review): `compute` has a signature but no body in the lines visible
# here; the definition appears to be cut off -- confirm against the full file.
def compute(self, today, assets, out, returns, returns_slice):
# `metadata` takes the values 'deltas' and 'checkpoints'.
@parameter_space(metadata={'deltas', 'checkpoints'})
def test_from_blaze_mixed_resources_metadata_expr(self, metadata):
expr = bz.symbol('expr', self.dshape)
# The metadata expression is built with `bz.data` around self.df, while
# `expr` above is built with `bz.symbol` -- presumably the "mixed resources"
# of the test name; confirm.
metadata_expr = bz.data(self.df, name=metadata, dshape=self.dshape)
# The from_blaze call below is expected to raise ValueError.
with self.assertRaises(ValueError) as e:
from_blaze(
expr,
resources={metadata_expr: self.df},
loader=self.garbage_loader,
no_deltas_rule='ignore',
no_checkpoints_rule='ignore',
missing_values=self.missing_values,
# Passes `metadata_expr` under the keyword named by `metadata`
# (i.e. `deltas=` or `checkpoints=`).
**{metadata: metadata_expr}
)
# NOTE(review): this call is never closed in the lines visible here; the
# expected message argument appears to be cut off -- confirm.
assert_equal(
str(e.exception),
@parameter_space(
# range(1, 2) yields the single seed value 1.
seed_value=range(1, 2),
# Each entry is a 3-tuple: a name string, a dict, and a function of one row.
# Presumably (normalizer method name, its keyword arguments, reference
# implementation) -- confirm against the test body.
normalizer_name_and_func=[
('demean', {}, lambda row: row - nanmean(row)),
('zscore', {}, lambda row: (row - nanmean(row)) / nanstd(row)),
(
'winsorize',
{"min_percentile": 0.25, "max_percentile": 0.75},
# NOTE(review): limits=0.25 looks like the counterpart of the
# 0.25 / 0.75 percentiles above -- confirm.
lambda row: scipy_winsorize(
row,
limits=0.25,
)
),
],
add_nulls_to_factor=(False, True,),
)
# NOTE(review): the signature below is cut off after `self,` in the lines
# visible here -- confirm against the full file.
def _test_normalizations_randomized(self,
@parameter_space(
    dtype_=[categorical_dtype, int64_dtype],
    outputs_=[('a',), ('a', 'b')],
)
def test_reject_multi_output_classifiers(self, dtype_, outputs_):
    """
    Multi-output CustomClassifiers are not supported, because they use
    special output allocation for string arrays.
    """
    # Missing value expressed in the scalar type of the dtype under test.
    sentinel = dtype_.type('123')

    class SomeClassifier(CustomClassifier):
        dtype = dtype_
        window_length = 5
        inputs = [SomeDataSet.foo, SomeDataSet.bar]
        outputs = outputs_
        missing_value = sentinel
@parameter_space(
    calendar_name=TRADING_CALENDAR_STRS,
    base_terms=[
        (factor1, factor11, factor91),
        (filter1, filter11, filter91),
        (classifier1, classifier11, classifier91),
    ],
    __fail_fast=True,
)
def test_yearly(self, base_terms, calendar_name):
    """
    Build 'year_start' downsampled versions of each base term and look up
    the last trading session for ``calendar_name``.
    """
    # NOTE(review): `all_terms` and `end_session` are not referenced again
    # in the lines visible here -- confirm whether the remainder of this
    # test was cut off.
    yearly_terms = tuple(
        [term.downsample('year_start') for term in base_terms]
    )
    all_terms = base_terms + yearly_terms

    sessions = self.trading_sessions[calendar_name]
    end_session = sessions[-1]
@parameter_space(
__fail_fast=True,
# Candidate string -> string functions supplied as `f`.
f=[
# Length of the string, rendered as a string.
lambda s: str(len(s)),
# First character of the string.
lambda s: s[0],
# The string reversed.
lambda s: ''.join(reversed(s)),
# Always the empty string, regardless of input.
lambda s: '',
]
)
def test_map(self, f):
# 4 x 4 object-dtype array of strings of varying lengths.
# NOTE(review): neither `data` nor `f` is used in the lines visible here;
# the definition appears to continue past this point -- confirm.
data = np.array(
[['E', 'GHIJ', 'HIJKLMNOP', 'DEFGHIJ'],
['CDE', 'ABCDEFGHIJKLMNOPQ', 'DEFGHIJKLMNOPQRS', 'ABCDEFGHIJK'],
['DEFGHIJKLMNOPQR', 'DEFGHI', 'DEFGHIJ', 'FGHIJK'],
['EFGHIJKLM', 'EFGHIJKLMNOPQRS', 'ABCDEFGHI', 'DEFGHIJ']],
dtype=object,
)