Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:param server: Source to utilize (taxii or local)
:param local: string path to local cache of stix data
"""
self.convert_data = {}
if source.lower() not in ['taxii', 'local']:
print('[MatrixGen] - Unable to generate matrix, source {} is not one of "taxii" or "local"'.format(source))
raise BadSource
if source.lower() == 'taxii':
self.server = Server('https://cti-taxii.mitre.org/taxii')
self.api_root = self.server.api_roots[0]
self.collections = dict()
for collection in self.api_root.collections:
if collection.title != "PRE-ATT&CK":
tc = Collection('https://cti-taxii.mitre.org/stix/collections/' + collection.id)
self.collections[collection.title.split(' ')[0].lower()] = TAXIICollectionSource(tc)
elif source.lower() == 'local':
if local is not None:
try:
self.collections['enterprise'] = FileSystemSource(local)
self.collections['mobile'] = FileSystemSource(local)
except:
raise BadLocation
else:
print('[MatrixGen] - "local" source specified, but path to local source not provided')
raise BadSource
self.matrix = {}
self._build_matrix()
def main():
collection = Collection(
"http://127.0.0.1:5000/trustgroup1/collections/52892447-4d7e-4f70-b94d-d7f22742ff63/",
user="admin", password="Password0",
)
# instantiate TAXII data source
taxii = stix2.TAXIICollectionSource(collection)
# get (url watch indicator)
indicator_fw = taxii.get("indicator--00000000-0000-4000-8000-000000000001")
print("\n\n-------Queried for Indicator - got:")
print(indicator_fw.serialize(indent=4))
# all versions (url watch indicator - currently two)
indicator_fw_versions = taxii.all_versions("indicator--00000000-0000-4000-8000-000000000001")
print("\n\n------Queried for indicator (all_versions()) - got:")
for indicator in indicator_fw_versions:
print(indicator.serialize(indent=4))
# add TAXII filter (ie filter should be passed to TAXII)
query_filter = stix2.Filter("type", "in", "malware")
# query() - but with filter attached. There are no malware objects in this collection
def build_taxii_source(collection_name):
"""Downloads latest Enterprise or Mobile ATT&CK content from MITRE TAXII Server."""
# Establish TAXII2 Collection instance for Enterprise ATT&CK collection
collection_map = {
"enterprise_attack": "95ecc380-afe9-11e4-9b6c-751b66dd541e",
"mobile_attack": "2f669986-b40b-4423-b720-4396ca6a462b"
}
collection_url = "https://cti-taxii.mitre.org/stix/collections/" + collection_map[collection_name] + "/"
collection = taxii2client.Collection(collection_url)
taxii_ds = stix2.TAXIICollectionSource(collection)
# Create an in-memory source (to prevent multiple web requests)
return stix2.MemorySource(stix_data=taxii_ds.query())
from stix2 import TAXIICollectionSource, Filter
from taxii2client import Collection
import argparse
# Establish TAXII2 Collection instance for Enterprise ATT&CK collection
collection = Collection("https://cti-taxii.mitre.org/stix/collections/95ecc380-afe9-11e4-9b6c-751b66dd541e/")
# Supply the collection to TAXIICollection
tc_src = TAXIICollectionSource(collection)
def data_sources():
"""returns all data sources in Enterprise ATT&CK"""
all_data_srcs = []
# Get all techniques in Enterprise ATT&CK
techniques = tc_src.query([Filter("type", "=", "attack-pattern")])
# Get all data sources in Enterprise ATT&CK
for tech in techniques:
if 'x_mitre_data_sources' in tech:
all_data_srcs += [
data_src for data_src in tech.x_mitre_data_sources
if data_src not in all_data_srcs
]
def load_taxii(new=False):
collection = Collection("https://cti-taxii.mitre.org/stix/collections/" + domainToTaxiiCollectionId[domain])
data_store = TAXIICollectionSource(collection)
parse_subtechniques(data_store, new)
return load_datastore(data_store)
def connect_server(self, url=None):
"""
Allow user to specify what url to use
:param url:
:return:
"""
server_url = MITRE_TAXII_URL if url is None else url
self.attack_server = Server(server_url)
api_root = self.attack_server.api_roots[0]
# CompositeSource to query all the collections at once
c_sources = [TAXIICollectionSource(collection) for collection in api_root.collections]
self.composite_ds = CompositeDataSource()
self.composite_ds.add_data_sources(c_sources)
if args.matrix == 'pre':
matrix = "062767bd-02d2-4b72-84ba-56caef0f8658"
elif args.matrix == 'mobile':
matrix = "2f669986-b40b-4423-b720-4396ca6a462b"
elif args.matrix == 'enterprise':
matrix = "95ecc380-afe9-11e4-9b6c-751b66dd541e"
# Initialize dictionary to hold Enterprise ATT&CK content
attack = {}
# Establish TAXII2 Collection instance for Enterprise ATT&CK
collection = Collection("https://cti-taxii.mitre.org/stix/collections/{0}/"\
.format(matrix))
# Supply the collection to TAXIICollection
tc_source = TAXIICollectionSource(collection)
# Create filters to retrieve content from Enterprise ATT&CK
filter_objs = {"techniques": Filter("type", "=", "attack-pattern")}
# Retrieve all Enterprise ATT&CK content
for key in filter_objs:
attack[key] = tc_source.query(filter_objs[key])
all_techniques = attack["techniques"]
technique_count = 0
techniques_without_data_source = 0
techniques_observable = 0
techniques_with_data_sources = []
data_sources = set()
matching_techniques = set()