Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def entity_creation(self):
try:
file = './msticpy/tests/testdata/entities.json'
with open(file, 'r') as file_handle:
txt = file_handle.read()
entity_dict = json.loads(txt)
parsed_entities = []
for _, entity in entity_dict.items():
e = Entity.instantiate_entity(entity)
self.assertIsInstance(e, Entity)
if e['Type'] == 'account':
self.assertIsInstance(e, Account)
self.assertTrue('Name' in e)
self.assertGreater(len(e.Name), 0)
elif e['Type'] == 'host':
self.assertIsInstance(e, Host)
self.assertTrue('HostName' in e)
self.assertGreater(len(e.HostName), 0)
elif e['Type'] == 'process':
self.assertIsInstance(e, Process)
self.assertTrue('ProcessId' in e)
self.assertGreater(len(e.ProcessId), 0)
elif e['Type'] == 'file':
self.assertIsInstance(e, File)
self.assertTrue('Name' in e)
def _add_paths(self, full_path):
    """
    Populate the path-related properties from a full file path.

    The separator, and with it the OS family, is inferred from the
    path text: a path containing a forward slash is treated as a
    Linux path, anything else as a Windows path.

    Parameters
    ----------
    full_path : str
        Complete path of the file, including the file name.

    """
    uses_forward_slash = "/" in full_path
    self.PathSeparator = "/" if uses_forward_slash else "\\"
    self.OSFamily = OSFamily.Linux if uses_forward_slash else OSFamily.Windows
    self.FullPath = full_path
    # Split once: the final segment is the file name and the segments
    # before it are stored as a list (not re-joined into a string).
    *folders, file_name = full_path.split(self.PathSeparator)
    self.Name = file_name
    self.Directory = folders
