Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def generate(show_nodetect=False):
"""
generate and return a layer dict showing techniques used by APT3 and APT29 as well as software used by those groups
param show_nodetect, if true, causes techniques that have no data-sources to be highlighted as well
"""
stix = requests.get("https://raw.githubusercontent.com/mitre/cti/master/enterprise-attack/enterprise-attack.json").json()
ms = stix2.MemoryStore(stix_data=stix["objects"])
apt3 = ms.get("intrusion-set--0bbdf25b-30ff-4894-a1cd-49260d0dd2d9")
apt29 = ms.get("intrusion-set--899ce53f-13a0-479b-a0e4-67d46e241542")
techniques_used = {} # attackID => {apt3: boolean, apt29: boolean, software: Set, detection: boolean}
for apt in [apt3, apt29]:
def use_technique(technique, software=None):
"""helper function to record a technique as used"""
techniqueID = technique["external_references"][0]["external_id"]
# init struct if the technique has not been seen before
if not techniqueID in techniques_used:
techniques_used[techniqueID] = {
"APT3": False,
"APT29": False,
"software": set(),
def generate(softwaretype="software"):
""" generate and return a layer dict showing techniques used by software
If softwaretype is specified as "malware" or "tool", only shows software of that type. If softwaretype is specified as "software" output layer shows both malware and tools
"""
# import the STIX data from MITRE/CTI
stix = requests.get("https://raw.githubusercontent.com/mitre/cti/master/enterprise-attack/enterprise-attack.json").json()
ms = stix2.MemoryStore(stix_data=stix["objects"])
# software includes malware and tool types so perform two queries and merge the results
software_filters = []
if softwaretype == "malware" or softwaretype == "software":
software_filters.append( [ stix2.Filter('type', '=', 'malware') ] )
if softwaretype == "tool" or softwaretype == "software":
software_filters.append( [ stix2.Filter('type', '=', 'tool') ] )
software = list(chain.from_iterable(
ms.query(f) for f in software_filters
))
# build a list of techniques used by software
techniques_used = {} #attackID => using software names
for thesoftware in software:
# filter out revoked and deprecated software
if ("x_mitre_deprecated" in thesoftware and thesoftware["x_mitre_deprecated"]) or ("revoked" in thesoftware and thesoftware["revoked"]): continue
def load_dir(dir, new=False):
    """Load the domain JSON file found in ``dir`` and return the processed result.

    The file name is built as ``domain + ".json"``; ``domain`` is not a
    parameter and is resolved from the surrounding scope (not visible in
    this snippet -- confirm where it is defined).

    :param dir: directory that contains the ``<domain>.json`` file.
    :param new: forwarded unchanged to ``parse_subtechniques``.
    :return: whatever ``load_datastore`` returns for the populated store.
    """
    store = MemoryStore()
    store.load_from_file(os.path.join(dir, domain + ".json"))
    # Sub-technique parsing runs on the store before it is handed to the loader.
    parse_subtechniques(store, new)
    return load_datastore(store)
FileSystemSource, Filter, GranularMarking, HTTPRequestExt,
ICMPExt, IPv4Address, IPv6Address, KillChainPhase, MACAddress,
MarkingDefinition, MemoryStore, Mutex, NetworkTraffic, NTFSExt,
parse_observable, PDFExt, Process, RasterImageExt, Relationship,
Sighting, SocketExt, Software, StatementMarking,
TAXIICollectionSource, TCPExt, TLP_AMBER, TLP_GREEN, TLP_RED,
TLP_WHITE, TLPMarking, UNIXAccountExt, URL, UserAccount,
WindowsPEBinaryExt, WindowsPEOptionalHeaderType,
WindowsPESection, WindowsProcessExt, WindowsRegistryKey,
WindowsRegistryValueType, WindowsServiceExt, X509Certificate,
X509V3ExtenstionsType
)
from .datastore.filters import FilterSet
# Use an implicit MemoryStore
# One module-level Environment, backed by a fresh MemoryStore, is created at
# import time. The assignments below re-export its attributes under
# module-level names, so every alias operates on this same shared instance.
_environ = Environment(store=MemoryStore())
# Object creation and the defaults applied to created objects.
create = _environ.create
set_default_creator = _environ.set_default_creator
set_default_created = _environ.set_default_created
set_default_external_refs = _environ.set_default_external_refs
set_default_object_marking_refs = _environ.set_default_object_marking_refs
# Lookup and query helpers.
get = _environ.get
all_versions = _environ.all_versions
query = _environ.query
creator_of = _environ.creator_of
relationships = _environ.relationships
related_to = _environ.related_to
# NOTE: the module-level name is `save`, but it is bound to the environment's `add`.
save = _environ.add
add_filters = _environ.add_filters
add_filter = _environ.add_filter
parse = _environ.parse
def generate():
"""parse the STIX on MITRE/CTI and return a layer dict with techniques with randomized scores"""
# import the STIX data from MITRE/CTI
stix = requests.get("https://raw.githubusercontent.com/mitre/cti/master/enterprise-attack/enterprise-attack.json").json()
ms = stix2.MemoryStore(stix_data=stix["objects"])
# get all techniques in STIX
techniques = ms.query([
stix2.Filter("type", "=", "attack-pattern")
])
# parse techniques into layer format
techniques_list = []
for technique in techniques:
# skip deprecated and revoked
if ("x_mitre_deprecated" in technique and technique["x_mitre_deprecated"]) or ("revoked" in technique and technique["revoked"]): continue
techniqueID = technique["external_references"][0]["external_id"] # get the attackID
techniques_list.append({
"techniqueID": techniqueID,
"score": random.randint(1,100) # random score
})
# return the techniques in a layer dict
return {
def generate():
"""parse the STIX on MITRE/CTI and return a layer dict showing all techniques used by an APT group with phrase 'bear' in the group aliases."""
# import the STIX data from MITRE/CTI
stix = requests.get("https://raw.githubusercontent.com/mitre/cti/master/enterprise-attack/enterprise-attack.json").json()
ms = stix2.MemoryStore(stix_data=stix["objects"])
groups = ms.query([ stix2.Filter("type", "=", "intrusion-set") ])
# find bear groups
bear_groups = [] #list of groups with bear in name
for group in groups:
# filter out deprecated and revoked groups
if ("x_mitre_deprecated" in group and group["x_mitre_deprecated"]) or ("revoked" in group and group["revoked"]): continue
# check all aliases for bear
for alias in group["aliases"]:
if re.match(".*bear.*", alias, re.IGNORECASE) is not None:
bear_groups.append(group)
break # don't match the same group multiple times
# find techniques used by bear groups
techniques_used = {} #attackID => using bear groups