def test_insert(self):
from datetime import datetime, timedelta
mdb = Metadatabase('sqlite://', create_tables=True)
# mdb.create_tables()
s = mdb.session()
from uuid import uuid1
uu = uuid1()
when = datetime.utcnow()
nextwhen = when + timedelta(minutes=5)
f = Resource(path='/path/to/foo.bar', mtime=when, atime=when, format=None)
p = Product(uuid_str=str(uu), atime=when, name='B00 Refl', obs_time=when, obs_duration=timedelta(minutes=5))
f.product.append(p)
p.info['test_key'] = u'test_value'
p.info['turkey'] = u'cobbler'
s.add(f)
s.add(p)
s.commit()
p.info.update({'key': 'value'})
p.info.update({Info.OBS_TIME: datetime.utcnow()})
p.info.update({Info.OBS_TIME: nextwhen, Info.OBS_DURATION: timedelta(seconds=15)})
# p.info.update({'key': 'value', Info.OBS_TIME: nextwhen, Info.OBS_DURATION: nextwhen + timedelta(seconds=15)})
# p.info[Info.OBS_TIME] = nextwhen
# p.info['key'] = 'value'
# p.obs_time = nextwhen
s.commit()
self.assertIs(p.resource[0], f)
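# Illustrative follow-up (a sketch, not part of the original test): after the commits
# above, the same Product can be queried back through the session by its uuid_str
# column, and the key/value metadata written via p.info should round-trip.
found = s.query(Product).filter_by(uuid_str=str(uu)).one()
assert found.info['turkey'] == 'cobbler'
assert found.resource[0] is f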
def merge_resources(self) -> Iterable[Resource]:
"""
Returns:
sequence of Resources found at the source, typically one resource per file
"""
now = datetime.utcnow()
if self._resource is not None:
res = self._resource
else:
self._resource = res = self._S.query(Resource).filter(Resource.path == self.source_path).first()
if res is None:
LOG.debug('creating new Resource entry for {}'.format(self.source_path))
self._resource = res = Resource(
format=type(self),
path=self.source_path,
mtime=now,
atime=now,
)
self._S.add(res)
return [self._resource]
def _purge_inaccessible_resources(self):
"""
remove Resources that are no longer accessible
"""
LOG.debug("purging any resources that are no longer accessible")
with self._inventory as s:
resall = list(s.query(Resource).all())
n_purged = 0
for r in resall:
if not r.exists():
LOG.info("resource {} no longer exists, purging from database")
# for p in r.product:
# p.resource.remove(r)
n_purged += 1
s.delete(r)
LOG.info("discarded metadata for {} orphaned resources".format(n_purged))
def begin_import_products(self, *products) -> Generator[import_progress, None, None]:
"""
Args:
*products: sequence of products to import
Returns:
generator which yields status tuples as the content is imported
"""
# FUTURE: this should be async def coroutine
return
class aSingleFileWithSingleProductImporter(aImporter):
"""
simplification of importer that handles a single-file with a single product
"""
source_path: str = None
_resource: Resource = None
def __init__(self, source_path, workspace_cwd, database_session, **kwargs):
if isinstance(source_path, list) and len(source_path) == 1:
# backwards compatibility - we now expect a list
source_path = source_path[0]
super(aSingleFileWithSingleProductImporter, self).__init__(workspace_cwd, database_session)
self.source_path = source_path
@property
def num_products(self):
return 1
def merge_resources(self) -> Iterable[Resource]:
"""
Returns:
sequence of Resources found at the source, typically one resource per file
"""
class Product(Base):
"""
Primary entity being tracked in the metadatabase
One or more Products are held in a single Resource (file)
A Product has zero or more Content representations, potentially at different projections
A Product has zero or more ProductKeyValue pairs with additional metadata
A Resource's format allows data to be imported to the workspace
A Product's kind determines how its cached data is transformed to different representations for display
additional information is stored in a key-value table addressable as product.info[key:str]
"""
__tablename__ = 'products_v1'
# identity information
id = Column(Integer, primary_key=True)
resource_id = Column(Integer, ForeignKey(Resource.id))
# relationship: .resource
uuid_str = Column(String, nullable=False,
unique=True) # UUID representing this data in SIFT, or None if not in cache
@property
def uuid(self):
return UUID(self.uuid_str)
@uuid.setter
def uuid(self, uu):
self.uuid_str = str(uu)
# primary handler
# kind = Column(PickleType) # class or callable which can perform transformations on this data in workspace
atime = Column(DateTime, nullable=False) # last time this file was accessed by application
def _clean_cache(self):
"""
find stale content in the cache and get rid of it
this routine should eventually be compatible with backgrounding on a thread
possibly include a workspace setting for max workspace size in bytes?
:return:
"""
# get information on current cache contents
with self._inventory as S:
LOG.info("cleaning cache")
total_size = self._total_workspace_bytes
GB = 1024 ** 3
LOG.info("total cache size is {}GB of max {}GB".format(total_size / GB, self._max_size_gb))
max_size = self._max_size_gb * GB
for res in S.query(Resource).order_by(Resource.atime).all():
if total_size < max_size:
break
total_size -= self._purge_content_for_resource(res, session=S)
# remove all content for lowest atimes until the total size is back under max_size
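# Hedged sketch (an assumption; _total_workspace_bytes is not shown in this excerpt):
# the obvious way to obtain the cache-size figure used by the eviction loop above is to
# walk the workspace cache directory and sum file sizes. cache_dir is a hypothetical
# parameter name.
import os

def _total_workspace_bytes_sketch(cache_dir: str) -> int:
    total = 0
    for root, _dirs, files in os.walk(cache_dir):
        total += sum(os.path.getsize(os.path.join(root, name)) for name in files)
    return total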
def merge_resources(self) -> Iterable[Resource]:
if len(self._resources) == len(self.filenames):
return self._resources
resources = self._S.query(Resource).filter(
Resource.path.in_(self.filenames)).all()
if len(resources) == len(self.filenames):
self._resources = resources
return self._resources
now = datetime.utcnow()
res_dict = {r.path: r for r in self._resources}
for fn in self.filenames:
if fn in res_dict:
continue
res = Resource(
format=type(self),
path=fn,
mtime=now,
atime=now,
)
self._S.add(res)
res_dict[fn] = res
self._resources = res_dict.values()
return self._resources
def begin_import_products(self, *product_ids) -> Generator[import_progress, None, None]:
import dask.array as da
if product_ids:
products = [self._S.query(Product).filter_by(id=anid).one() for anid in product_ids]
assert products
else:
products = [prod for _res, prod in self._S.query(Resource, Product).filter(
Resource.path.in_(self.filenames)).filter(
Product.resource_id == Resource.id).all()]
assert products
# FIXME: Don't recreate the importer every time we want to load data
dataset_ids = [DatasetID.from_dict(prod.info) for prod in products]
self.scn.load(dataset_ids)
num_stages = len(products)
for idx, (prod, ds_id) in enumerate(zip(products, dataset_ids)):
dataset = self.scn[ds_id]
shape = dataset.shape
num_contents = 1 if prod.info[Info.KIND] == Kind.IMAGE else 2
if prod.content:
LOG.warning('content was already available, skipping import')
continue
def _all_tables_present(self):
from sqlalchemy.engine.reflection import Inspector
inspector = Inspector.from_engine(self.engine)
all_tables = set(inspector.get_table_names())
zult = True
for table_name in (Resource.__tablename__, Product.__tablename__,
ProductKeyValue.__tablename__, SymbolKeyValue.__tablename__,
Content.__tablename__, ContentKeyValue.__tablename__, PRODUCTS_FROM_RESOURCES_TABLE_NAME):
present = table_name in all_tables
LOG.debug("table {} {} present in database".format(table_name, "is" if present else "is not"))
zult = False if not present else zult
return zult
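# Hypothetical caller sketch (not from the source): the check above can gate table
# creation when a Metadatabase is opened. Base.metadata.create_all() is the standard
# SQLAlchemy call for emitting CREATE TABLE statements for declarative models such as
# Product and Resource; mdb here stands in for a Metadatabase instance as in the test
# at the top of this page.
if not mdb._all_tables_present():
    Base.metadata.create_all(bind=mdb.engine)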