def initialize(context):
    # `vwaps` (a list of window lengths), `vwap_key`, and `set_screen` are
    # free variables supplied by the enclosing test scope.
    pipeline = Pipeline()
    context.vwaps = []
    for length in vwaps:
        name = vwap_key(length)
        factor = VWAP(window_length=length)
        context.vwaps.append(factor)
        pipeline.add(factor, name=name)

    filter_ = (USEquityPricing.close.latest > 300)
    pipeline.add(filter_, 'filter')
    if set_screen:
        pipeline.set_screen(filter_)

    attach_pipeline(pipeline, 'test')
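
# A minimal sketch (not part of the original snippet) of how the attached
# pipeline is consumed: zipline recomputes it each session and exposes the
# result through `pipeline_output`, a DataFrame indexed by asset with one
# column per term added above.
def before_trading_start(context, data):
    output = pipeline_output('test')
    # Assets whose latest close exceeded 300, per the 'filter' column.
    context.expensive_assets = output[output['filter']].index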

def test_single_factor(self):
    loader = self.loader
    assets = self.assets
    engine = SimplePipelineEngine(
        lambda column: loader, self.dates, self.asset_finder,
    )
    result_shape = (num_dates, num_assets) = (5, len(assets))
    dates = self.dates[10:10 + num_dates]

    factor = RollingSumDifference()
    expected_result = -factor.window_length

    # Since every asset will pass the screen, these should be equivalent.
    pipelines = [
        Pipeline(columns={'f': factor}),
        Pipeline(
            columns={'f': factor},
            screen=factor.eq(expected_result),
        ),
    ]

    for p in pipelines:
        result = engine.run_pipeline(p, dates[0], dates[-1])
        self.assertEqual(set(result.columns), {'f'})
        assert_multi_index_is_product(
            self, result.index, dates, assets,
        )
        check_arrays(
            result['f'].unstack().values,
            full(result_shape, expected_result, dtype=float),
        )
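
# RollingSumDifference is defined elsewhere in the test suite. A sketch that
# is consistent with the assertion above (an assumption, not the verbatim
# definition): with constant inputs open == 1 and close == 2, as in the
# constant loader later in this file, summing (open - close) over the window
# gives -window_length.
class RollingSumDifference(CustomFactor):
    window_length = 3
    inputs = [USEquityPricing.open, USEquityPricing.close]

    def compute(self, today, assets, out, open, close):
        out[:] = (open - close).sum(axis=0)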

if mask is not NotSpecified:
    pipeline.add(mask, 'mask')

results = run_pipeline(pipeline, start_date, end_date)
pearson_results = results['pearson_factor'].unstack()
spearman_results = results['spearman_factor'].unstack()
if mask is not NotSpecified:
    mask_results = results['mask'].unstack()
    check_arrays(mask_results.values, expected_mask)

# Run a separate pipeline that calculates returns starting
# (correlation_length - 1) days prior to our start date. This is
# because we need (correlation_length - 1) extra days of returns to
# compute our expected correlations.
results = run_pipeline(
    Pipeline(columns={'returns': returns}),
    dates[start_date_index - (correlation_length - 1)],
    dates[end_date_index],
)
returns_results = results['returns'].unstack()

# On each day, calculate the expected correlation coefficients
# between the asset we are interested in and each other asset. Each
# correlation is calculated over `correlation_length` days.
expected_pearson_results = full_like(pearson_results, nan)
expected_spearman_results = full_like(spearman_results, nan)
for day in range(num_days):
    todays_returns = returns_results.iloc[
        day:day + correlation_length
    ]
    my_asset_returns = todays_returns.iloc[:, my_asset_column]
    # `.items()` replaces `.iteritems()`, which was removed in pandas 2.0.
    for asset, other_asset_returns in todays_returns.items():
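        # Sketch of the truncated loop body (assumes
        # `from scipy.stats import pearsonr, spearmanr` and that sids map to
        # consecutive columns): record the expected coefficient for each
        # (my_asset, other_asset) pair.
        asset_column = int(asset) - 1
        expected_pearson_results[day, asset_column] = pearsonr(
            my_asset_returns, other_asset_returns,
        )[0]
        expected_spearman_results[day, asset_column] = spearmanr(
            my_asset_returns, other_asset_returns,
        )[0]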

returns_masked_1.linear_regression(
    target=returns_masked_2, regression_length=regression_length,
)

returns_5 = Returns(window_length=5, inputs=[self.col])
returns_10 = Returns(window_length=10, inputs=[self.col])
regression_factor = returns_5.linear_regression(
    target=returns_10, regression_length=regression_length,
)
columns = {
    output: getattr(regression_factor, output)
    for output in outputs
}
pipeline = Pipeline(columns=columns)

results = run_pipeline(pipeline, start_date, end_date)
output_results = {}
expected_output_results = {}
for output in outputs:
    output_results[output] = results[output].unstack()
    expected_output_results[output] = full_like(
        output_results[output], nan,
    )

# Run a separate pipeline that calculates returns starting
# (regression_length - 1) days prior to our start date. This is because
# we need (regression_length - 1) extra days of returns to compute our
# expected regressions.
columns = {'returns_5': returns_5, 'returns_10': returns_10}
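
# Sketch (not in the original snippet) of how those expected regressions
# could be computed, assuming `from scipy.stats import linregress` and that
# 'alpha' and 'beta' are among `outputs`: run the wider returns pipeline,
# then regress each asset's 5-day returns against its 10-day returns over
# `regression_length` days.
results = run_pipeline(
    Pipeline(columns=columns),
    dates[start_date_index - (regression_length - 1)],
    dates[end_date_index],
)
returns_5_results = results['returns_5'].unstack()
returns_10_results = results['returns_10'].unstack()
for day in range(num_days):
    y = returns_5_results.iloc[day:day + regression_length]
    x = returns_10_results.iloc[day:day + regression_length]
    for i, asset in enumerate(y.columns):
        # linregress returns (slope, intercept, rvalue, pvalue, stderr);
        # zipline names the intercept 'alpha' and the slope 'beta'.
        regression = linregress(x[asset], y[asset])
        expected_output_results['alpha'][day, i] = regression.intercept
        expected_output_results['beta'][day, i] = regression.slope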

def test_estimates(self):
    dataset = QuartersEstimates(1)
    engine = SimplePipelineEngine(
        lambda x: self.loader,
        self.trading_days,
        self.asset_finder,
    )
    results = engine.run_pipeline(
        Pipeline({c.name: c.latest for c in dataset.columns}),
        start_date=self.trading_days[1],
        end_date=self.trading_days[-2],
    )
    for sid in self.ASSET_FINDER_EQUITY_SIDS:
        sid_estimates = results.xs(sid, level=1)
        # Separate assertion for all-null DataFrame to avoid setting
        # column dtypes on `all_expected`.
        if sid == max(self.ASSET_FINDER_EQUITY_SIDS):
            assert_true(sid_estimates.isnull().all().all())
        else:
            # `.sort_values` replaces `DataFrame.sort`, removed in pandas 0.20.
            ts_sorted_estimates = self.events[
                self.events[SID_FIELD_NAME] == sid
            ].sort_values(TS_FIELD_NAME)
            q1_knowledge = ts_sorted_estimates[
                ts_sorted_estimates[FISCAL_QUARTER_FIELD_NAME] == 1
            ]
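
# For orientation (generic pandas behavior, not part of the original test):
# `run_pipeline` returns a DataFrame indexed by (date, asset) pairs, so the
# `results.xs(sid, level=1)` call above selects a single asset's rows and
# leaves a date-indexed frame:
#
#     sid_estimates = results.xs(sid, level=1)            # index: dates
#     one_day = results.xs(results.index[0][0], level=0)  # index: assets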

def _test_id_macro(self, df, dshape, expected, finder, add, dates=None):
    if dates is None:
        dates = self.dates
    expr = bz.data(df, name='expr', dshape=dshape)
    loader = BlazeLoader()
    ds = from_blaze(
        expr,
        loader=loader,
        no_deltas_rule='ignore',
        no_checkpoints_rule='ignore',
        missing_values=self.missing_values,
    )
    p = Pipeline()
    macro_inputs = []
    for column_name in add:
        column = getattr(ds, column_name)
        macro_inputs.append(column)
        with self.assertRaises(UnsupportedPipelineOutput):
            # Single column output terms cannot be added to a pipeline.
            p.add(column.latest, column_name)

    class UsesMacroInputs(CustomFactor):
        inputs = macro_inputs
        window_length = 1

        def compute(self, today, assets, out, *inputs):
            e = expected.loc[today]
            for i, input_ in enumerate(inputs):
                # Each macro input should only have one column.
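                # Sketch of the truncated body (plain asserts stand in for
                # the original assertion helpers): each macro input arrives
                # as a (window_length x 1) array matching the expected row.
                assert input_.shape == (self.window_length, 1)
                assert input_[0, 0] == e[i]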

def test_overwrite(self):
    p = Pipeline()
    f = SomeFactor()
    other_f = SomeOtherFactor()

    p.add(f, 'f')
    self.assertEqual(p.columns, {'f': f})

    with self.assertRaises(KeyError) as e:
        p.add(other_f, 'f')
    [message] = e.exception.args
    self.assertEqual(message, "Column 'f' already exists.")

    p.add(other_f, 'f', overwrite=True)
    self.assertEqual(p.columns, {'f': other_f})
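
# The same end state can be reached without incremental `add` calls, as other
# snippets in this file do:
#
#     p = Pipeline(columns={'f': other_f})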

#  June 2014
# Mo Tu We Th Fr Sa Su
#                    1
#  2  3  4  5  6  7  8
#  9 10 11 12 13 14 15
# 16 17 18 19 20 21 22
# 23 24 25 26 27 28 29
# 30
all_sessions = self.nyse_sessions
compute_dates = all_sessions[
    all_sessions.slice_indexer('2014-06-05', '2015-01-06')
]
start_date, end_date = compute_dates[[0, -1]]

pipe = Pipeline({
    'year': term.downsample(frequency='year_start'),
    'quarter': term.downsample(frequency='quarter_start'),
    'month': term.downsample(frequency='month_start'),
    'week': term.downsample(frequency='week_start'),
})
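
# `downsample` recomputes the wrapped term only on the first trading session
# of each period and holds that value until the next period begins, so each
# column above should change at most once per year/quarter/month/week.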

# Raw values for term, computed each day from 2014 to the end of the
# target period.
raw_term_results = self.run_pipeline(
    Pipeline({'term': term}),
    start_date=pd.Timestamp('2014-01-02', tz='UTC'),
    end_date=pd.Timestamp('2015-01-06', tz='UTC'),
)['term'].unstack()

# Each downsampled column should equal the raw term's value on the first
# session of its period, held for the rest of the period. A sketch of the
# dict, which is truncated here in the original:
expected_results = {
    'year': (raw_term_results
             .groupby(pd.Grouper(freq='AS'))
             .transform('first')
             .loc[compute_dates]),
    # ... 'quarter', 'month', and 'week' built analogously.
}

dates_to_test = self.dates[-30:]

constants = {open_: 1, close: 2, volume: 3}
loader = PrecomputedLoader(
    constants=constants,
    dates=self.dates,
    sids=self.asset_ids,
)
engine = SimplePipelineEngine(
    lambda column: loader, self.dates, self.asset_finder,
)

sumdiff = RollingSumDifference()
result = engine.run_pipeline(
    Pipeline(
        columns={
            'sumdiff': sumdiff,
            'open': open_.latest,
            'close': close.latest,
            'volume': volume.latest,
        },
    ),
    dates_to_test[0],
    dates_to_test[-1],
)
self.assertIsNotNone(result)
self.assertEqual(
    {'sumdiff', 'open', 'close', 'volume'},
    set(result.columns),
)
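
# With every input held constant (open == 1, close == 2, volume == 3), every
# output column is constant too: 'sumdiff' should be
# (1 - 2) * window_length == -3 everywhere, and each '.latest' column should
# echo its constant. A sketch of the follow-up assertion:
check_arrays(
    result['sumdiff'].unstack().values,
    full((len(dates_to_test), len(self.asset_ids)), -3, dtype=float),
)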

# Tail of a truncated assignment pairing each mask with its expected result
# (`expected_mask_result` is a stand-in name for the masked case):
expected_mask_results = (
    self.expected_no_mask_result,
    expected_mask_result,
)
for mask, expected_mask in zip(masks, expected_mask_results):
    regression_factor = RollingLinearRegressionOfReturns(
        target=my_asset,
        returns_length=returns_length,
        regression_length=regression_length,
        mask=mask,
    )
    columns = {
        output: getattr(regression_factor, output)
        for output in outputs
    }
    pipeline = Pipeline(columns=columns)
    if mask is not NotSpecified:
        pipeline.add(mask, 'mask')

    results = run_pipeline(pipeline, start_date, end_date)
    if mask is not NotSpecified:
        mask_results = results['mask'].unstack()
        check_arrays(mask_results.values, expected_mask)

    output_results = {}
    expected_output_results = {}
    for output in outputs:
        output_results[output] = results[output].unstack()
        expected_output_results[output] = full_like(
            output_results[output], nan,
        )
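
    # Sketch of the natural conclusion, which the snippet omits: fill
    # `expected_output_results` from the returns data (as in the earlier
    # regression example), then compare against the pipeline's outputs.
    for output in outputs:
        check_arrays(
            output_results[output].values,
            expected_output_results[output],
        )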