Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
cat_dict = STAC_IO.read_json(href)
cat = STACObject.from_file(href)
for link in cat.get_links():
if not link.rel == 'self':
self.assertEqual(link.link_type, link_type)
rels = set([l['rel'] for l in cat_dict['links']])
self.assertEqual('self' in rels, should_include_self)
for child_link in cat.get_child_links():
child_href = make_absolute_href(child_link.target, href)
validate_catalog_link_type(child_href, link_type,
catalog_type == CatalogType.ABSOLUTE_PUBLISHED)
for item_link in cat.get_item_links():
item_href = make_absolute_href(item_link.target, href)
validate_item_link_type(item_href, link_type,
catalog_type == CatalogType.ABSOLUTE_PUBLISHED)
def validate_catalog_link_type(href, link_type, should_include_self):
    """Recursively assert the link types of the catalog stored at ``href``.

    Every link on the loaded catalog other than its 'self' link must have
    ``link_type``, and the raw JSON must list a 'self' rel exactly when
    ``should_include_self`` is true. Catalogs and items reached through the
    catalog's child and item links are validated in turn.

    Relies on ``self`` and ``catalog_type`` from the enclosing scope.

    Args:
        href (str): Location of the catalog JSON to check.
        link_type: The link type each non-self link is expected to have.
        should_include_self (bool): Whether the serialized catalog is
            expected to contain a 'self' link.
    """
    raw_catalog = STAC_IO.read_json(href)
    catalog = STACObject.from_file(href)

    # The 'self' link is exempt from the link type expectation.
    for lnk in (candidate for candidate in catalog.get_links()
                if candidate.rel != 'self'):
        self.assertEqual(lnk.link_type, link_type)

    # Presence of 'self' is checked against the JSON as written, not the
    # loaded object.
    serialized_rels = {entry['rel'] for entry in raw_catalog['links']}
    self.assertEqual('self' in serialized_rels, should_include_self)

    for child_link in cat_links(catalog.get_child_links()) if False else catalog.get_child_links():
        validate_catalog_link_type(
            make_absolute_href(child_link.target, href),
            link_type,
            catalog_type == CatalogType.ABSOLUTE_PUBLISHED)

    for item_link in catalog.get_item_links():
        validate_item_link_type(
            make_absolute_href(item_link.target, href),
            link_type,
            catalog_type == CatalogType.ABSOLUTE_PUBLISHED)
def get_absolute_href(self):
    """Gets the absolute href for this link, if possible.

    Returns:
        str: Returns this link's HREF. It attempts to derive an absolute HREF
        from this link; however, if the link is relative, has no owner,
        and has an unresolved target, this will return a relative HREF.
    """
    # A resolved target is an object that reports its own self HREF;
    # an unresolved target is used directly as the HREF.
    href = self.target.get_self_href() if self.is_resolved() else self.target

    owner = self.owner
    if owner is None:
        # Nothing to resolve against; hand back whatever we have.
        return href

    return make_absolute_href(href, owner.get_self_href())
def normalize_hrefs(self, root_href):
    """Set this object's self HREF to ``<root_href>/<id>.json``.

    A non-absolute ``root_href`` is first made absolute against the current
    working directory. Relative asset HREFs are then rewritten so they are
    relative to the new self HREF.

    Args:
        root_href (str): Directory-like HREF under which the object's JSON
            file name is placed.
    """
    if not is_absolute_href(root_href):
        root_href = make_absolute_href(root_href, os.getcwd(), start_is_dir=True)

    previous_href = self.get_self_href()
    updated_href = os.path.join(root_href, '{}.json'.format(self.id))
    self.set_self_href(updated_href)

    # Keep relative asset links valid. Without a previous self HREF there
    # is nothing to resolve them against, so they are left untouched.
    for asset in self.assets.values():
        current = asset.href
        if is_absolute_href(current) or previous_href is None:
            continue
        resolved = make_absolute_href(current, previous_href)
        asset.href = make_relative_href(resolved, updated_href)
def normalize_hrefs(self, root_href):
    """Set this object's self HREF to ``<root_href>/<id>.json``.

    A non-absolute ``root_href`` is first made absolute against the current
    working directory. Relative asset HREFs are then rewritten so they are
    relative to the new self HREF.

    Args:
        root_href (str): Directory-like HREF under which the object's JSON
            file name is placed.
    """
    if not is_absolute_href(root_href):
        root_href = make_absolute_href(root_href, os.getcwd(), start_is_dir=True)

    previous_href = self.get_self_href()
    updated_href = os.path.join(root_href, '{}.json'.format(self.id))
    self.set_self_href(updated_href)

    # Keep relative asset links valid. Without a previous self HREF there
    # is nothing to resolve them against, so they are left untouched.
    for asset in self.assets.values():
        current = asset.href
        if is_absolute_href(current) or previous_href is None:
            continue
        resolved = make_absolute_href(current, previous_href)
        asset.href = make_relative_href(resolved, updated_href)
def from_file(cls, href):
"""Reads a STACObject implementation from a file.
Args:
href (str): The HREF to read the object from.
Returns:
The specific STACObject implementation class that is represented
by the JSON read from the file located at HREF.
"""
if not is_absolute_href(href):
href = make_absolute_href(href)
d = STAC_IO.read_json(href)
if cls == STACObject:
o = STAC_IO.stac_object_from_dict(d, href=href)
else:
o = cls.from_dict(d, href=href)
# Set the self HREF, if it's not already set to something else.
if o.get_self_href() is None:
o.set_self_href(href)
# If this is a root catalog, set the root to the catalog instance.
root_link = o.get_root_link()
if root_link is not None:
if not root_link.is_resolved():
if root_link.get_absolute_href() == href:
def normalize_hrefs(self, root_href):
    """Lay out this object and everything beneath it under ``root_href``.

    Each child is normalized under ``<root_href>/<child id>/``, each item
    under ``<root_href>/<item id>``, and finally this object's self HREF is
    set to ``<root_href>/<DEFAULT_FILE_NAME>``.

    Args:
        root_href (str): Base HREF for the layout. If it is not absolute it
            is made absolute against the current working directory.
    """
    # Normalizing requires an absolute path.
    if not is_absolute_href(root_href):
        root_href = make_absolute_href(root_href, os.getcwd(), start_is_dir=True)

    # Resolve every link up front; unresolved links with relative paths
    # can otherwise cause linking issues.
    self.fully_resolve()

    for child in self.get_children():
        child.normalize_hrefs(os.path.join(root_href, '{}/'.format(child.id)))

    for item in self.get_items():
        item.normalize_hrefs(os.path.join(root_href, '{}'.format(item.id)))

    own_href = os.path.join(root_href, self.DEFAULT_FILE_NAME)
    self.set_self_href(own_href)
if collection_cache is not None:
collection = collection_cache.get_by_id(collection_id)
# Next, try the collection link.
if collection is None:
links = item_dict['links']
# Account for 0.5 links, which were dicts
if isinstance(links, dict):
links = list(links.values())
collection_link = next((link for link in links if link['rel'] == 'collection'), None)
if collection_link is not None:
collection_href = collection_link['href']
if json_href is not None:
collection_href = make_absolute_href(collection_href, json_href)
if collection_cache is not None:
collection = collection_cache.get_by_href(collection_href)
if collection is None:
collection = STAC_IO.read_json(collection_href)
if collection is not None:
collection_id = None
collection_props = None
if isinstance(collection, Collection):
collection_id = collection.id
collection_props = collection.properties
else:
collection_id = collection['id']
if 'properties' in collection:
collection_props = collection['properties']