Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
add_query(
KqlQuery(
name="dummy_query",
query="""
{table}
{query_project}
| where StartTimeUtc >= datetime({start})
| where StartTimeUtc <= datetime({end})
| summarize alertCount=count(), firstAlert=min(StartTimeUtc),
lastAlert=max(StartTimeUtc) by AlertName
| order by alertCount desc
""",
description="Retrieves summary of current alerts",
data_source="security_alert",
data_families=[DataFamily.WindowsSecurity],
data_environments=[DataEnvironment.LogAnalytics],
)
)
add_query(
name="dummy_query2",
query="""
{table}
{query_project}
| where StartTimeUtc >= datetime({start})
| where StartTimeUtc <= datetime({end})
| summarize alertCount=count(), firstAlert=min(StartTimeUtc),
lastAlert=max(StartTimeUtc) by AlertName
| order by alertCount desc
""",
description="Retrieves summary of current alerts",
data_source="security_alert",
data_families=[DataFamily.WindowsSecurity],
)
add_query(
name="dummy_query2",
query="""
{table}
{query_project}
| where StartTimeUtc >= datetime({start})
| where StartTimeUtc <= datetime({end})
| summarize alertCount=count(), firstAlert=min(StartTimeUtc),
lastAlert=max(StartTimeUtc) by AlertName
| order by alertCount desc
""",
description="Retrieves summary of current alerts",
data_source="security_alert",
data_families=[DataFamily.WindowsSecurity],
data_environments=[DataEnvironment.LogAnalytics],
)
self.assertEqual(before_alerts + 2, len(queries.query_definitions))
self.assertIn("get_alert", kql.__dict__)
self.assertIn("get_process_parent", kql.__dict__)
# This returns None and prints output but should execute with error
self.assertRaises(LookupError, kql.get_process_parent)
def get_data_environments(cls):
"""Return the environments defined in the schema."""
return list({DataEnvironment(env_name).name for env_name in cls.DATA_MAPPINGS})
def data_environment(self) -> DataEnvironment:
"""Return the data environment of the alert for subsequent queries."""
if self.is_in_log_analytics:
return DataEnvironment.LogAnalytics
return DataEnvironment.Kusto
from .query_defns import DataFamily, DataEnvironment
from ..common.utility import export
from .._version import VERSION
__version__ = VERSION
__author__ = "Ian Hellen"
@deprecated(reason="Superceded by msticpy.data.QueryProvider", version="0.2.0")
@export
class DataSchema:
"""DataSchema class for Log Analytics Queries."""
DATA_MAPPINGS: Dict[
DataEnvironment, Dict[DataFamily, Dict[str, Dict[str, str]]]
] = {DataEnvironment.LogAnalytics: {}, DataEnvironment.Kusto: {}}
DATA_MAPPINGS[DataEnvironment.LogAnalytics] = {
DataFamily.WindowsSecurity: {},
DataFamily.LinuxSecurity: {},
DataFamily.SecurityAlert: {},
}
_SECURITY_ALERT = {
"table": "SecurityAlert",
"query_project": """| project
TenantId,
StartTimeUtc = StartTime,
EndTimeUtc = EndTime,
ProviderAlertId = VendorOriginalId,
SystemAlertId,
ProviderName,
candidate_environments = set()
for provider in providers:
family, env = _get_env_and_family(provider.query_params)
if family != DataFamily.Unknown:
candidate_families.add(family)
if env != DataEnvironment.Unknown:
candidate_environments.add(env)
if custom_params:
# If we haven't yet worked out the data family and environment
# try to get this from one of custom_params
family, env = _get_env_and_family(custom_params)
if family != DataFamily.Unknown:
candidate_families.add(family)
if env != DataEnvironment.Unknown:
candidate_environments.add(env)
# get the intersection of families and environments that we found and those
# supported by the query. If it is 1 item we are good to go.
usable_families = candidate_families & set(kql_query.data_families)
if len(usable_families) == 1:
data_family = usable_families.pop()
usable_environments = candidate_environments & set(kql_query.data_environments)
if len(usable_environments) == 1:
data_environment = usable_environments.pop()
return data_family, data_environment
name="get_host_logon",
query="""
{table}
{query_project}
| where {subscription_filter}
| where {host_filter_eq}
| where TimeGenerated >= datetime({start})
| where TimeGenerated <= datetime({end})
| where TargetLogonId == \'{logon_session_id}\'
{add_query_items}
""",
description="""
Retrieves the logon event for the session id on the host.""",
data_source="account_logon",
data_families=[DataFamily.WindowsSecurity, DataFamily.LinuxSecurity],
data_environments=[DataEnvironment.LogAnalytics],
optional_params=["add_query_items"],
)
)
_add_query(
KqlQuery(
name="list_host_logons",
query="""
{table}
{query_project}
| where {subscription_filter}
| where {host_filter_eq}
| where TimeGenerated >= datetime({start})
| where TimeGenerated <= datetime({end})
{add_query_items}
""",
# If there is only one data family for this query, then use that
if len(kql_query.data_families) == 1:
data_family = kql_query.data_families[0]
if len(kql_query.data_environments) == 1:
data_environment = kql_query.data_environments[0]
if data_family and data_environment:
return data_family, data_environment
candidate_families = set()
candidate_environments = set()
for provider in providers:
family, env = _get_env_and_family(provider.query_params)
if family != DataFamily.Unknown:
candidate_families.add(family)
if env != DataEnvironment.Unknown:
candidate_environments.add(env)
if custom_params:
# If we haven't yet worked out the data family and environment
# try to get this from one of custom_params
family, env = _get_env_and_family(custom_params)
if family != DataFamily.Unknown:
candidate_families.add(family)
if env != DataEnvironment.Unknown:
candidate_environments.add(env)
# get the intersection of families and environments that we found and those
# supported by the query. If it is 1 item we are good to go.
usable_families = candidate_families & set(kql_query.data_families)
if len(usable_families) == 1:
def data_environment(self) -> DataEnvironment:
"""Return the data environment of the alert for subsequent queries."""
if self.is_in_log_analytics:
return DataEnvironment.LogAnalytics
return DataEnvironment.Kusto