Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
coll = library._collection
# Delete version 1 (ts1)
library._delete_version(symbol, 1)
assert_frame_equal(library.read(symbol, as_of=2).data, ts2)
assert_frame_equal(library.read(symbol, as_of=3).data, ts1)
library._delete_version(symbol, 2)
assert_frame_equal(library.read(symbol, as_of=3).data, ts1)
assert_frame_equal(library.read(symbol, as_of=4).data, ts2)
library._delete_version(symbol, 3)
assert_frame_equal(library.read(symbol).data, ts2)
library._delete_version(symbol, 4)
assert mongo_count(coll) == 0
def test_store_item_new_version(library, library_name):
# Patch the low-level pymongo query path and report the server as a mongos
# (sharded router) so the write exercises the sharded-cluster code path.
# NOTE(review): _query's semantics are defined elsewhere -- presumably
# _query(False, ...) simulates a transient failure; confirm at its definition.
with patch('pymongo.message.query', side_effect=_query(False, library_name)), \
patch('pymongo.server_description.ServerDescription.server_type', SERVER_TYPE.Mongos):
library.write(symbol, ts1)
coll = library._collection
count = mongo_count(coll)
# Exactly one version document after the first write.
assert mongo_count(coll.versions) == 1
# No change to the TS
# Re-writing identical data must not add data chunks (collection count is
# unchanged) but does record a second version.
library.write(symbol, ts1, prune_previous_version=False)
assert mongo_count(coll) == count
assert mongo_count(coll.versions) == 2
# Store the first timeseries
# Timestamp captured before the write below: an as_of read at this instant
# must find no data.
none = datetime.now()
time.sleep(1)
library.write(symbol, ts1)
original = datetime.now()
# Assertions:
# NOTE(review): this asserts version count == 1 although == 2 was asserted
# above -- this section appears spliced in from a different test; verify
# against the original file before relying on the sequence.
assert mongo_count(coll.versions) == 1
assert_frame_equal(library.read(symbol).data, ts1)
# Update the TimeSeries
time.sleep(1)
library.write(symbol, ts2, prune_previous_version=False)
recent = datetime.now()
assert mongo_count(coll.versions) == 2
assert_frame_equal(library.read(symbol).data, ts2)
# Get the different versions of the DB
# Reads resolve as_of timestamps to whichever version was live at that time;
# before any write existed, the read must raise.
with pytest.raises(NoDataFoundException):
library.read(symbol, as_of=none)
assert_frame_equal(library.read(symbol, as_of=original).data, ts1)
assert_frame_equal(library.read(symbol, as_of=recent).data, ts2)
# Now push back in the original version
time.sleep(1)
library.write(symbol, ts1, prune_previous_version=False)
assert mongo_count(coll.versions) == 3
assert_frame_equal(library.read(symbol).data, ts1)
# Get the different versions of the DB
def test_size_chunk_multiple_update(chunkstore_lib):
    """Upserting a large frame then a small one keeps both, in order.

    The 5.5M-row frame spans multiple storage chunks; after the second
    (disjoint-date) upsert the read-back must equal the concatenation and the
    symbol must occupy exactly 3 segment documents.
    """
    n_big, n_small = 5500000, 100
    big = DataFrame(data={'data': np.random.randint(0, 100, size=n_big),
                          'date': [dt(2015, 1, 1)] * n_big})
    small = DataFrame(data={'data': np.random.randint(0, 100, size=n_small),
                            'date': [dt(2016, 1, 1)] * n_small})
    # Both updates upsert: the first creates the symbol, the second appends a
    # chunk for the disjoint 2016 date range.
    for frame in (big, small):
        chunkstore_lib.update('test_df', frame, upsert=True)
    combined = pd.concat([big, small]).reset_index(drop=True)
    assert_frame_equal(combined, chunkstore_lib.read('test_df'))
    # Segment-count check on the raw collection for this symbol.
    assert mongo_count(chunkstore_lib._collection, filter={'sy': 'test_df'}) == 3
