Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_bitemporal_store_read_as_of_timezone(bitemporal_library):
    """Read 'spam' with an ``as_of`` expressed in a different timezone.

    Two updates are stamped at midnight Europe/London on 2015-05-01 and
    2015-05-02. The read uses midnight Asia/Hong_Kong on 2015-05-02 and is
    expected to return only the first update (``ts1``).
    """
    first_as_of = dt(2015, 5, 1, tzinfo=mktz('Europe/London'))
    bitemporal_library.update('spam', ts1, as_of=first_as_of)

    # The literal is kept exactly as-is; its contents are runtime data.
    correction = read_str_as_pandas(""" sample_dt | near
2012-12-01 17:06:11.040 | 25""")
    second_as_of = dt(2015, 5, 2, tzinfo=mktz('Europe/London'))
    bitemporal_library.update('spam', correction, as_of=second_as_of)

    # Hong Kong midnight falls earlier than London midnight on the same
    # calendar date, so the second update should not be visible yet.
    query_as_of = dt(2015, 5, 2, tzinfo=mktz('Asia/Hong_Kong'))
    item = bitemporal_library.read('spam', as_of=query_as_of)
    df = item.data
    assert_frame_equal(df, ts1)
def test_list_version(library, fw_pointers_cfg):
    """Check ``list_versions`` as versions are written and then pruned.

    Three versions are written with ``bson.ObjectId`` patched to ids built
    from timestamps roughly 130 minutes in the past. A fourth write follows
    with pruning enabled. The first two entries returned by
    ``list_versions`` must then be versions 4 and 3.
    """
    with FwPointersCtx(fw_pointers_cfg):
        # Nothing has been written for the symbol yet.
        assert len(list(library.list_versions(symbol))) == 0

        now = dt.utcnow().replace(tzinfo=mktz('UTC'))
        dates = []
        for offset in six.moves.xrange(3):
            stamp = now - dtd(minutes=130 - offset)
            dates.append(stamp)
            # The replacement id is computed before the patch takes effect.
            with patch("bson.ObjectId", return_value=bson.ObjectId.from_datetime(stamp)):
                library.write(symbol, ts1, prune_previous_version=False)
        assert len(list(library.list_versions(symbol))) == 3

        library.write(symbol, ts1, prune_previous_version=True)
        assert len(list(library.list_versions(symbol))) >= 2

        versions = list(library.list_versions(symbol))
        # NOTE(review): entries appear to be listed newest first (4, then 3),
        # while ``dates`` is ascending, so the date check is only a lower bound.
        for position, expected_version in enumerate((4, 3)):
            entry = versions[position]
            assert entry['symbol'] == symbol
            assert entry['date'] >= dates[position]
            assert entry['version'] == expected_version
def test_bitemporal_store_read_as_of_timezone(bitemporal_library):
    """Verify that ``as_of`` on read is compared as an instant, not a wall-clock date.

    'spam' is updated at London midnight on 2015-05-01 (with ``ts1``) and
    again at London midnight on 2015-05-02. A read as of Hong Kong midnight
    on 2015-05-02 must yield ``ts1`` unchanged.
    """
    london_may_1 = dt(2015, 5, 1, tzinfo=mktz('Europe/London'))
    bitemporal_library.update('spam', ts1, as_of=london_may_1)

    # String contents are runtime data and are reproduced verbatim.
    later_frame = read_str_as_pandas(""" sample_dt | near
2012-12-01 17:06:11.040 | 25""")
    london_may_2 = dt(2015, 5, 2, tzinfo=mktz('Europe/London'))
    bitemporal_library.update('spam', later_frame, as_of=london_may_2)

    hong_kong_may_2 = dt(2015, 5, 2, tzinfo=mktz('Asia/Hong_Kong'))
    df = bitemporal_library.read('spam', as_of=hong_kong_may_2).data
    assert_frame_equal(df, ts1)
