Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_impacted_price_worse_than_limit(self):
    """Check that no fill is produced when impact pushes past the limit.

    Builds a ``VolatilityVolumeShare`` model with a 5% volume limit, submits
    a buy limit order of 10 shares at 59800, and expects ``process_order``
    to return ``(None, None)``.
    """
    slippage = VolatilityVolumeShare(volume_limit=0.05)

    # The inputs mirror the 'calculate_impact' tests, where the impacted
    # price comes out at 59805.5. That is worse than this order's limit
    # price of 59800, so the model should decline to fill the order.
    fill_minute = pd.Timestamp('2006-03-01 11:35AM', tz='UTC')
    bar = self.create_bardata(simulation_dt_func=lambda: fill_minute)
    limit_order = Order(
        dt=bar.current_dt,
        asset=self.ASSET,
        amount=10,
        limit=59800,
    )

    fill_price, fill_amount = slippage.process_order(bar, limit_order)

    self.assertIsNone(fill_price)
    self.assertIsNone(fill_amount)
def test_orders_limit(self):
    """Run a long limit order for ASSET133 through VolumeShareSlippage.

    NOTE(review): only the setup and the ``simulate`` call are visible in
    this span; nothing here asserts on ``orders_txns``. Confirm the
    remainder of the test against the full file.
    """
    slippage_model = VolumeShareSlippage()
    slippage_model.data_portal = self.data_portal

    # long, does not trade
    order_dt = datetime.datetime(2006, 1, 5, 14, 30, tzinfo=pytz.utc)
    open_orders = [
        Order(
            dt=order_dt,
            amount=100,
            filled=0,
            asset=self.ASSET133,
            limit=3.5,
        ),
    ]

    # The simulation clock is pinned to the fourth entry of self.minutes.
    bar_data = self.create_bardata(
        simulation_dt_func=lambda: self.minutes[3],
    )

    simulated = slippage_model.simulate(bar_data, self.ASSET133, open_orders)
    orders_txns = list(simulated)
last_col=0,
value=first_split_ratio,
)],
3: [Float64Multiply(
first_row=0,
last_row=3,
first_col=1,
last_col=1,
value=second_split_ratio,
)],
},
msg=column,
)
# check the volume, the value should be 1/ratio
assert_equal(
adjustments_for_cols[-1],
{
2: [Float64Multiply(
first_row=0,
last_row=2,
first_col=0,
last_col=0,
value=1 / first_split_ratio,
)],
3: [Float64Multiply(
first_row=0,
last_row=3,
first_col=1,
last_col=1,
value=1 / second_split_ratio,
)],
catalyst_root = self.enter_instance_context(tmp_dir()).path
environ = {
'ZIPLINE_ROOT': catalyst_root,
'QUANDL_API_KEY': self.api_key,
}
with patch_read_csv(url_map, strict=True):
ingest('quandl', environ=environ)
bundle = load('quandl', environ=environ)
sids = 0, 1, 2, 3
assert_equal(set(bundle.asset_finder.sids), set(sids))
for equity in bundle.asset_finder.retrieve_all(sids):
assert_equal(equity.start_date, self.asset_start, msg=equity)
assert_equal(equity.end_date, self.asset_end, msg=equity)
sessions = self.calendar.all_sessions
actual = bundle.equity_daily_bar_reader.load_raw_arrays(
self.columns,
sessions[sessions.get_loc(self.asset_start, 'bfill')],
sessions[sessions.get_loc(self.asset_end, 'ffill')],
sids,
)
expected_pricing, expected_adjustments = self._expected_data(
bundle.asset_finder,
)
assert_equal(actual, expected_pricing, array_decimal=2)
adjustments_for_cols = bundle.adjustment_reader.load_adjustments(
self.columns,
sessions,
pipe = Pipeline(
columns={
'sid': SidFactor(),
'evens': evens,
'odds': odds,
'first_five': first_five,
'last_three': last_three,
},
)
start, end = self.trading_days[[-10, -1]]
results = self.run_pipeline(pipe, start, end).unstack()
sids = results.sid.astype(int64_dtype)
assert_equal(results.evens, ~(sids % 2).astype(bool))
assert_equal(results.odds, (sids % 2).astype(bool))
assert_equal(results.first_five, sids < 5)
assert_equal(results.last_three, sids >= 7)
e1, e2 = relevant_events['event_date']
t1, t2 = relevant_events['timestamp']
for date, computed_value in zip(dates, asset_result):
if t1 <= date <= e1:
# If we've seen event 2, it should win even if we've seen
# event 1, because events are sorted by event_date.
self.assertEqual(computed_value, v1)
elif t2 <= date <= e2:
# If we've seen event 1 but not event 2, event 1 should
# win.
self.assertEqual(computed_value, v2)
else:
# If we haven't seen either event, then we should have
# column.missing_value.
assert_equal(
computed_value,
column.missing_value,
# Coerce from Timestamp to datetime64.
allow_datetime_coercions=True,
)
assert_equal(
self._list_bundle(),
{fourth, fifth},
msg='keep_last=2 did not remove the correct number of ingestions',
)
with assert_raises(BadClean):
self.clean('bundle', keep_last=-1, environ=self.environ)
assert_equal(
self._list_bundle(),
{fourth, fifth},
msg='keep_last=-1 removed some ingestions',
)
assert_equal(
self.clean('bundle', keep_last=0, environ=self.environ),
{fourth, fifth},
)
assert_equal(
self._list_bundle(),
set(),
msg='keep_last=0 did not remove the correct number of ingestions',
)
checkpoints=checkpoints,
loader=self.garbage_loader,
missing_values=self.missing_values,
)
with self.assertRaises(TypeError) as e:
# test that we cannot create a single column from a non field
# even with explicit metadata
from_blaze(
expr.value + 1,
deltas=deltas,
checkpoints=checkpoints,
loader=self.garbage_loader,
missing_values=self.missing_values,
)
assert_equal(
str(e.exception),
"expression 'expr.value + 1' was array-like but not a simple field"
" of some larger table",
inputs = ()
window_length = 0
dtype = categorical_dtype
missing_value = ''
f = F()
column_data = LabelArray(
np.array(
[['a', f.missing_value],
['b', f.missing_value],
['c', 'd']],
),
missing_value=f.missing_value,
)
assert_equal(
f.postprocess(column_data.ravel()),
pd.Categorical(
['a', f.missing_value, 'b', f.missing_value, 'c', 'd'],
),
)
# only include the non-missing data
pipeline_output = pd.Series(
data=['a', 'b', 'c', 'd'],
index=pd.MultiIndex.from_arrays([
[pd.Timestamp('2014-01-01'),
pd.Timestamp('2014-01-02'),
pd.Timestamp('2014-01-03'),
pd.Timestamp('2014-01-03')],
[0, 0, 0, 1],
]),
def test_wma1(self):
    """Check LinearWeightedMovingAverage on a constant window of ones.

    Feeds a 10x5 array of ones to ``compute`` and expects the output
    buffer to hold ones for all five assets.
    """
    window_rows, asset_count = 10, 5
    factor = LinearWeightedMovingAverage(
        inputs=(USEquityPricing.close,),
        window_length=window_rows,
    )

    as_of = pd.Timestamp('2015')
    sids = np.arange(asset_count, dtype=np.int64)
    window = np.ones((window_rows, asset_count))
    # compute() writes its result into this preallocated buffer.
    result = np.zeros(window.shape[1])

    factor.compute(as_of, sids, result, window)

    assert_equal(result, np.ones(asset_count))