def test_prunes_multiple_versions_ts(library, fw_pointers_cfg):
# Exercise version pruning under the given forward-pointers configuration.
with FwPointersCtx(fw_pointers_cfg):
coll = library._collection
a = ts1
c = ts2
# Create an ObjectId
# Backdate each write's ObjectId so the four versions appear 125, 122,
# 121 and 119 minutes old respectively (ObjectIds embed their timestamp).
now = dt.utcnow()
with patch("bson.ObjectId", return_value=bson.ObjectId.from_datetime(now - dtd(minutes=125))):
library.write(symbol, a, prune_previous_version=False)
with patch("bson.ObjectId", return_value=bson.ObjectId.from_datetime(now - dtd(minutes=122))):
library.write(symbol, c, prune_previous_version=False)
with patch("bson.ObjectId", return_value=bson.ObjectId.from_datetime(now - dtd(minutes=121))):
library.write(symbol, a, prune_previous_version=False)
with patch("bson.ObjectId", return_value=bson.ObjectId.from_datetime(now - dtd(minutes=119))):
library.write(symbol, c, prune_previous_version=False)
assert mongo_count(coll.versions) == 4
# Prunes all versions older than the most recent version that's older than 10 mins
library.write(symbol, a, prune_previous_version=True)
# 4 old versions + 1 new, pruned down to 3: versions 1 and 2 are dropped,
# leaving versions 3 and 4 plus the freshly written version 5.
assert mongo_count(coll.versions) == 3
assert_frame_equal(library.read(symbol, as_of=3).data, a)
assert_frame_equal(library.read(symbol, as_of=4).data, c)
assert_frame_equal(library.read(symbol, as_of=5).data, a)
if record is None or record.equals(df):
continue
sym[APPEND_COUNT] += len(record) - len(df)
appended += len(record) - len(df)
sym[LEN] += len(record) - len(df)
else:
sym[CHUNK_COUNT] += 1
new_chunks += 1
sym[LEN] += len(record)
data = SER_MAP[sym[SERIALIZER]].serialize(record)
meta = data[METADATA]
chunk_count = int(len(data[DATA]) / MAX_CHUNK_SIZE + 1)
seg_count = mongo_count(self._collection, filter={SYMBOL: symbol, START: start, END: end})
# remove old segments for this chunk in case we now have less
# segments than we did before
if seg_count > chunk_count:
self._collection.delete_many({SYMBOL: symbol,
START: start,
END: end,
SEGMENT: {'$gte': chunk_count}})
for i in xrange(chunk_count):
chunk = {DATA: Binary(data[DATA][i * MAX_CHUNK_SIZE: (i + 1) * MAX_CHUNK_SIZE])}
chunk[SEGMENT] = i
chunk[START] = start
chunk[END] = end
chunk[SYMBOL] = symbol
dates = [chunker.chunk_to_str(start), chunker.chunk_to_str(end), str(chunk[SEGMENT]).encode('ascii')]
sha = self._checksum(dates, data[DATA])
# Inspect all segments, don't limit to v['up_to']. No newer append segments after v should exist.
spec = {'symbol': sym, 'parent': v.get('base_version_id', v['_id'])}
else:
# Only verify segment count for current symbol version, don't check corruptability of future appends.
spec = {'symbol': sym, 'parent': v.get('base_version_id', v['_id']), 'segment': {'$lt': v['up_to']}}
try:
# Not that commands sequence (a) is slower than (b)
# (a) curs = collection.find(spec, {'segment': 1}, sort=[('segment', pymongo.DESCENDING)])
# curs.count()
# curs.next()
# (b) collection.find(spec, {'segment': 1}).count()
# collection.find_one(spec, {'segment': 1}, sort=[('segment', pymongo.DESCENDING)])
if check_count:
total_segments = mongo_count(collection, filter=spec)
# Quick check: compare segment count
if total_segments != v.get('segment_count', 0):
return True # corrupted, don't proceed with fetching from mongo the first hit
# Quick check: Segment counts agree and size is zero
if total_segments == 0:
return False
if check_last_segment:
# Quick check: compare the maximum segment's up_to number. It has to verify the version's up_to.
max_seg = collection.find_one(spec, {'segment': 1}, sort=[('segment', pymongo.DESCENDING)])
max_seg = max_seg['segment'] + 1 if max_seg else 0
if max_seg != v.get('up_to'):
return True # corrupted, last segment and version's up_to don't agree
except OperationFailure as e:
logging.warning("Corruption checks are skipped (sym={}, version={}): {}".format(sym, v['version'], str(e)))
def check_written(collection, symbol, version):
    """Verify every segment referenced by *version* exists in *collection*.

    Raises pymongo.errors.OperationFailure when the stored segment count does
    not match ``version['segment_count']``, or (in HYBRID forward-pointer mode
    with reconciliation enabled) when forward- and reverse-pointer counts
    disagree.
    """
    # Callers currently guarantee 'base_version_id' is absent, but resolve it
    # anyway so the check stays safe in the general case.
    parent_id = version_base_or_id(version)

    # Build the segment query according to the pointer scheme in use.
    fw_cfg = version.get(FW_POINTERS_CONFIG_KEY)
    if fw_cfg == FwPointersCfg.DISABLED.name:
        # Reverse pointers: segments point back at their parent version.
        spec = {'symbol': symbol, 'parent': parent_id}
    else:
        # Forward pointers: the version document lists its segments' SHAs.
        spec = {'symbol': symbol, 'sha': {'$in': version[FW_POINTERS_REFS_KEY]}}

    expected = version['segment_count']
    found = mongo_count(collection, filter=spec)
    if found != expected:
        segments = list(collection.find(spec, projection={'_id': 1, 'segment': 1}))
        raise pymongo.errors.OperationFailure("Failed to write all the chunks. Saw %s expecting %s. "
                                              "Parent: %s. Segments: %s" %
                                              (found, expected, parent_id, segments))

    # HYBRID mode keeps both pointer schemes; optionally cross-check them.
    if fw_cfg == FwPointersCfg.HYBRID.name and ARCTIC_FORWARD_POINTERS_RECONCILE:
        reverse_count = mongo_count(collection, filter={'symbol': symbol, 'parent': parent_id})
        if found != reverse_count:
            raise pymongo.errors.OperationFailure("Failed to reconcile forward pointer chunks ({}). "
                                                  "Parent {}. "
                                                  "Reverse pointers segments #: {}. "
                                                  "Forward pointers segments #: {}.".format(
                                                      symbol, parent_id, reverse_count, found))
def count(self, filter, **kwargs):
    """
    Count documents matching *filter*, delegating to the mongo_count helper.

    See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.count
    """
    # Note: 'filter' shadows the builtin but is kept for API compatibility.
    collection = self._collection
    return mongo_count(collection, filter=filter, **kwargs)