@export
class FileHash(Entity):
"""
File Hash class.
Attributes
----------
Algorithm : Algorithm
FileHash Algorithm
Value : str
FileHash Value
"""
def __init__(self, src_entity: Mapping[str, Any] = None, **kwargs):
"""
Create a new instance of the entity type.
def _get_other_name_desc(entity):
    """
    Build a display name and a description for a generic entity.

    Parameters
    ----------
    entity : Entity or dict
        The entity to describe. It must support ``in`` and item
        lookup for "Type" and, optionally, "Name".

    Returns
    -------
    Tuple[str, str]
        The name ("<Type>: <Name>", or just the type when the entity
        has no "Name") and a multi-line description made of the name
        followed by the entity's other string-valued properties.

    """
    if "Name" in entity:
        raw_name = entity["Name"]
        e_name = "{}: {}".format(entity["Type"], raw_name)
    else:
        e_name = entity["Type"]

    if isinstance(entity, Entity):
        ent_props = entity.properties
    elif isinstance(entity, dict):
        ent_props = entity
    else:
        # Anything else has no usable properties; the placeholder value
        # is not a string, so it is filtered out below.
        ent_props = {"unknown": None}

    # Join every remaining string-valued property as "key:value", one
    # per line. A generator (instead of a set) keeps the properties in
    # their original order, so the description is the same on every run.
    e_properties = "\n".join(
        "{}:{}".format(k, v)
        for (k, v) in ent_props.items()
        if k not in ("Type", "Name") and isinstance(v, str)
    )
    # NOTE(review): the trailing ")" has no matching "(" and looks like
    # a typo; it is kept so the emitted text does not change.
    e_description = "{}\n{})".format(e_name, e_properties)

    # The computed values were previously discarded; hand them back.
    return e_name, e_description
def _to_dict(self, entity) -> dict:
    """
    Return the entity's properties as a plain nested dictionary.

    Properties whose value is falsy are left out, and any value that
    is itself an Entity is converted recursively.
    """
    return {
        name: self._to_dict(value) if isinstance(value, Entity) else value
        for name, value in entity.properties.items()
        if value
    }
# .AlertContracts.V3.Entities.IP)
"SourceAddress": "IPAddress",
# SourcePort (type System.Nullable`1[System.Int32])
"SourcePort": None,
# DestinationAddress (type Microsoft.Azure.Security.Detection
# .AlertContracts.V3.Entities.IP)
"DestinationAddress": "IPAddress",
# DestinationPort (type System.Nullable`1[System.Int32])
"DestinationPort": None,
# Protocol (type System.Nullable`1[System.Net.Sockets.ProtocolType])
"Protocol": None,
}
@export
class Process(Entity):
"""
Process Entity class.
Attributes
----------
ProcessId : str
Process ProcessId
CommandLine : str
Process CommandLine
ElevationToken : str
Process ElevationToken
CreationTimeUtc : datetime
Process CreationTimeUtc
ImageFile : File
Process ImageFile
Account : Account
# Property schema: each key is a property name. The value is the name
# of the entity type held by that property ("Account", "Host"), or
# None for the properties annotated below as plain .NET value types
# (DateTime, String).
# NOTE(review): the class that owns this schema is not visible in this
# excerpt - confirm which entity it belongs to.
_entity_schema = {
# Account
"Account": "Account",
# StartTimeUtc (type System.Nullable`1[System.DateTime])
"StartTimeUtc": None,
# EndTimeUtc (type System.Nullable`1[System.DateTime])
"EndTimeUtc": None,
# Host
"Host": "Host",
# SessionId (type System.String)
"SessionId": None,
}
@export
class CloudApplication(Entity):
"""
CloudApplication Entity class.
Attributes
----------
Name : str
CloudApplication Name
"""
def __init__(self, src_entity: Mapping[str, Any] = None, **kwargs):
"""
Create a new instance of the entity type.
Parameters
----------
def __init__(self, src_row: pd.Series = None):
    """
    Instantiate a security alert from a pandas Series.

    Parameters
    ----------
    src_row : pd.Series, optional
        Row holding the alert data. If it has an "Entities" item the
        entities are extracted from it. If it has an
        "ExtendedProperties" item that is a dict or a JSON string,
        that value populates ``extended_properties``.

    """
    super().__init__(src_row=src_row)
    # add entities to dictionary to remove dups
    self._src_entities: Dict[int, Entity] = {}
    self.extended_properties: Dict[str, Any] = {}
    if src_row is not None:
        if "Entities" in src_row:
            self._extract_entities(src_row)
        if "ExtendedProperties" in src_row:
            raw_props = src_row.ExtendedProperties
            if isinstance(raw_props, dict):
                self.extended_properties = raw_props
            elif isinstance(raw_props, str):
                try:
                    self.extended_properties = json.loads(raw_props)
                except JSONDecodeError:
                    # Best effort: a string that is not valid JSON
                    # leaves the empty default in place.
                    pass
entitytype=entity["Type"],
name=e_name,
description=e_desc,
color="green",
node_type="entity",
source=str(entity),
)
# add an edge by default to the alert
alertentity_graph.add_edge(alert["AlertType"], e_name)
# Rather than just add edges to the alert, we want to follow the 'natural'
# relationships between entities and child entities
# So if this entity has a property that is an entity, we add an edge to it
# and prune any edge that it might have to the alert
if isinstance(entity, Entity):
ent_props = entity.properties
elif isinstance(entity, dict):
ent_props = entity
else:
continue
for prop, rel_entity in [
(p, v) for (p, v) in ent_props.items() if isinstance(v, Entity)
]:
if rel_entity["Type"] == "host":
# don't add a new edge to the host
continue
# get the node id of the related entity and add an edge if it
# doesn't already exist
(related_entity, _) = _get_name_and_description(rel_entity)
if not alertentity_graph.has_edge(related_entity, e_name):
if v is not None:
try:
# If the property is an enum
if v == RegistryHive.__name__:
self[k] = RegistryHive[src_entity[k]]
elif v == OSFamily.__name__:
self[k] = OSFamily[src_entity[k]]
elif v == ElevationToken.__name__:
self[k] = ElevationToken[src_entity[k]]
elif v == Algorithm.__name__:
self[k] = Algorithm[src_entity[k]]
elif isinstance(v, tuple):
# if the property is a collection
entity_list = []
for col_entity in src_entity[k]:
entity_list.append(Entity.instantiate_entity(col_entity))
self[k] = entity_list
else:
# else try to instantiate an entity
self[k] = Entity.instantiate_entity(src_entity[k])
except KeyError:
# Catch key errors from invalid enum values
self[k] = None
"Sid": None,
# AadTenantId (type System.Nullable`1[System.Guid])
"AadTenantId": None,
# AadUserId (type System.Nullable`1[System.Guid])
"AadUserId": None,
# PUID (type System.Nullable`1[System.Guid])
"PUID": None,
# IsDomainJoined (type System.Nullable`1[System.Boolean])
"IsDomainJoined": None,
# DisplayName (type System.String)
"DisplayName": None,
}
@export
class SecurityGroup(Entity):
"""
SecurityGroup Entity class.
Attributes
----------
DistinguishedName : str
SecurityGroup DistinguishedName
SID : str
SecurityGroup SID
ObjectGuid : str
SecurityGroup ObjectGuid
"""
def __init__(self, src_entity: Mapping[str, Any] = None, **kwargs):
"""