def _pandas_to_bucket(df, symbol, initial_image):
    rtn = {SYMBOL: symbol, VERSION: CHUNK_VERSION_NUMBER, COLUMNS: {}, COUNT: len(df)}
    end = to_dt(df.index[-1].to_pydatetime())
    if initial_image:
        if 'index' in initial_image:
            start = min(to_dt(df.index[0].to_pydatetime()), initial_image['index'])
        else:
            start = to_dt(df.index[0].to_pydatetime())
        image_start = initial_image.get('index', start)
        image = {k: v for k, v in initial_image.items() if k != 'index'}
        rtn[IMAGE_DOC] = {IMAGE_TIME: image_start, IMAGE: initial_image}
        final_image = TickStore._pandas_compute_final_image(df, initial_image, end)
    else:
        start = to_dt(df.index[0].to_pydatetime())
        final_image = {}
    rtn[END] = end
    rtn[START] = start
    logger.warning("NB treating all values as 'exists' - no longer sparse")
    rowmask = Binary(lz4_compressHC(np.packbits(np.ones(len(df), dtype='uint8')).tostring()))
    index_name = df.index.names[0] or "index"
    recs = df.to_records(convert_datetime64=False)
    for col in df:
        ...  # (per-column packing elided)
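The rowmask line above packs one presence bit per row (all set to 1, i.e. every value treated as "exists") and LZ4-compresses the result. A minimal standalone sketch of that step, assuming the lz4 and bson (pymongo) packages are available; lz4.block.compress with high compression stands in for the library's own lz4_compressHC wrapper:

import numpy as np
import lz4.block
from bson.binary import Binary

def pack_rowmask(n_rows):
    # One bit per row, all rows marked present, matching the dense mask above.
    bits = np.ones(n_rows, dtype='uint8')
    packed = np.packbits(bits).tobytes()
    # High-compression LZ4 block, stored as a BSON Binary.
    return Binary(lz4.block.compress(packed, mode='high_compression'))

rowmask = pack_rowmask(10)
print(len(rowmask))  # a handful of bytes: 10 bits packed, then LZ4-framed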
def _to_bucket(ticks, symbol, initial_image):
    rtn = {SYMBOL: symbol, VERSION: CHUNK_VERSION_NUMBER, COLUMNS: {}, COUNT: len(ticks)}
    data = {}
    rowmask = {}
    start = to_dt(ticks[0]['index'])
    end = to_dt(ticks[-1]['index'])
    final_image = copy.copy(initial_image) if initial_image else {}
    for i, t in enumerate(ticks):
        if initial_image:
            final_image.update(t)
        for k, v in iteritems(t):
            try:
                if k != 'index':
                    rowmask[k][i] = 1
                else:
                    v = TickStore._to_ms(v)
                    if data[k][-1] > v:
                        raise UnorderedDataException("Timestamps out-of-order: %s > %s" % (
                                                     ms_to_datetime(data[k][-1]), t))
                data[k].append(v)
            except KeyError:
                # First occurrence of this column: create its row-mask and start its value list.
                if k != 'index':
                    rowmask[k] = np.zeros(len(ticks), dtype='uint8')
                    rowmask[k][i] = 1
                data[k] = [v]
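The loop above pivots row-oriented tick dicts into per-column value lists plus a 0/1 presence mask per column, so sparse columns only consume bits where a value exists. A simplified, self-contained sketch of the same idea (names here are illustrative, not arctic's API, and the 'index' millisecond conversion is omitted):

import numpy as np

def pivot_ticks(ticks):
    data, rowmask = {}, {}
    for i, tick in enumerate(ticks):
        for key, value in tick.items():
            if key not in data:
                data[key] = []
                if key != 'index':
                    rowmask[key] = np.zeros(len(ticks), dtype='uint8')
            if key != 'index':
                rowmask[key][i] = 1
            data[key].append(value)
    return data, rowmask

ticks = [{'index': 0, 'bid': 1.0, 'ask': 1.1},
         {'index': 1, 'bid': 1.2},            # sparse tick: no 'ask'
         {'index': 2, 'bid': 1.3, 'ask': 1.4}]
data, rowmask = pivot_ticks(ticks)
print(rowmask['ask'])  # [1 0 1] - 'ask' is missing from the middle tick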
def _slice(self, data, start, end):
    if isinstance(data, list):
        dictlist = DictList(data, 'index')
        slice_start = bisect.bisect_left(dictlist, to_dt(start, mktz('UTC')))
        slice_end = bisect.bisect_right(dictlist, to_dt(end, mktz('UTC')))
        return data[slice_start:slice_end]
    elif isinstance(data, pd.DataFrame):
        return data[start:end]
    else:
        raise UnhandledDtypeException("Can't persist type %s to tickstore" % type(data))
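For the list branch, DictList lets bisect compare against each tick's 'index' field without copying the dicts. A standalone illustration of the same technique, projecting the keys out explicitly for clarity (slice_ticks and the sample data are illustrative only):

import bisect
from datetime import datetime, timezone

def slice_ticks(ticks, start, end):
    keys = [t['index'] for t in ticks]   # ticks are assumed already time-ordered
    lo = bisect.bisect_left(keys, start)
    hi = bisect.bisect_right(keys, end)
    return ticks[lo:hi]

ticks = [{'index': datetime(2020, 1, d, tzinfo=timezone.utc), 'price': d} for d in range(1, 6)]
window = slice_ticks(ticks,
                     datetime(2020, 1, 2, tzinfo=timezone.utc),
                     datetime(2020, 1, 4, tzinfo=timezone.utc))
print([t['price'] for t in window])  # [2, 3, 4]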
def write(self, symbol, data, initial_image=None, metadata=None):
    """
    Write tick data for a symbol.

    Parameters
    ----------
    symbol : `str`
        symbol name for the item
    data : list of dicts or a pandas.DataFrame
        tick data to store
    initial_image : dict
        dict of the initial image at the start of the document; if this contains an 'index' entry it is
        assumed to be the time of the timestamp of the index
    metadata : dict
        optional user defined metadata - one per symbol
    """
    pandas = False
    # Check for overlapping data
    if isinstance(data, list):
        start = data[0]['index']
        end = data[-1]['index']
    elif isinstance(data, pd.DataFrame):
        start = data.index[0].to_pydatetime()
        end = data.index[-1].to_pydatetime()
        pandas = True
    else:
        raise UnhandledDtypeException("Can't persist type %s to tickstore" % type(data))
    self._assert_nonoverlapping_data(symbol, to_dt(start), to_dt(end))
    if pandas:
        buckets = self._pandas_to_buckets(data, symbol, initial_image)
    else:
        buckets = self._to_buckets(data, symbol, initial_image)
    self._write(buckets)
    if metadata:
        ret = self._metadata.replace_one({SYMBOL: symbol},
                                         {SYMBOL: symbol, META: metadata},
                                         upsert=True)
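A hypothetical usage of write(), assuming a MongoDB instance on localhost and using placeholder library and symbol names; it shows both accepted input forms, a time-ordered list of dicts with timezone-aware 'index' values and a DataFrame with a DatetimeIndex:

from datetime import datetime, timezone
import pandas as pd
from arctic import Arctic, TICK_STORE

store = Arctic('localhost')                                   # placeholder MongoDB host
store.initialize_library('sample.ticks', lib_type=TICK_STORE) # placeholder library name
lib = store['sample.ticks']

# list-of-dicts form: time-ordered, each tick carries a timezone-aware 'index'
ticks = [{'index': datetime(2020, 1, 1, 9, 30, tzinfo=timezone.utc), 'bid': 1.0, 'ask': 1.1},
         {'index': datetime(2020, 1, 1, 9, 31, tzinfo=timezone.utc), 'bid': 1.2, 'ask': 1.3}]
lib.write('EURUSD', ticks, metadata={'source': 'example'})

# DataFrame form: the index must be a DatetimeIndex (later range, so no overlap)
df = pd.DataFrame({'bid': [1.2, 1.3]},
                  index=pd.date_range('2020-01-01 10:00', periods=2, freq='T', tz='UTC'))
lib.write('EURUSD', df)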
def write(self, symbol, data):
    """
    Split the tick data to the underlying collections and write the data to each low
    level library.

    Args:
        symbol (str): the symbol for the timeseries data
        data (list of dicts or pandas dataframe): Tick data to write
            if a list of dicts is given the list must be in time order and the time must be stored in
            an element named 'index' the value of which must be a timezone aware datetime.
            For a pandas dataframe the index must be a datetime
    """
    # get the full set of date ranges that we have
    cursor = self._collection.find()
    for res in cursor:
        library = self._arctic_lib.arctic[res['library_name']]
        dslice = self._slice(data, to_dt(res['start'], mktz('UTC')), to_dt(res['end'], mktz('UTC')))
        if len(dslice) != 0:
            library.write(symbol, dslice)
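The top-level store routes each registered date range to its own underlying library, writing only the slice of incoming data that falls inside that range. A standalone sketch of that routing, under the assumption of disjoint ranges; route() and the range/library names are illustrative, not arctic's API:

from datetime import datetime, timezone

def route(ticks, ranges):
    """ranges: list of (start, end, name) tuples covering disjoint periods."""
    out = {}
    for start, end, name in ranges:
        chunk = [t for t in ticks if start <= t['index'] <= end]
        if chunk:
            out[name] = chunk   # the real store would call library.write(symbol, chunk) here
    return out

utc = timezone.utc
ticks = [{'index': datetime(2020, m, 15, tzinfo=utc), 'price': m} for m in (1, 2, 3)]
ranges = [(datetime(2020, 1, 1, tzinfo=utc), datetime(2020, 1, 31, tzinfo=utc), 'ticks_2020_01'),
          (datetime(2020, 2, 1, tzinfo=utc), datetime(2020, 3, 31, tzinfo=utc), 'ticks_2020_02_03')]
print({k: [t['price'] for t in v] for k, v in route(ticks, ranges).items()})
# {'ticks_2020_01': [1], 'ticks_2020_02_03': [2, 3]}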