Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_delete_version_version_not_found():
    """``_delete_version`` logs an error when ``find_one`` returns no document."""
    # Skip the real constructor so no connection is needed, and capture the
    # module-level logger to inspect what gets reported.
    init_patch = patch('arctic.store.version_store.VersionStore.__init__', return_value=None, autospec=True)
    logger_patch = patch('arctic.store.version_store.logger')

    with init_patch, logger_patch as logger:
        vs = version_store.VersionStore(sentinel.connection)
        vs._versions = MagicMock()

        # Simulate a lookup that finds nothing for the requested version.
        with patch.object(vs._versions, 'find_one', return_value=None, autospec=True):
            vs._delete_version(sentinel.symbol, sentinel.version)

        logger.error.assert_called_once_with("Can't delete sentinel.symbol:sentinel.version as not found in DB")
def _create_mock_versionstore():
    """
    Build an autospec'd ``VersionStore`` backed entirely by mocks.

    ``_insert_version``, ``_add_new_version_using_reference`` and
    ``_last_version_seqnum`` are routed to the real ``VersionStore``
    implementations (invoked with the mock as ``self``); everything else
    returns canned values derived from ``TPL_VERSION``.
    """
    store = create_autospec(VersionStore, _arctic_lib=Mock(), _version_nums=Mock(), _versions=Mock())
    next_version = TPL_VERSION['version'] + 1

    def insert_version(version):
        return VersionStore._insert_version(store, version)

    def add_new_version_using_reference(*args):
        return VersionStore._add_new_version_using_reference(store, *args)

    def last_version_seqnum(version):
        return VersionStore._last_version_seqnum(store, version)

    store._insert_version = insert_version
    store._arctic_lib.get_name.return_value = TEST_LIB
    store._read_metadata.return_value = TPL_VERSION

    # Both version-number lookups report the version following the template.
    store._version_nums.find_one_and_update.return_value = {'version': next_version}
    store._version_nums.find_one.return_value = {'version': next_version}
    store._versions.find_one.return_value = TPL_VERSION

    store._add_new_version_using_reference.side_effect = add_new_version_using_reference
    store._last_version_seqnum = last_version_seqnum

    library_name = store._arctic_lib.get_name()
    store.write.return_value = VersionedItem(
        symbol=TEST_SYMBOL,
        library=library_name,
        version=next_version,
        metadata=META_TO_WRITE,
        data=None,
        host=store._arctic_lib.arctic.mongo_host,
    )
    return store
