Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# The primary account must be resolved, exposing both the bare and the
# domain-qualified account name.
self.assertIsNotNone(alert.primary_account)
self.assertEqual('TESTHOST$', alert.primary_account.Name)
self.assertEqual('DOM\\TESTHOST$', alert.primary_account.qualified_name)
# Logon ID reported by the alert.
self.assertEqual('0x3e7', alert.get_logon_id())
# The host filter built with the '==' operator must mention SourceComputerId.
self.assertIn('SourceComputerId', alert.host_filter(operator='=='))
# Origin flags expected for this alert.
self.assertTrue(alert.is_in_log_analytics)
self.assertTrue(alert.is_in_workspace)
self.assertFalse(alert.is_in_azure_sub)
# NOTE(review): host_filter(operator='==') is already exercised by the
# assertIn above, so this check looks redundant - confirm before removing.
self.assertIsNotNone(alert.host_filter(operator='=='))
# The subscription filter text is expected to contain the literal 'true'.
self.assertIn('true', alert.subscription_filter(operator='=='))
# Exactly two entities of type 'file' are expected.
self.assertEqual(2, len(alert.get_entities_of_type(entity_type='file')))
# More than five query parameters must be exposed.
self.assertGreater(len(alert.query_params), 5)
# Data family and environment the alert reports for follow-on queries.
self.assertEqual(alert.data_family, DataFamily.WindowsSecurity)
self.assertEqual(alert.data_environment, DataEnvironment.LogAnalytics)
# Record how many query definitions are registered before adding a new one.
before_alerts = len(queries.query_definitions)
# First form of add_query: pass a fully constructed KqlQuery object.
add_query(
KqlQuery(
name="dummy_query",
query="""
{table}
{query_project}
| where StartTimeUtc >= datetime({start})
| where StartTimeUtc <= datetime({end})
| summarize alertCount=count(), firstAlert=min(StartTimeUtc),
lastAlert=max(StartTimeUtc) by AlertName
| order by alertCount desc
""",
description="Retrieves summary of current alerts",
data_source="security_alert",
data_families=[DataFamily.WindowsSecurity],
data_environments=[DataEnvironment.LogAnalytics],
)
)
add_query(
name="dummy_query2",
query="""
{table}
{query_project}
| where StartTimeUtc >= datetime({start})
| where StartTimeUtc <= datetime({end})
| summarize alertCount=count(), firstAlert=min(StartTimeUtc),
lastAlert=max(StartTimeUtc) by AlertName
| order by alertCount desc
""",
description="Retrieves summary of current alerts",
data_source="security_alert",
SubjectDomainName='',
SubjectUserSid=auid,
TargetUserName=user,
TargetDomainName='',
TargetUserSid=uid,
TargetLogonId=ses,
LogonProcessName=exe,
LogonType=0,
AuthenticationPackageName,
Status=res,
audit_user,
IpAddress=addr,
WorkstationName=hostname""",
}
# Add to the main dictionaries.
# Each assignment replaces the whole entry for one data family under the
# LogAnalytics environment with a mapping of data-source name -> definition.
DATA_MAPPINGS[DataEnvironment.LogAnalytics][DataFamily.SecurityAlert] = {
"security_alert": _SECURITY_ALERT
}
# Windows security data sources.
DATA_MAPPINGS[DataEnvironment.LogAnalytics][DataFamily.WindowsSecurity] = {
"process_create": _PROC_CREATE_WIN,
"account_logon": _ACCOUNT_LOGON_WIN,
"account_logon_fail": _ACCOUNT_LOGON_FAIL_WIN,
}
# Linux security data sources (no "account_logon_fail" entry is registered).
DATA_MAPPINGS[DataEnvironment.LogAnalytics][DataFamily.LinuxSecurity] = {
"process_create": _PROC_CREATE_LX,
"account_logon": _ACCOUNT_LOGON_LX,
}
def __init__(
self,
environment: DataEnvironment = DataEnvironment.LogAnalytics,
data_family: DataFamily = DataFamily.WindowsSecurity,
KqlQuery(
name="list_processes",
query="""
let start = datetime({start});
let end = datetime({end});
{table}
{query_project}
| where {subscription_filter}
| where {host_filter_eq}
| where TimeGenerated >= start
| where TimeGenerated <= end
{add_query_items}
""",
description="Retrieves processes for a host.",
data_source="process_create",
data_families=[DataFamily.WindowsSecurity, DataFamily.LinuxSecurity],
data_environments=[DataEnvironment.LogAnalytics],
optional_params=["add_query_items"],
)
)
_add_query(
KqlQuery(
name="get_process_parent",
query="""
let start = datetime({start});
let end = datetime({end});
let sourceProcessId = \'{process_id}\';
let sourceLogonId = \'{logon_session_id}\';
let sourceProcess =
materialize(
{table}
Parameters
----------
params : Mapping[str, Any]
Input dictionary
Returns
-------
Tuple[Optional[DataFamily],Optional[DataEnvironment]]
Tuple of family and environment, if found.
"""
family = None
environment = None
if _DATA_FAMILY_NAME in params:
family = DataFamily.parse(params[_DATA_FAMILY_NAME])
if _DATA_ENVIRONMENT_NAME in params:
environment = DataEnvironment.parse(params[_DATA_ENVIRONMENT_NAME])
return family, environment
Add a query to the current set.
Parameters
----------
kql_query : KqlQuery, optional
KqlQuery object to add
(the default is None, which prints help)
kwargs : Mapping[str, Any]
If kql_query is not supplied the kwargs must
include `name`, `query` and `data_source`
keyword parameters.
"""
if kql_query is None:
    # Build the query from keyword arguments; these three keys are mandatory.
    required_keys = ("name", "query", "data_source")
    if any(key not in kwargs for key in required_keys):
        # One concatenated message string: passing two separate arguments
        # to ValueError makes the message render as a tuple.
        raise ValueError(
            "If kql_query is not supplied the kwargs"
            " must include name, query and data_source."
        )
    # Defaults used when the caller does not restrict the query's scope.
    # Families must hold DataFamily members and environments must hold
    # DataEnvironment members (the two lists were previously swapped).
    def_data_families = [DataFamily.WindowsSecurity, DataFamily.LinuxSecurity]
    def_data_environments = [DataEnvironment.LogAnalytics]
    kql_query = KqlQuery(
        name=kwargs["name"],
        query=kwargs["query"],
        description=kwargs.get("description"),
        data_source=kwargs["data_source"],
        data_families=kwargs.get("data_families", def_data_families),
        data_environments=kwargs.get("data_environments", def_data_environments),
    )
# Register the query under its name, replacing any existing entry.
query_definitions[kql_query.name] = kql_query
# Runs as a plain statement, passing this module's name to the helper.
# NOTE(review): presumably exposes each registered query as an attribute of
# this module - confirm against _add_queries_to_module, which is not shown here.
_add_queries_to_module(__name__)
def get_data_families(cls):
    """
    Return the data families in the schema.

    Returns
    -------
    list
        Unique family names found across every environment in
        ``cls.DATA_MAPPINGS``, sorted so that the result is stable
        between runs (iteration order of a set is not).

    """
    # Only the per-environment mappings are needed, not the environment keys.
    families = {
        DataFamily(family_key).name
        for family_map in cls.DATA_MAPPINGS.values()
        for family_key in family_map
    }
    return sorted(families)
# Collect every family/environment hinted at by the providers' parameters.
candidate_families = set()
candidate_environments = set()
for provider in providers:
family, env = _get_env_and_family(provider.query_params)
# Skip the explicit "Unknown" marker values.
# NOTE(review): if _get_env_and_family can return None for a missing key,
# None also passes these tests and is added to the sets - confirm intended.
if family != DataFamily.Unknown:
candidate_families.add(family)
if env != DataEnvironment.Unknown:
candidate_environments.add(env)
if custom_params:
# If we haven't yet worked out the data family and environment
# try to get this from one of custom_params
family, env = _get_env_and_family(custom_params)
if family != DataFamily.Unknown:
candidate_families.add(family)
if env != DataEnvironment.Unknown:
candidate_environments.add(env)
# get the intersection of families and environments that we found and those
# supported by the query. If it is 1 item we are good to go.
usable_families = candidate_families & set(kql_query.data_families)
if len(usable_families) == 1:
data_family = usable_families.pop()
usable_environments = candidate_environments & set(kql_query.data_environments)
if len(usable_environments) == 1:
data_environment = usable_environments.pop()
# When an intersection does not hold exactly one item, the value bound
# earlier in the function (not visible in this excerpt) is returned unchanged.
return data_family, data_environment
data_environment = None # type: Optional[DataEnvironment]
# If there is only one data family for this query, then use that
if len(kql_query.data_families) == 1:
data_family = kql_query.data_families[0]
# Likewise for a query that supports a single data environment.
if len(kql_query.data_environments) == 1:
data_environment = kql_query.data_environments[0]
# Both resolved from the query definition alone: no need to inspect parameters.
if data_family and data_environment:
return data_family, data_environment
# Otherwise collect candidates from the providers' parameters.
candidate_families = set()
candidate_environments = set()
for provider in providers:
family, env = _get_env_and_family(provider.query_params)
# Skip the explicit "Unknown" marker values.
# NOTE(review): if _get_env_and_family can return None for a missing key,
# None also passes these tests and is added to the sets - confirm intended.
if family != DataFamily.Unknown:
candidate_families.add(family)
if env != DataEnvironment.Unknown:
candidate_environments.add(env)
if custom_params:
# If we haven't yet worked out the data family and environment
# try to get this from one of custom_params
family, env = _get_env_and_family(custom_params)
if family != DataFamily.Unknown:
candidate_families.add(family)
if env != DataEnvironment.Unknown:
candidate_environments.add(env)
# get the intersection of families and environments that we found and those
# supported by the query. If it is 1 item we are good to go.
def data_family(self) -> DataFamily:
    """
    Return the data family of the alert for subsequent queries.

    Returns
    -------
    DataFamily
        ``LinuxSecurity`` when ``os_family`` is 'Linux',
        ``WindowsSecurity`` when it is 'Windows'.

    Raises
    ------
    ValueError
        If ``os_family`` is neither 'Linux' nor 'Windows'.

    """
    is_linux = self.os_family == 'Linux'
    # Guard clause: reject anything that is not one of the two known families.
    if not (is_linux or self.os_family == 'Windows'):
        raise ValueError('Unknown Data family.')
    return DataFamily.LinuxSecurity if is_linux else DataFamily.WindowsSecurity