from uuid import uuid1
from datetime import datetime
scn = self.load_all_datasets()
for ds_id, ds in scn.datasets.items():
# don't recreate a Product for one we already have
if ds_id in existing_ids:
yield existing_ids[ds_id]
continue
meta = ds.attrs
uuid = uuid1()
meta[Info.UUID] = uuid
now = datetime.utcnow()
prod = Product(
uuid_str=str(uuid),
atime=now,
)
prod.resource.extend(resources)
assert Info.OBS_TIME in meta
assert Info.OBS_DURATION in meta
prod.update(meta)  # sets fields like obs_duration and obs_time transparently
assert prod.info[Info.OBS_TIME] is not None and prod.obs_time is not None
assert prod.info[Info.VALID_RANGE] is not None
LOG.debug('new product: {}'.format(repr(prod)))
self._S.add(prod)
self._S.commit()
yield prod
if res is None:
LOG.debug('no resources for {}'.format(self.source_path))
return []
if len(res.product):
zult = list(res.product)
# LOG.debug('pre-existing products {}'.format(repr(zult)))
return zult
# else probe the file and add product metadata, without importing content
from uuid import uuid1
uuid = uuid1()
meta = self.product_metadata()
meta[Info.UUID] = uuid
now = datetime.utcnow()
prod = Product(
uuid_str=str(uuid),
atime=now,
)
prod.resource.append(res)
assert Info.OBS_TIME in meta
assert Info.OBS_DURATION in meta
prod.update(meta)  # sets fields like obs_duration and obs_time transparently
assert prod.info[Info.OBS_TIME] is not None and prod.obs_time is not None
assert prod.info[Info.VALID_RANGE] is not None
LOG.debug('new product: {}'.format(repr(prod)))
self._S.add(prod)
self._S.commit()
return [prod]
def begin_import_products(self, *product_ids):
source_path = self.source_path
if product_ids:
products = [self._S.query(Product).filter_by(id=anid).one() for anid in product_ids]
assert products
else:
# query Product joined to its Resource by path, so each result row is a Product
# rather than a (Resource, Product) tuple that would break prod.content below
products = list(self._S.query(Product).filter(
    Resource.path == source_path).filter(
    Product.resource_id == Resource.id).all())
assert products
if len(products) > 1:
LOG.warning('only first product currently handled in pug loader')
prod = products[0]
if prod.content:
LOG.warning('content was already available, skipping import')
return
pug = GoesRPUGImporter.pug_factory(source_path)
rows, cols = shape = pug.shape
cell_height, cell_width = pug.cell_size
origin_y, origin_x = pug.origin
proj4 = pug.proj4_string
now = datetime.utcnow()
def __getitem__(self, uuid: UUID):
if not isinstance(uuid, UUID):
raise ValueError("need a UUID here")
with self.mdb as S:
    prod = S.query(Product).filter_by(uuid_str=str(uuid)).first()
    if prod is None:
        raise KeyError(uuid)  # mapping protocol: unknown UUID raises KeyError, not AttributeError
    return prod.info
def __init__(self, *args, **kwargs):
super(Product, self).__init__(*args, **kwargs)
def family_for_product_or_layer(self, uuid_or_layer):
if isinstance(uuid_or_layer, UUID):
with self._workspace.metadatabase as s:
fam = s.query(Product.family).filter_by(uuid_str=str(uuid_or_layer)).first()
if fam:
return fam[0]
uuid_or_layer = self[uuid_or_layer]
if Info.FAMILY in uuid_or_layer:
LOG.debug('using pre-existing family {}'.format(uuid_or_layer[Info.FAMILY]))
return uuid_or_layer[Info.FAMILY]
# kind:pointofreference:measurement:wavelength
kind = uuid_or_layer[Info.KIND]
refpoint = 'unknown' # FUTURE: geo/leo
measurement = uuid_or_layer.get(Info.STANDARD_NAME)
if uuid_or_layer.get('recipe'):
# RGB
subcat = uuid_or_layer['recipe'].name
elif uuid_or_layer.get(Info.CENTRAL_WAVELENGTH):
    # basic band
    subcat = uuid_or_layer[Info.CENTRAL_WAVELENGTH]
else:
    # fallback when neither a recipe nor a central wavelength is available
    subcat = 'unknown'
# e.g. 'IMAGE:unknown:toa_bidirectional_reflectance:0.47', per the convention noted above
return '{}:{}:{}:{}'.format(kind.name, refpoint, measurement, subcat)
def sort_product_uuids(self, uuids: typ.Iterable[UUID]) -> typ.List[UUID]:
uuidset = set(str(x) for x in uuids)
if not uuidset:
return []
with self._workspace.metadatabase as S:
zult = [(x.uuid, x.ident) for x in S.query(Product)
.filter(Product.uuid_str.in_(uuidset))
.order_by(Product.family, Product.category, Product.serial)
.all()]
LOG.debug("sorted products: {}".format(repr(zult)))
return [u for u, _ in zult]
data: ndarray with content to store, typically 2D float32
namespace: {variable: uuid, } for calculation of this data
codeblock: text, code to run to recalculate this data within namespace
Returns:
uuid, info, data: uuid of the new product, its official read-only metadata, and cached content ndarray
"""
if Info.UUID not in info:
raise ValueError('currently require that an Info.UUID be included in the product info')
parms = dict(info)
now = datetime.utcnow()
parms.update(dict(
atime=now,
mtime=now,
))
P = Product.from_info(parms, symbols=namespace, codeblock=codeblock)
uuid = P.uuid
# FUTURE: add expression and namespace information, which would require additional parameters
ws_filename = '{}.image'.format(str(uuid))
ws_path = os.path.join(self.cache_dir, ws_filename)
with open(ws_path, 'wb+') as fp:
mm = np.memmap(fp, dtype=data.dtype, shape=data.shape, mode='w+')
mm[:] = data[:]
parms.update(dict(
lod=Content.LOD_OVERVIEW,
path=ws_filename,
dtype=str(data.dtype),
proj4=info[Info.PROJ],
resolution=min(info[Info.CELL_WIDTH], info[Info.CELL_HEIGHT])
))
rcls = dict(zip(('rows', 'cols', 'levels'), data.shape))
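# Usage illustration for the store-an-array path documented above: a minimal
# sketch assuming a hypothetical `workspace` object that exposes this method as
# create_product_from_array; the Info values, proj4 string, and cell sizes here
# are illustrative assumptions, not values taken from this codebase.
import numpy as np
from uuid import uuid1
data = np.zeros((1000, 1500), dtype=np.float32)
some_band_uuid = uuid1()  # stand-in for an existing product's UUID
info = {
    Info.UUID: uuid1(),  # required: a missing Info.UUID raises ValueError above
    Info.PROJ: '+proj=geos +h=35786023.0 +lon_0=-75.0 +sweep=x',
    Info.CELL_WIDTH: 2004.0,
    Info.CELL_HEIGHT: 2004.0,
}
namespace = {'C13': some_band_uuid}  # {variable: uuid} inputs to the calculation
codeblock = 'result = C13 - 273.15'  # code re-run within that namespace
uuid, ro_info, cached = workspace.create_product_from_array(info, data, namespace, codeblock)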
class Content(Base):
"""
Represents flattened product data files in the cache (i.e. cache content),
typically memory-map-ready data (np.memmap).
Basic projection/geolocation correspondence information may accompany the data.
Images typically have rows>0, cols>0, levels=None (implied levels=1);
profiles may have rows>0, cols=None (implied cols=1), levels>0.
A given product may have several Content records for different projections.
Additional information is stored in a key-value table addressable as content[key: str].
"""
# _array = None # when attached, this is a np.memmap
__tablename__ = 'contents_v1'
id = Column(Integer, primary_key=True)
product_id = Column(Integer, ForeignKey(Product.id))
# handle overview versus detailed data
lod = Column(Integer) # power of 2 level of detail; 0 for coarse-resolution overview
LOD_OVERVIEW = 0
resolution = Column(Integer) # maximum resolution in meters for this representation of the dataset
# time accounting, used to check if data needs to be re-imported to workspace,
# or whether data is LRU and can be removed from a crowded workspace
mtime = Column(DateTime) # last observed mtime of the original source of this data, for change checking
atime = Column(DateTime) # last time this product was accessed by application
# actual data content
# NaNs are used to signify missing data; a NaN's significand bits can also carry
# an integer category field (see IEEE 754 quiet-NaN payloads)
path = Column(String, unique=True) # relative to workspace, binary array of data
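# Two conventions above benefit from a concrete sketch: Content.path names a raw
# binary array that is reopened with np.memmap, and missing samples are NaNs whose
# significand bits may carry an integer category code. The helper names and the
# 22-bit payload mask (float32 quiet NaN) are assumptions for illustration.
import numpy as np

def nan_with_category(category: int) -> np.float32:
    # IEEE 754 float32: exponent all ones + nonzero significand => NaN;
    # 0x7FC00000 is a quiet NaN, and the low 22 significand bits hold the payload
    return np.uint32(0x7FC00000 | (category & 0x003FFFFF)).view(np.float32)

def category_of(value) -> int:
    return int(np.float32(value).view(np.uint32) & 0x003FFFFF)

arr = np.empty((4, 4), dtype=np.float32)
arr.view(np.uint32)[:] = nan_with_category(7).view(np.uint32)  # bit-exact fill
with open('demo.image', 'wb+') as fp:  # same write pattern as the importer above
    mm = np.memmap(fp, dtype=arr.dtype, shape=arr.shape, mode='w+')
    mm[:] = arr[:]
back = np.memmap('demo.image', dtype='float32', shape=(4, 4), mode='r')  # dtype/shape from Content columns
assert np.isnan(back[0, 0]) and category_of(back[0, 0]) == 7
# payloads survive storage and copies, but not arithmetic on the values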
def begin_import_products(self, *product_ids): # FUTURE: allow product_ids to be uuids
from osgeo import gdal  # modern GDAL exposes its Python bindings under the osgeo package
source_path = self.source_path
if product_ids:
products = [self._S.query(Product).filter_by(id=anid).one() for anid in product_ids]
else:
# query Product joined to its Resource by path, so each result row is a Product
# rather than a (Resource, Product) tuple that would break prod.content below
products = list(self._S.query(Product).filter(
    Resource.path == source_path).filter(
    Product.resource_id == Resource.id).all())
assert products
if len(products) > 1:
LOG.warning('only first product currently handled in geotiff loader')
prod = products[0]
if prod.content:
LOG.info('content is already available, skipping import')
return
now = datetime.utcnow()
# re-collect the metadata, which in the FUTURE should be split between Product and Content metadata
# principally, we're not allowed to store ORIGIN_ or CELL_ metadata on the Product
info = GeoTiffImporter.get_metadata(source_path)
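# A minimal sketch of the Product/Content split the comment above calls for;
# the exact key set is an assumption (grid-geometry keys stay with Content):
CONTENT_KEYS = {Info.ORIGIN_X, Info.ORIGIN_Y, Info.CELL_WIDTH, Info.CELL_HEIGHT, Info.PROJ}
product_info = {k: v for k, v in info.items() if k not in CONTENT_KEYS}
content_info = {k: v for k, v in info.items() if k in CONTENT_KEYS}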