Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_ip_entity(row):
ip_ent = IpAddress(Address=row["AllExtIPs"])
geo_loc = create_geo_entity(row)
ip_ent.Location = geo_loc
return ip_ent
self.assertTrue('Name' in e)
self.assertGreater(len(e.Name), 0)
elif e['Type'] == 'host':
self.assertIsInstance(e, Host)
self.assertTrue('HostName' in e)
self.assertGreater(len(e.HostName), 0)
elif e['Type'] == 'process':
self.assertIsInstance(e, Process)
self.assertTrue('ProcessId' in e)
self.assertGreater(len(e.ProcessId), 0)
elif e['Type'] == 'file':
self.assertIsInstance(e, File)
self.assertTrue('Name' in e)
self.assertGreater(len(e.Name), 0)
elif e['Type'] == 'ipaddress':
self.assertIsInstance(e, IpAddress)
self.assertTrue('Address' in e)
self.assertGreater(len(e.Address), 0)
parsed_entities.append(e)
self.assertGreaterEqual(len(parsed_entities), 7)
except Exception as ex:
self.fail(msg='Exception {}'.format(str(ex)))
Generate ip_entity record for provided IP value.
Parameters
----------
heartbeat_df : pd.DataFrame
A dataframe of heartbeat data for the host
az_net_df : pd.DataFrame
Option dataframe of Azure network data for the host
Returns
-------
IP
Details of the IP data collected
"""
ip_entity = IpAddress()
# Produce ip_entity record using available dataframes
ip_hb = heartbeat_df.iloc[0]
ip_entity.Address = ip_hb["ComputerIP"]
ip_entity.hostname = ip_hb["Computer"] # type: ignore
ip_entity.SourceComputerId = ip_hb["SourceComputerId"] # type: ignore
ip_entity.OSType = ip_hb["OSType"] # type: ignore
ip_entity.OSName = ip_hb["OSName"] # type: ignore
ip_entity.OSVMajorersion = ip_hb["OSMajorVersion"] # type: ignore
ip_entity.OSVMinorVersion = ip_hb["OSMinorVersion"] # type: ignore
ip_entity.ComputerEnvironment = ip_hb["ComputerEnvironment"] # type: ignore
ip_entity.OmsSolutions = [ # type: ignore
sol.strip() for sol in ip_hb["Solutions"].split(",")
]
ip_entity.VMUUID = ip_hb["VMUUID"] # type: ignore
ip_entity.SubscriptionId = ip_hb["SubscriptionId"] # type: ignore
applications.append(app)
# Produce host_entity record mapping linux heartbeat elements to host_entity fields
host_hb = heartbeat_df.iloc[0]
host_entity.SourceComputerId = host_hb["SourceComputerId"] # type: ignore
host_entity.OSType = host_hb["OSType"] # type: ignore
host_entity.OSName = host_hb["OSName"] # type: ignore
host_entity.OSVMajorersion = host_hb["OSMajorVersion"] # type: ignore
host_entity.OSVMinorVersion = host_hb["OSMinorVersion"] # type: ignore
host_entity.ComputerEnvironment = host_hb["ComputerEnvironment"] # type: ignore
host_entity.OmsSolutions = [ # type: ignore
sol.strip() for sol in host_hb["Solutions"].split(",")
] # type: ignore
host_entity.Applications = applications # type: ignore
host_entity.VMUUID = host_hb["VMUUID"] # type: ignore
ip_entity = IpAddress()
ip_entity.Address = host_hb["ComputerIP"]
geoloc_entity = GeoLocation()
geoloc_entity.CountryName = host_hb["RemoteIPCountry"] # type: ignore
geoloc_entity.Longitude = host_hb["RemoteIPLongitude"] # type: ignore
geoloc_entity.Latitude = host_hb["RemoteIPLatitude"] # type: ignore
ip_entity.Location = geoloc_entity # type: ignore
host_entity.IPAddress = ip_entity # type: ignore
# If Azure network data present add this to host record
if az_net_df is not None and not az_net_df.empty:
if len(az_net_df) == 1:
priv_addr_str = az_net_df["PrivateIPAddresses"].loc[0]
host_entity["private_ips"] = convert_to_ip_entities(priv_addr_str)
pub_addr_str = az_net_df["PublicIPAddresses"].loc[0]
host_entity["public_ips"] = convert_to_ip_entities(pub_addr_str)
else:
-------
List
The populated IP entities including address and geo-location
"""
ip_entities = []
if ip_str:
if "," in ip_str:
addrs = ip_str.split(",")
elif " " in ip_str:
addrs = ip_str.split(" ")
else:
addrs = [ip_str]
for addr in addrs:
ip_entity = IpAddress()
ip_entity.Address = addr.strip()
try:
ip_lookup = _GET_IP_LOOKUP()
ip_lookup.lookup_ip(ip_entity=ip_entity)
except DataError:
pass
ip_entities.append(ip_entity)
return ip_entities
pass
if not isinstance(heartbeat_df, pd.DataFrame) or heartbeat_df.empty:
raise KQLDataError(f"No heartbeat data provided")
host_hb = heartbeat_df.iloc[0]
host_entity.SourceComputerId = host_hb["SourceComputerId"]
host_entity.OSType = host_hb["OSType"]
host_entity.OSName = host_hb["OSName"]
host_entity.OSVMajorersion = host_hb["OSMajorVersion"]
host_entity.OSVMinorVersion = host_hb["OSMinorVersion"]
host_entity.ComputerEnvironment = host_hb["ComputerEnvironment"]
host_entity.OmsSolutions = [sol.strip() for sol in host_hb["Solutions"].split(",")]
host_entity.Applications = applications
host_entity.VMUUID = host_hb["VMUUID"]
ip_entity = IpAddress()
ip_entity.Address = host_hb["ComputerIP"]
geoloc_entity = GeoLocation()
geoloc_entity.CountryName = host_hb["RemoteIPCountry"]
geoloc_entity.Longitude = host_hb["RemoteIPLongitude"]
geoloc_entity.Latitude = host_hb["RemoteIPLatitude"]
ip_entity.Location = geoloc_entity
host_entity.IPAddress = ip_entity
if az_net_df is None or az_net_df.empty:
pass
else:
if len(az_net_df) == 1:
priv_addr_str = az_net_df["PrivateIPAddresses"].loc[0]
host_entity["private_ips"] = convert_to_ip_entities(priv_addr_str)
pub_addr_str = az_net_df["PublicIPAddresses"].loc[0]
host_entity["public_ips"] = convert_to_ip_entities(pub_addr_str)
def _create_ip_entity(ip_loc: dict, ip_entity) -> IpAddress:
if not ip_entity:
ip_entity = IpAddress()
ip_entity.Address = ip_loc["ip"]
geo_entity = GeoLocation()
geo_entity.CountryCode = ip_loc["country_code"] # type: ignore
geo_entity.CountryName = ip_loc["country_name"] # type: ignore
geo_entity.State = ip_loc["region_name"] # type: ignore
geo_entity.City = ip_loc["city"] # type: ignore
geo_entity.Longitude = ip_loc["longitude"] # type: ignore
geo_entity.Latitude = ip_loc["latitude"] # type: ignore
if "connection" in ip_loc:
geo_entity.Asn = ip_loc["connection"]["asn"] # type: ignore
ip_entity.Location = geo_entity
return ip_entity
return "OtherEntity"
_entity_schema = {} # type: Dict[str, Any]
# Dictionary to map text names of types to the class.
Entity.ENTITY_NAME_MAP.update(
{
"account": Account,
"host": Host,
"process": Process,
"file": File,
"cloudapplication": CloudApplication,
"dnsresolve": DnsResolve,
"ipaddress": IpAddress,
"ip": IpAddress,
"networkconnection": NetworkConnection,
"malware": Malware,
"registry-key": RegistryKey,
"registrykey": RegistryKey,
"registry-value": RegistryValue,
"registryvalue": RegistryValue,
"host-logon-session": HostLogonSession,
"hostlogonsession": HostLogonSession,
"filehash": FileHash,
"security-group": SecurityGroup,
"securitygroup": SecurityGroup,
"alerts": Alert,
"alert": Alert,
}
if 'Account' in event_proc:
logon_session.Account = event_proc.Account
event_proc.Account.Host = host
self._entities.append(event_proc.Account)
self._entities.append(logon_session)
if 'ImageFile' in event_proc:
self._entities.append(event_proc.ImageFile)
if event_id in ('4624', '4625'):
subj_account = Account(src_event=src_row, role='subject')
subj_account.Host = host
self._entities.append(subj_account)
tgt_account = Account(src_event=src_row, role='target')
tgt_account.Host = host
self._entities.append(tgt_account)
self._entities.append(IpAddress(src_event=src_row))