Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Args:
stix_id (str): id of the STIX objects to retrieve.
_composite_filters (FilterSet): a collection of filters passed from a
CompositeDataSource (i.e. if this CompositeDataSource is
attached to a parent CompositeDataSource), not user supplied.
Returns:
list: The STIX objects that have the specified id.
"""
if not self.has_data_sources():
raise AttributeError("CompositeDataSource has no data sources")
all_data = []
all_filters = FilterSet()
all_filters.add(self.filters)
if _composite_filters:
all_filters.add(_composite_filters)
# retrieve STIX objects from all configured data sources
for ds in self.data_sources:
data = ds.all_versions(stix_id=stix_id, _composite_filters=all_filters)
all_data.extend(data)
# remove exact duplicates (where duplicates are STIX 2.0 objects
# with the same 'id' and 'modified' values)
if len(all_data) > 0:
all_data = deduplicate(all_data)
A "complete query" includes the filters from the query, the filters
attached to MemorySource, and any filters passed from a
CompositeDataSource (i.e. _composite_filters)
Args:
query (list): list of filters to search on
_composite_filters (FilterSet): collection of filters passed from
the CompositeDataSource, not user supplied
Returns:
(list): list of STIX objects that matches the supplied
query. The STIX objects are received from TAXII as dicts,
parsed into python STIX objects and then returned.
"""
query = FilterSet(query)
# combine all query filters
if self.filters:
query.add(self.filters)
if _composite_filters:
query.add(_composite_filters)
# parse taxii query params (that can be applied remotely)
taxii_filters = self._parse_taxii_filters(query)
# taxii2client requires query params as keywords
taxii_filters_dict = dict((f.property, f.value) for f in taxii_filters)
# query TAXII collection
try:
all_data = self.collection.get_objects(**taxii_filters_dict)['objects']
def vulnerabilities(filters=None):
"""Retrieve all Vulnerability objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'vulnerability'))
return query(filter_list)
Args:
query (list): list of filters to search on
_composite_filters (FilterSet): collection of filters passed from
the CompositeDataSource, not user supplied
version (str): If present, it forces the parser to use the version
provided. Otherwise, the library will make the best effort based
on checking the "spec_version" property.
Returns:
(list): list of STIX objects that matches the supplied
query. The STIX objects are loaded from their json files,
parsed into a python STIX objects and then returned.
"""
all_data = []
query = FilterSet(query)
# combine all query filters
if self.filters:
query.add(self.filters)
if _composite_filters:
query.add(_composite_filters)
auth_types, auth_ids = _find_search_optimizations(query)
type_dirs = _get_matching_dir_entries(
self._stix_dir, auth_types,
stat.S_ISDIR,
)
for type_dir in type_dirs:
type_path = os.path.join(self._stix_dir, type_dir)
if type_dir == "marking-definition":
type_results = _search_markings(
def __init__(self):
super(DataSource, self).__init__()
self.id = make_id()
self.filters = FilterSet()
CompositeDataSource (i.e. _composite_filters).
Args:
query (list): list of filters to search on
_composite_filters (FilterSet): collection of filters passed from
the CompositeDataSource, not user supplied
Returns:
stix_objs (list): list of STIX objects that matches the supplied
query. The STIX objects are loaded from their json files,
parsed into a python STIX objects and then returned.
"""
all_data = []
query = FilterSet(query)
# combine all query filters
if self.filters:
query.add(self.filters)
if _composite_filters:
query.add(_composite_filters)
# extract any filters that are for "type" or "id" , as we can then do
# filtering before reading in the STIX objects. A STIX 'type' filter
# can reduce the query to a single sub-directory. A STIX 'id' filter
# allows for the fast checking of the file names versus loading it.
file_filters = self._parse_file_filters(query)
# establish which subdirectories can be avoided in query
# by decluding as many as possible. A filter with "type" as the property
# means that certain STIX object types can be ruled out, and thus
def threat_actors(filters=None):
"""Retrieve all Threat Actor objects.
Args:
filters (list, optional): A list of additional filters to apply to
the query.
"""
filter_list = FilterSet(filters)
filter_list.add(Filter('type', '=', 'threat-actor'))
return query(filter_list)
"""Search and retrieve STIX objects based on the complete query.
A "complete query" includes the filters from the query, the filters
attached to this MemorySource, and any filters passed from a
CompositeDataSource (i.e. _composite_filters).
Args:
query (list): list of filters to search on
_composite_filters (FilterSet): collection of filters passed from the
CompositeDataSource, not user supplied
Returns:
(list): list of STIX objects that match the supplied query.
"""
query = FilterSet(query)
# combine all query filters
if self.filters:
query.add(self.filters)
if _composite_filters:
query.add(_composite_filters)
all_objs = itertools.chain.from_iterable(
value.all_versions.values() if isinstance(value, _ObjectFamily)
else [value]
for value in self._data.values()
)
# Apply STIX common property filters.
all_data = list(apply_common_filters(all_objs, query))
"""Search and retrieve STIX objects based on the complete query.
A "complete query" includes the filters from the query, the filters
attached to this MemorySource, and any filters passed from a
CompositeDataSource (i.e. _composite_filters).
Args:
query (list): list of filters to search on
_composite_filters (FilterSet): collection of filters passed from
the CompositeDataSource, not user supplied
Returns:
(list): list of STIX objects that match the supplied query.
"""
query = FilterSet(query)
# combine all query filters
if self.filters:
query.add(self.filters)
if _composite_filters:
query.add(_composite_filters)
all_objs = itertools.chain.from_iterable(
value.all_versions.values() if isinstance(value, _ObjectFamily)
else [value]
for value in self._data.values()
)
# Apply STIX common property filters.
all_data = list(apply_common_filters(all_objs, query))