Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
try:
obj_id = obj['id']
except KeyError:
raise ValueError("STIX object has no 'id' property")
except TypeError:
# Assume `obj` is an ID string
obj_id = obj
if relationship_type:
filters.append(Filter('relationship_type', '=', relationship_type))
if source_only and target_only:
raise ValueError("Search either source only or target only, but not both")
if not target_only:
results.extend(self.query(filters + [Filter('source_ref', '=', obj_id)]))
if not source_only:
results.extend(self.query(filters + [Filter('target_ref', '=', obj_id)]))
return results
list: The Relationship objects involving the given STIX object.
"""
results = []
filters = [Filter('type', '=', 'relationship')]
try:
obj_id = obj['id']
except KeyError:
raise ValueError("STIX object has no 'id' property")
except TypeError:
# Assume `obj` is an ID string
obj_id = obj
if relationship_type:
filters.append(Filter('relationship_type', '=', relationship_type))
if source_only and target_only:
raise ValueError("Search either source only or target only, but not both")
if not target_only:
results.extend(self.query(filters + [Filter('source_ref', '=', obj_id)]))
if not source_only:
results.extend(self.query(filters + [Filter('target_ref', '=', obj_id)]))
return results
def __new__(cls, prop, op, value):
# If value is a list, convert it to a tuple so it is hashable.
if isinstance(value, list):
value = tuple(value)
_check_filter_components(prop, op, value)
self = super(Filter, cls).__new__(cls, prop, op, value)
return self
Args:
obj (STIX object OR dict OR str): The STIX object (or its ID) whose
relationships will be looked up.
relationship_type (str): Only retrieve Relationships of this type.
If None, all relationships will be returned, regardless of type.
source_only (bool): Only retrieve Relationships for which this
object is the source_ref. Default: False.
target_only (bool): Only retrieve Relationships for which this
object is the target_ref. Default: False.
Returns:
list: The Relationship objects involving the given STIX object.
"""
results = []
filters = [Filter('type', '=', 'relationship')]
try:
obj_id = obj['id']
except KeyError:
raise ValueError("STIX object has no 'id' property")
except TypeError:
# Assume `obj` is an ID string
obj_id = obj
if relationship_type:
filters.append(Filter('relationship_type', '=', relationship_type))
if source_only and target_only:
raise ValueError("Search either source only or target only, but not both")
if not target_only:
Args:
stix_id (str): The STIX ID of the STIX objects to be retrieved.
_composite_filters (FilterSet): collection of filters passed from
the parent CompositeDataSource, not user supplied
version (str): If present, it forces the parser to use the version
provided. Otherwise, the library will make the best effort based
on checking the "spec_version" property.
Returns:
(list): of STIX objects that has the supplied STIX ID.
The STIX objects are loaded from their json files, parsed into
a python STIX objects and then returned
"""
query = [Filter("id", "=", stix_id)]
return self.query(query, version=version, _composite_filters=_composite_filters)
def all_versions(self, stix_id, _composite_filters=None):
"""Retrieve STIX object from local/remote TAXII Collection
endpoint, all versions of it
Args:
stix_id (str): The STIX ID of the STIX objects to be retrieved.
_composite_filters (FilterSet): collection of filters passed from the parent
CompositeDataSource, not user supplied
Returns:
(see query() as all_versions() is just a wrapper)
"""
# make query in TAXII query format since 'id' is TAXII field
query = [
Filter('id', '=', stix_id),
Filter('version', '=', 'all'),
]
all_data = self.query(query=query, _composite_filters=_composite_filters)
# parse STIX objects from TAXII returned json
all_data = [parse(stix_obj, allow_custom=self.allow_custom) for stix_obj in all_data]
# check - was added to handle erroneous TAXII servers
all_data_clean = [stix_obj for stix_obj in all_data if stix_obj.id == stix_id]
return all_data_clean
def __new__(cls, prop, op, value):
# If value is a list, convert it to a tuple so it is hashable.
if isinstance(value, list):
value = tuple(value)
_check_filter_components(prop, op, value)
self = super(Filter, cls).__new__(cls, prop, op, value)
return self