def test_read_ts_raw_all_version_ok(bitemporal_library):
bitemporal_library.update('spam', ts1, as_of=dt(2015, 5, 1, tzinfo=mktz('UTC')))
bitemporal_library.update('spam', read_str_as_pandas(""" sample_dt | near
2012-12-01 17:06:11.040 | 25"""),
as_of=dt(2015, 5, 5, tzinfo=mktz('UTC')))
bitemporal_library.update('spam', read_str_as_pandas(""" sample_dt | near
2012-11-08 17:06:11.040 | 42"""),
as_of=dt(2015, 5, 3, tzinfo=mktz('UTC')))
bitemporal_library.update('spam', read_str_as_pandas(""" sample_dt | near
2012-10-08 17:06:11.040 | 42
2013-01-01 17:06:11.040 | 100"""),
as_of=dt(2015, 5, 10, tzinfo=mktz('UTC')))
assert_frame_equal(bitemporal_library.read('spam', raw=True).data.tz_localize(tz=None, level=1), read_str_as_pandas(
""" sample_dt | observed_dt | near
2012-09-08 17:06:11.040 | 2015-05-01 | 1.0
2012-10-08 17:06:11.040 | 2015-05-01 | 2.0
2012-10-08 17:06:11.040 | 2015-05-10 | 42
2012-10-09 17:06:11.040 | 2015-05-01 | 2.5
2012-11-08 17:06:11.040 | 2015-05-01 | 3.0
2012-11-08 17:06:11.040 | 2015-05-03 | 42
2012-12-01 17:06:11.040 | 2015-05-05 | 25
def test_tickstore_to_bucket_always_forwards():
    """``TickStore._to_bucket`` must reject ticks whose index goes backwards.

    The second tick (2014-01-01 00:01) is earlier than the first
    (2014-02-01 00:01), so bucketing is expected to raise
    ``UnorderedDataException``.
    """
    symbol = 'SYM'
    tz = 'UTC'

    image_time = dt(2014, 1, 1, 0, 0, tzinfo=mktz(tz))
    initial_image = {'index': image_time, 'A': 123, 'B': 54.4, 'C': 'DESC'}

    later_tick_time = dt(2014, 2, 1, 0, 1, tzinfo=mktz(tz))
    earlier_tick_time = dt(2014, 1, 1, 0, 1, tzinfo=mktz(tz))
    # Deliberately out of order: the later tick comes first.
    data = [
        {'index': later_tick_time, 'A': 124, 'D': 0},
        {'index': earlier_tick_time, 'A': 125, 'B': 27.2},
    ]

    with pytest.raises(UnorderedDataException):
        TickStore._to_bucket(data, symbol, initial_image)
def utc_dt_to_local_dt(dtm):
    """Convert a UTC datetime to a datetime in the local timezone.

    ``dtm`` may be naive, in which case it is taken to be UTC, or may carry
    the ``mktz("UTC")`` tzinfo. Any other tzinfo raises ``ValueError``.
    The result is ``dtm`` converted to the zone returned by ``mktz()``.
    """
    utc = mktz("UTC")
    tz = dtm.tzinfo
    if tz is None:
        # Naive input: label it as UTC so astimezone() can convert it.
        dtm = dtm.replace(tzinfo=utc)
    elif tz != utc:
        raise ValueError(
            "Expected dtm without tzinfo or with UTC, not %r" % tz
        )
    return dtm.astimezone(mktz())
def ms_to_datetime(ms, tzinfo=None):
    """Convert a millisecond time value to an offset-aware Python datetime object.

    Parameters
    ----------
    ms : `int`
        Milliseconds since the Unix epoch.
    tzinfo : `datetime.tzinfo`
        Timezone attached to the result. When None, the zone returned by
        ``mktz()`` is used.

    Returns
    -------
    `datetime.datetime` for the given instant, carrying ``tzinfo``.

    Raises
    ------
    TypeError
        If ``ms`` is not an integer.
    """
    # Imported locally so this block does not depend on the file's import list.
    import numbers

    # ``long`` only exists on Python 2; numbers.Integral matches int (and
    # long on Python 2) without naming it, so the check works on both.
    if not isinstance(ms, numbers.Integral):
        raise TypeError('expected integer, not %s' % type(ms))
    if tzinfo is None:
        tzinfo = mktz()
    return datetime.datetime.fromtimestamp(ms * 1e-3, tzinfo)
Parameters
----------
symbol : `str`
symbol name for the item
data : `pd.DataFrame`
to be persisted
metadata : `dict`
An optional dictionary of metadata to persist along with the symbol. If None and metadata
already exists for the symbol, the existing metadata is kept.
upsert : `bool`
Write 'data' if no previous version exists.
as_of : `datetime.datetime`
The "insert time". Defaults to datetime.now(); a value without tzinfo is given the local timezone.
"""
local_tz = mktz()
if not as_of:
as_of = dt.now()
if as_of.tzinfo is None:
as_of = as_of.replace(tzinfo=local_tz)
data = self._add_observe_dt_index(data, as_of)
if upsert and not self._store.has_symbol(symbol):
df = data
else:
existing_item = self._store.read(symbol, **kwargs)
if metadata is None:
metadata = existing_item.metadata
df = existing_item.data.append(data).sort_index(kind='mergesort')
self._store.write(symbol, df, metadata=metadata, prune_previous_version=True)