Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'[ url:value = \'http://ctldl.windowsupdate.com/msdownload/update/v3/static/trustedr/en/authrootstl.cab?b16bed41061b4861\' ]',
'[ url:value = \'http://ctldl.windowsupdate.com/msdownload/update/v3/static/trustedr/en/authrootstl.cab?9a8ede518893069d\' ]',
'[ url:value = \'http://go.microsoft.com/fwlink/?LinkId=544713\' ]',
'[ url:value = \'http://ns.adobe.com/xap/1.0/mm/\' ]',
'[ url:value = \'http://ns.adobe.com/xap/1.0/sType/ResourceRef\' ]',
'[ url:value = \'http://ns.adobe.com/xap/1.0/\' ]',
'[ file:hashes.\'SHA-1\' = \'91fd2d2935aedcb47271b54cd22f8fe3b30c17fd\' OR file:hashes.\'SHA-256\' = \'90b1e39282dbda2341d91b87ca161afe564b7d3b4f82f25b3f1dce3fa857226c\' OR file:hashes.\'MD5\' = \'34303fdb55e5d0f1142bb07eed2064cb\' ]'
]
extracted_indicator_expressions = []
with open(os.path.join(CWD, 'sample_report.json')) as sample_report:
sample_json = json.load(sample_report)
bundle = stix2_generator.parse_json_report_to_stix2_bundle(sample_json)
for x in bundle.objects:
if isinstance(x, stix2.Indicator):
extracted_indicator_expressions.append(x.pattern)
assert all(x in all_indicators_expressions for x in extracted_indicator_expressions)
def _get_technique_listing(self, tactic, domain='enterprise'):
"""
INTERNAL - retrieves techniques for a given tactic and domain
:param tactic: The tactic to grab techniques from
:param domain: The domain to draw from
"""
techniques = []
subtechs = {}
techs = self.collections[domain].query([Filter('type', '=', 'attack-pattern'), Filter('kill_chain_phases.phase_name', '=', tactic)])
for entry in techs:
if entry['kill_chain_phases'][0]['kill_chain_name'] == 'mitre-attack':
tid = [t['external_id'] for t in entry['external_references'] if 'attack' in t['source_name']]
if '.' not in tid[0]:
techniques.append(MatrixEntry(id=tid[0], name=entry['name']))
else:
parent = tid[0].split('.')[0]
if parent not in subtechs:
subtechs[parent] = []
subtechs[parent].append(MatrixEntry(id=tid[0], name=entry['name']))
return techniques, subtechs
def __init__(self, **kwargs):
_STIXBase.__init__(self, **kwargs)
_cls_init(cls, self, kwargs)
:param server: Source to utilize (taxii or local)
:param local: string path to local cache of stix data
"""
self.convert_data = {}
if source.lower() not in ['taxii', 'local']:
print('[MatrixGen] - Unable to generate matrix, source {} is not one of "taxii" or "local"'.format(source))
raise BadSource
if source.lower() == 'taxii':
self.server = Server('https://cti-taxii.mitre.org/taxii')
self.api_root = self.server.api_roots[0]
self.collections = dict()
for collection in self.api_root.collections:
if collection.title != "PRE-ATT&CK":
tc = Collection('https://cti-taxii.mitre.org/stix/collections/' + collection.id)
self.collections[collection.title.split(' ')[0].lower()] = TAXIICollectionSource(tc)
elif source.lower() == 'local':
if local is not None:
try:
self.collections['enterprise'] = FileSystemSource(local)
self.collections['mobile'] = FileSystemSource(local)
except:
raise BadLocation
else:
print('[MatrixGen] - "local" source specified, but path to local source not provided')
raise BadSource
self.matrix = {}
self._build_matrix()
def __init__(self, **kwargs):
_Extension.__init__(self, **kwargs)
_cls_init(cls, self, kwargs)
raise ValueError("must not be empty.")
return result
class StringProperty(Property):
def __init__(self, **kwargs):
self.string_type = text_type
super(StringProperty, self).__init__(**kwargs)
def clean(self, value):
return self.string_type(value)
class TypeProperty(Property):
def __init__(self, type):
super(TypeProperty, self).__init__(fixed=type)
class IDProperty(Property):
def __init__(self, type):
self.required_prefix = type + "--"
super(IDProperty, self).__init__()
def clean(self, value):
if not value.startswith(self.required_prefix):
raise ValueError("must start with '{0}'.".format(self.required_prefix))
try:
uuid.UUID(value.split('--', 1)[1])
def __init__(self, **kwargs):
self.string_type = text_type
super(StringProperty, self).__init__(**kwargs)
def clean(self, value):
return self.string_type(value)
class TypeProperty(Property):
def __init__(self, type):
super(TypeProperty, self).__init__(fixed=type)
class IDProperty(Property):
def __init__(self, type):
self.required_prefix = type + "--"
super(IDProperty, self).__init__()
def clean(self, value):
if not value.startswith(self.required_prefix):
raise ValueError("must start with '{0}'.".format(self.required_prefix))
try:
uuid.UUID(value.split('--', 1)[1])
except Exception:
raise ValueError("must have a valid UUID after the prefix.")
return value
def default(self):
return self.required_prefix + str(uuid.uuid4())
def get_all_techniques(src, source_name):
"""Filters data source by attack-pattern which extracts all ATT&CK Techniques"""
filters = [
stix2.Filter("type", "=", "attack-pattern"),
stix2.Filter("external_references.source_name", "=", source_name),
]
results = src.query(filters)
return remove_deprecated(results)
def _get_tactic_listing(self, domain='enterprise'):
"""
INTERNAL - retrieves tactics for the associated domain
:param domain: The domain to draw from
"""
tactics = {}
t_filt = []
matrix = self.collections[domain].query([Filter('type', '=', 'x-mitre-matrix')])
for i in range(len(matrix)):
tactics[matrix[i]['name']] = []
for tactic_id in matrix[i]['tactic_refs']:
tactics[matrix[i]['name']].append(self.collections[domain].query([Filter('id', '=', tactic_id)])[0])
for entry in tactics[matrix[0]['name']]:
self.convert_data[entry['x_mitre_shortname']] = entry['name']
self.convert_data[entry['name']] = entry['x_mitre_shortname']
t_filt.append(MatrixEntry(id=entry['external_references'][0]['external_id'], name=entry['name']))
return t_filt
def generate():
"""parse the STIX on MITRE/CTI and return a layer dict with techniques with randomized scores"""
# import the STIX data from MITRE/CTI
stix = requests.get("https://raw.githubusercontent.com/mitre/cti/master/enterprise-attack/enterprise-attack.json").json()
ms = stix2.MemoryStore(stix_data=stix["objects"])
# get all techniques in STIX
techniques = ms.query([
stix2.Filter("type", "=", "attack-pattern")
])
# parse techniques into layer format
techniques_list = []
for technique in techniques:
# skip deprecated and revoked
if ("x_mitre_deprecated" in technique and technique["x_mitre_deprecated"]) or ("revoked" in technique and technique["revoked"]): continue
techniqueID = technique["external_references"][0]["external_id"] # get the attackID
techniques_list.append({
"techniqueID": techniqueID,
"score": random.randint(1,100) # random score
})
# return the techniques in a layer dict
return {
"name": "heatmap example",
"version": "3.0",
"sorting": 3, # descending order of score