vs._last_version_seqnum = lambda version: VersionStore._last_version_seqnum(vs, version)
vs.write.return_value = VersionedItem(symbol=TEST_SYMBOL, library=vs._arctic_lib.get_name(),
def test_library(library):
    """The ``library`` fixture is a ``VersionStore`` and reports that library type."""
    assert isinstance(library, VersionStore)

    library_type = library._arctic_lib.get_library_type()
    assert library_type == 'VersionStore'
def test__read_preference__default_false():
    """With ``_allow_secondary=False`` and no override, reads use the primary."""
    store = create_autospec(VersionStore, _allow_secondary=False)

    preference = VersionStore._read_preference(store, None)

    assert preference == ReadPreference.PRIMARY
vs._insert_version = lambda version: VersionStore._insert_version(vs, version)
vs._arctic_lib.get_name.return_value = TEST_LIB
import datetime as dt
import logging
from typing import Any, Dict, Union
import pandas as pd
from arctic.date import DateRange
from arctic.decorators import mongo_retry
from arctic.exceptions import DuplicateSnapshotException
from arctic.store.version_store import VersionStore
from arctic.store.versioned_item import VersionedItem
class PortfolioStore(VersionStore):
    """
    Wrapper around :class:`arctic.store.version_store.VersionStore` used to
    persist portfolio data.
    """

    LIBRARY_TYPE = 'pytech.Portfolio'
    LIBRARY_NAME = 'pytech.portfolio'

    def __init__(self, arctic_lib):
        """
        Set up the store on top of ``arctic_lib`` and log its collection name.

        :param arctic_lib: library object handed straight to
            ``VersionStore.__init__``; must provide ``get_name()``.
        """
        # The logger is attached before the parent constructor runs.
        self.logger = logging.getLogger(__name__)
        super().__init__(arctic_lib)

        collection_name = arctic_lib.get_name()
        self.logger.info('PortfolioStore collection name: '
                         f'{collection_name}')
@mongo_retry
def read(self, symbol: str,
def initialize_library(cls, arctic_lib, hashed=True, **kwargs):
    """
    Initialise ``arctic_lib`` for use as a version store.

    Steps, in order:

    1. If ``strict_write_handler`` is supplied, remove it from ``kwargs`` and
       store it (coerced to ``bool``) as the ``STRICT_WRITE_HANDLER_MATCH``
       library metadata.
    2. Call ``initialize_library`` on every handler in ``_TYPE_HANDLERS`` and
       on ``VersionStore._bson_handler``, forwarding the remaining ``kwargs``.
    3. Call ``_ensure_index()`` on a fresh ``VersionStore(arctic_lib)``.
    4. Try to enable sharding; an ``OperationFailure`` is logged as a warning
       and otherwise ignored.

    :param arctic_lib: library object to initialise.
    :param hashed: forwarded to ``enable_sharding``.
    :param kwargs: optional ``strict_write_handler`` plus handler options.
    """
    # The collection itself is not used here; the call is retained in case
    # callers rely on it being made (NOTE(review): confirm and drop if not).
    arctic_lib.get_top_level_collection()

    if 'strict_write_handler' in kwargs:
        strict = bool(kwargs.pop('strict_write_handler'))
        arctic_lib.set_library_metadata('STRICT_WRITE_HANDLER_MATCH', strict)

    for handler in _TYPE_HANDLERS:
        handler.initialize_library(arctic_lib, **kwargs)
    VersionStore._bson_handler.initialize_library(arctic_lib, **kwargs)
    VersionStore(arctic_lib)._ensure_index()

    logger.info("Trying to enable sharding...")
    try:
        enable_sharding(arctic_lib.arctic, arctic_lib.get_name(), hashed=hashed)
    except OperationFailure as e:
        # Best effort only: the library stays usable without sharding.
        # Logger arguments are passed lazily rather than pre-formatted.
        logger.warning("Library created, but couldn't enable sharding: %s. This is OK if you're not 'admin'", e)
def initialize_library(cls, arctic_lib, hashed=True, **kwargs):
    """
    Initialise ``arctic_lib`` for use as a version store.

    Steps, in order:

    1. If ``strict_write_handler`` is supplied, remove it from ``kwargs`` and
       store it (coerced to ``bool``) as the ``STRICT_WRITE_HANDLER_MATCH``
       library metadata.
    2. Call ``initialize_library`` on every handler in ``_TYPE_HANDLERS`` and
       on ``VersionStore._bson_handler``, forwarding the remaining ``kwargs``.
    3. Call ``_ensure_index()`` on a fresh ``VersionStore(arctic_lib)``.
    4. Try to enable sharding; an ``OperationFailure`` is logged as a warning
       and otherwise ignored.

    :param arctic_lib: library object to initialise.
    :param hashed: forwarded to ``enable_sharding``.
    :param kwargs: optional ``strict_write_handler`` plus handler options.
    """
    # The collection itself is not used here; the call is retained in case
    # callers rely on it being made (NOTE(review): confirm and drop if not).
    arctic_lib.get_top_level_collection()

    if 'strict_write_handler' in kwargs:
        strict = bool(kwargs.pop('strict_write_handler'))
        arctic_lib.set_library_metadata('STRICT_WRITE_HANDLER_MATCH', strict)

    for handler in _TYPE_HANDLERS:
        handler.initialize_library(arctic_lib, **kwargs)
    VersionStore._bson_handler.initialize_library(arctic_lib, **kwargs)
    VersionStore(arctic_lib)._ensure_index()

    logger.info("Trying to enable sharding...")
    try:
        enable_sharding(arctic_lib.arctic, arctic_lib.get_name(), hashed=hashed)
    except OperationFailure as e:
        # Best effort only: the library stays usable without sharding.
        # Logger arguments are passed lazily rather than pre-formatted.
        logger.warning("Library created, but couldn't enable sharding: %s. This is OK if you're not 'admin'", e)
def __setstate__(self, state):
    """
    Restore the instance by re-running ``VersionStore.__init__`` with the
    ``'arctic_lib'`` entry of ``state``.
    """
    arctic_lib = state['arctic_lib']
    return VersionStore.__init__(self, arctic_lib)