Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_full_copy_1(self):
    """Full-copy a single-item catalog and run ``check_catalog`` on both trees.

    A catalog holding one item is given hrefs under a temporary directory,
    duplicated with ``full_copy()``, and the duplicate is given hrefs under a
    separate directory. ``self.check_catalog`` is then called on the original
    with the tag ``'source'`` and on the duplicate with the tag ``'dest'``.
    """
    with TemporaryDirectory() as workspace:
        # Both trees live under the same scratch directory but under
        # distinct roots, so their hrefs cannot collide.
        source_root = os.path.join(workspace, 'catalog-full-copy-1-source')
        dest_root = os.path.join(workspace, 'catalog-full-copy-1-dest')

        original = Catalog(id='test', description='test catalog')
        original.add_item(
            Item(id='test_item',
                 geometry=RANDOM_GEOM,
                 bbox=RANDOM_BBOX,
                 datetime=datetime.utcnow(),
                 properties={}))
        original.normalize_hrefs(source_root)

        # The copy is taken after the source hrefs are set, then re-rooted.
        duplicate = original.full_copy()
        duplicate.normalize_hrefs(dest_root)

        self.check_catalog(original, 'source')
        self.check_catalog(duplicate, 'dest')
def test_map_items_multiple_2(self):
catalog = Catalog(id='test-1', description='Test1')
item1 = Item(id='item1',
geometry=RANDOM_GEOM,
bbox=RANDOM_BBOX,
datetime=datetime.utcnow(),
properties={})
item1.add_asset('ortho', Asset(href='/some/ortho.tif'))
catalog.add_item(item1)
kitten = Catalog(id='test-kitten', description='A cuter version of catalog')
catalog.add_child(kitten)
item2 = Item(id='item2',
geometry=RANDOM_GEOM,
bbox=RANDOM_BBOX,
datetime=datetime.utcnow(),
properties={})
item2.add_asset('ortho', Asset(href='/some/other/ortho.tif'))
kitten.add_item(item2)
def modify_item_title(item):
item.title = 'Some new title'
return item
def create_label_item(item):
# Assumes the GEOJSON labels are in the
# same location as the image
def test_clear_items_removes_from_cache(self):
catalog = Catalog(id='test', description='test')
subcat = Catalog(id='subcat', description='test')
catalog.add_child(subcat)
item = Item(id='test-item',
geometry=RANDOM_GEOM,
bbox=RANDOM_BBOX,
datetime=datetime.utcnow(),
properties={'key': 'one'})
subcat.add_item(item)
items = list(catalog.get_all_items())
self.assertEqual(len(items), 1)
self.assertEqual(items[0].properties['key'], 'one')
subcat.clear_items()
item = Item(id='test-item',
geometry=RANDOM_GEOM,
bbox=RANDOM_BBOX,
def test_can_add_custom_extension(self):
prev_extensions = pystac.STAC_EXTENSIONS.get_registered_extensions()
pystac.STAC_EXTENSIONS.add_extension(
ExtensionDefinition("test", [
ExtendedObject(Catalog, TestCatalogExt),
ExtendedObject(Collection, TestCollectionExt),
ExtendedObject(Item, TestItemExt)
]))
try:
cat = TestCases.test_case_2()
col = cat.get_child('1a8c1632-fa91-4a62-b33e-3a87c2ebdf16')
item = next(cat.get_all_items())
cat.ext.enable("test")
col.ext.enable("test")
item.ext.enable("test")
self.assertEqual(cat.ext.test.test_id, cat.id)
self.assertEqual(col.ext.test.xmin, col.extent.spatial.bboxes[0][0])
self.assertEqual(item.ext.test.asset_keys, set(item.assets))
def test_case_3():
root_cat = Catalog(id='test3',
description='test case 3 catalog',
title='test case 3 title')
image_item = Item(id='imagery-item',
geometry=RANDOM_GEOM,
bbox=RANDOM_BBOX,
datetime=datetime.utcnow(),
properties={})
image_item.add_asset(
'ortho',
Asset(href='some/geotiff.tiff', media_type=MediaType.GEOTIFF))
overviews = [
LabelOverview('label',
counts=[LabelCount('one', 1),
# NOTE(review): fragment of a test body -- `catalog`, `subcat` and `self` are
# bound outside the visible lines. The steps below replace a child catalog
# three times while reusing the id 'subcat', and after each replacement check
# that get_children() yields exactly one child carrying the newest description.
# Presumably this guards against a stale cached child being returned (a
# neighbouring test is named test_clear_items_removes_from_cache) -- confirm.
catalog.add_child(subcat)
children = list(catalog.get_children())
self.assertEqual(len(children), 1)
# NOTE(review): expects the incoming `subcat` to have description 'test';
# its construction is not visible here.
self.assertEqual(children[0].description, 'test')
# Round 2: drop every child at once, then attach a new catalog with the same id.
catalog.clear_children()
subcat = Catalog(id='subcat', description='test2')
catalog.add_child(subcat)
children = list(catalog.get_children())
self.assertEqual(len(children), 1)
self.assertEqual(children[0].description, 'test2')
# Round 3: remove the single child by id, then attach another same-id catalog.
catalog.remove_child('subcat')
subcat = Catalog(id='subcat', description='test3')
catalog.add_child(subcat)
children = list(catalog.get_children())
self.assertEqual(len(children), 1)
self.assertEqual(children[0].description, 'test3')
def test_clear_items_removes_from_cache(self):
catalog = Catalog(id='test', description='test')
subcat = Catalog(id='subcat', description='test')
catalog.add_child(subcat)
item = Item(id='test-item',
geometry=RANDOM_GEOM,
bbox=RANDOM_BBOX,
datetime=datetime.utcnow(),
properties={'key': 'one'})
subcat.add_item(item)
items = list(catalog.get_all_items())
self.assertEqual(len(items), 1)
self.assertEqual(items[0].properties['key'], 'one')
subcat.clear_items()
item = Item(id='test-item',
geometry=RANDOM_GEOM,
Note: This is used internally in STAC_IO to deserialize STAC Objects.
It is in the top level __init__ in order to avoid circular dependencies.
"""
if identify_stac_object_type(d) == STACObjectType.ITEM:
collection_cache = None
if root is not None:
collection_cache = root._resolved_objects.as_collection_cache()
merge_common_properties(d, json_href=href, collection_cache=collection_cache)
info = identify_stac_object(d)
d = migrate_to_latest(d, info)
if info.object_type == STACObjectType.CATALOG:
return Catalog.from_dict(d, href=href, root=root)
if info.object_type == STACObjectType.COLLECTION:
return Collection.from_dict(d, href=href, root=root)
if info.object_type == STACObjectType.ITEMCOLLECTION:
if Extension.SINGLE_FILE_STAC in info.common_extensions:
return SingleFileSTAC.from_dict(d)
return ItemCollection.from_dict(d)
if info.object_type == STACObjectType.ITEM:
if Extension.EO in info.common_extensions:
return EOItem.from_dict(d, href=href, root=root)
if Extension.LABEL in info.common_extensions:
return LabelItem.from_dict(d, href=href, root=root)