Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if root is not None:
collection_cache = root._resolved_objects.as_collection_cache()
merge_common_properties(d, json_href=href, collection_cache=collection_cache)
info = identify_stac_object(d)
d = migrate_to_latest(d, info)
if info.object_type == STACObjectType.CATALOG:
return Catalog.from_dict(d, href=href, root=root)
if info.object_type == STACObjectType.COLLECTION:
return Collection.from_dict(d, href=href, root=root)
if info.object_type == STACObjectType.ITEMCOLLECTION:
if Extension.SINGLE_FILE_STAC in info.common_extensions:
return SingleFileSTAC.from_dict(d)
return ItemCollection.from_dict(d)
if info.object_type == STACObjectType.ITEM:
if Extension.EO in info.common_extensions:
return EOItem.from_dict(d, href=href, root=root)
if Extension.LABEL in info.common_extensions:
return LabelItem.from_dict(d, href=href, root=root)
return Item.from_dict(d, href=href, root=root)
def stac_object_from_dict(d, href=None, root=None):
"""Determines how to deserialize a dictionary into a STAC object.
Args:
d (dict): The dict to parse.
href (str): Optional href that is the file location of the object being
parsed.
root (Catalog or Collection): Optional root of the catalog for this object.
If provided, the root's resolved object cache can be used to search for
previously resolved instances of the STAC object.
Note: This is used internally in STAC_IO to deserialize STAC Objects.
It is in the top level __init__ in order to avoid circular dependencies.
"""
if identify_stac_object_type(d) == STACObjectType.ITEM:
collection_cache = None
if root is not None:
collection_cache = root._resolved_objects.as_collection_cache()
merge_common_properties(d, json_href=href, collection_cache=collection_cache)
info = identify_stac_object(d)
d = migrate_to_latest(d, info)
if info.object_type == STACObjectType.CATALOG:
return Catalog.from_dict(d, href=href, root=root)
if info.object_type == STACObjectType.COLLECTION:
return Collection.from_dict(d, href=href, root=root)
json_dict (dict): The dict of STAC JSON to identify.
Returns:
STACObjectType: The object type represented by the JSON.
"""
object_type = None
if 'type' in json_dict:
if json_dict['type'] == 'FeatureCollection':
object_type = STACObjectType.ITEMCOLLECTION
else:
object_type = STACObjectType.ITEM
elif 'extent' in json_dict:
object_type = STACObjectType.COLLECTION
else:
object_type = STACObjectType.CATALOG
return object_type
def _migrate_label(d, version, info):
if info.object_type == STACObjectType.ITEM and version < '1.0.0':
props = d['properties']
# Migrate 0.8.0-rc1 non-pluralized forms
# As it's a common mistake, convert for any pre-1.0.0 version.
if 'label:property' in props and 'label:properties' not in props:
props['label:properties'] = props['label:property']
del props['label:property']
if 'label:task' in props and 'label:tasks' not in props:
props['label:tasks'] = props['label:task']
del props['label:task']
if 'label:overview' in props and 'label:overviews' not in props:
props['label:overviews'] = props['label:overview']
del props['label:overview']
if 'label:method' in props and 'label:methods' not in props:
if object_type == STACObjectType.ITEM:
if any(filter(lambda k: k.startswith('eo:'), d['properties'])):
stac_extensions.add(Extension.EO)
if 'eo:epsg' in d['properties']:
if d['properties']['eo:epsg'] is None:
version_range.set_min('0.6.1')
if 'eo:crs' in d['properties']:
version_range.set_max('0.4.1')
if 'eo:constellation' in d['properties']:
version_range.set_min('0.6.0')
if 'eo:bands' in d:
stac_extensions.add(Extension.EO)
version_range.set_max('0.5.2')
# pointcloud
if object_type == STACObjectType.ITEM:
if any(filter(lambda k: k.startswith('pc:'), d['properties'])):
stac_extensions.add(Extension.POINTCLOUD)
version_range.set_min('0.6.2')
# sar
if object_type == STACObjectType.ITEM:
if any(filter(lambda k: k.startswith('sar:'), d['properties'])):
stac_extensions.add(Extension.SAR)
version_range.set_min('0.6.2')
if version_range.contains('0.6.2'):
for prop in [
'sar:absolute_orbit', 'sar:resolution', 'sar:pixel_spacing', 'sar:looks'
]:
if prop in d['properties']:
if isinstance(d['properties'][prop], list):
version_range.set_max('0.6.2')
if 'sar:constellation' in d['properties'] and 'constellation' not in d['properties']:
d['properties']['constellation'] = d['properties']['sar:constellation']
del d['properties']['sar:constellation']
def _migrate_scientific(d, version, info):
pass
def _migrate_single_file_stac(d, version, info):
pass
_object_migrations = {
STACObjectType.CATALOG: _migrate_catalog,
STACObjectType.COLLECTION: _migrate_collection,
STACObjectType.ITEM: _migrate_item,
STACObjectType.ITEMCOLLECTION: _migrate_itemcollection
}
_extension_migrations = {
Extensions.ASSETS: _migrate_assets,
Extensions.CHECKSUM: _migrate_checksum,
Extensions.DATACUBE: _migrate_datacube,
Extensions.EO: _migrate_eo,
Extensions.LABEL: _migrate_label,
Extensions.POINTCLOUD: _migrate_pointcloud,
Extensions.SAR: _migrate_sar,
Extensions.SCIENTIFIC: _migrate_scientific,
Extensions.SINGLE_FILE_STAC: _migrate_single_file_stac
}
del d['properties']['sar:constellation']
def _migrate_scientific(d, version, info):
pass
def _migrate_single_file_stac(d, version, info):
pass
_object_migrations = {
STACObjectType.CATALOG: _migrate_catalog,
STACObjectType.COLLECTION: _migrate_collection,
STACObjectType.ITEM: _migrate_item,
STACObjectType.ITEMCOLLECTION: _migrate_itemcollection
}
_extension_migrations = {
Extensions.ASSETS: _migrate_assets,
Extensions.CHECKSUM: _migrate_checksum,
Extensions.DATACUBE: _migrate_datacube,
Extensions.EO: _migrate_eo,
Extensions.LABEL: _migrate_label,
Extensions.POINTCLOUD: _migrate_pointcloud,
Extensions.SAR: _migrate_sar,
Extensions.SCIENTIFIC: _migrate_scientific,
Extensions.SINGLE_FILE_STAC: _migrate_single_file_stac
}
_removed_extension_migrations = {
# Removed in 0.9.0
d['properties']['constellation'] = d['properties']['sar:constellation']
del d['properties']['sar:constellation']
def _migrate_scientific(d, version, info):
pass
def _migrate_single_file_stac(d, version, info):
pass
_object_migrations = {
STACObjectType.CATALOG: _migrate_catalog,
STACObjectType.COLLECTION: _migrate_collection,
STACObjectType.ITEM: _migrate_item,
STACObjectType.ITEMCOLLECTION: _migrate_itemcollection
}
_extension_migrations = {
Extensions.ASSETS: _migrate_assets,
Extensions.CHECKSUM: _migrate_checksum,
Extensions.DATACUBE: _migrate_datacube,
Extensions.EO: _migrate_eo,
Extensions.LABEL: _migrate_label,
Extensions.POINTCLOUD: _migrate_pointcloud,
Extensions.SAR: _migrate_sar,
Extensions.SCIENTIFIC: _migrate_scientific,
Extensions.SINGLE_FILE_STAC: _migrate_single_file_stac
}
_removed_extension_migrations = {
if 'sar:constellation' in d['properties'] and 'constellation' not in d['properties']:
d['properties']['constellation'] = d['properties']['sar:constellation']
del d['properties']['sar:constellation']
def _migrate_scientific(d, version, info):
pass
def _migrate_single_file_stac(d, version, info):
pass
_object_migrations = {
STACObjectType.CATALOG: _migrate_catalog,
STACObjectType.COLLECTION: _migrate_collection,
STACObjectType.ITEM: _migrate_item,
STACObjectType.ITEMCOLLECTION: _migrate_itemcollection
}
_extension_migrations = {
Extensions.ASSETS: _migrate_assets,
Extensions.CHECKSUM: _migrate_checksum,
Extensions.DATACUBE: _migrate_datacube,
Extensions.EO: _migrate_eo,
Extensions.LABEL: _migrate_label,
Extensions.POINTCLOUD: _migrate_pointcloud,
Extensions.SAR: _migrate_sar,
Extensions.SCIENTIFIC: _migrate_scientific,
Extensions.SINGLE_FILE_STAC: _migrate_single_file_stac
}
Args:
json_dict (dict): The dict of STAC JSON to identify.
Returns:
STACObjectType: The object type represented by the JSON.
"""
object_type = None
if 'type' in json_dict:
if json_dict['type'] == 'FeatureCollection':
object_type = STACObjectType.ITEMCOLLECTION
else:
object_type = STACObjectType.ITEM
elif 'extent' in json_dict:
object_type = STACObjectType.COLLECTION
else:
object_type = STACObjectType.CATALOG
return object_type