How to use the msticpy.nbtools.query_defns.DataFamily function in msticpy

To help you get started, we’ve selected a few msticpy examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github microsoft / msticpy / tests / test_security_alert.py View on Github external
self.assertIsNotNone(alert.primary_account)
        self.assertEqual('TESTHOST$', alert.primary_account.Name)
        self.assertEqual('DOM\\TESTHOST$', alert.primary_account.qualified_name)
        self.assertEqual('0x3e7', alert.get_logon_id())

        self.assertIn('SourceComputerId', alert.host_filter(operator='=='))
        self.assertTrue(alert.is_in_log_analytics)
        self.assertTrue(alert.is_in_workspace)
        self.assertFalse(alert.is_in_azure_sub)
        self.assertIsNotNone(alert.host_filter(operator='=='))
        self.assertIn('true', alert.subscription_filter(operator='=='))

        self.assertEqual(2, len(alert.get_entities_of_type(entity_type='file')))

        self.assertGreater(len(alert.query_params), 5)
        self.assertEqual(alert.data_family, DataFamily.WindowsSecurity)
        self.assertEqual(alert.data_environment, DataEnvironment.LogAnalytics)
github microsoft / msticpy / tests / test_query_mgr.py View on Github external
before_alerts = len(queries.query_definitions)
            add_query(
                KqlQuery(
                    name="dummy_query",
                    query="""
                                    {table}
                                    {query_project}
                                    | where StartTimeUtc >= datetime({start})
                                    | where StartTimeUtc <= datetime({end})
                                    | summarize alertCount=count(), firstAlert=min(StartTimeUtc),
                                    lastAlert=max(StartTimeUtc) by AlertName
                                    | order by alertCount desc
                                    """,
                    description="Retrieves summary of current alerts",
                    data_source="security_alert",
                    data_families=[DataFamily.WindowsSecurity],
                    data_environments=[DataEnvironment.LogAnalytics],
                )
            )
            add_query(
                name="dummy_query2",
                query="""
                            {table}
                            {query_project}
                            | where StartTimeUtc >= datetime({start})
                            | where StartTimeUtc <= datetime({end})
                            | summarize alertCount=count(), firstAlert=min(StartTimeUtc),
                            lastAlert=max(StartTimeUtc) by AlertName
                            | order by alertCount desc
                            """,
                description="Retrieves summary of current alerts",
                data_source="security_alert",
github microsoft / msticpy / msticpy / nbtools / query_schema.py View on Github external
SubjectDomainName='',
                                            SubjectUserSid=auid,
                                            TargetUserName=user,
                                            TargetDomainName='',
                                            TargetUserSid=uid,
                                            TargetLogonId=ses,
                                            LogonProcessName=exe,
                                            LogonType=0,
                                            AuthenticationPackageName,
                                            Status=res,
                                            audit_user,
                                            IpAddress=addr,
                                            WorkstationName=hostname""",
    }
    # Add to the main dictionaries
    DATA_MAPPINGS[DataEnvironment.LogAnalytics][DataFamily.SecurityAlert] = {
        "security_alert": _SECURITY_ALERT
    }
    DATA_MAPPINGS[DataEnvironment.LogAnalytics][DataFamily.WindowsSecurity] = {
        "process_create": _PROC_CREATE_WIN,
        "account_logon": _ACCOUNT_LOGON_WIN,
        "account_logon_fail": _ACCOUNT_LOGON_FAIL_WIN,
    }
    DATA_MAPPINGS[DataEnvironment.LogAnalytics][DataFamily.LinuxSecurity] = {
        "process_create": _PROC_CREATE_LX,
        "account_logon": _ACCOUNT_LOGON_LX,
    }

    def __init__(
        self,
        environment: DataEnvironment = DataEnvironment.LogAnalytics,
        data_family: DataFamily = DataFamily.WindowsSecurity,
github microsoft / msticpy / msticpy / nbtools / query_builtin_queries.py View on Github external
KqlQuery(
        name="list_processes",
        query="""
let start = datetime({start});
let end = datetime({end});
{table}
{query_project}
| where {subscription_filter}
| where {host_filter_eq}
| where TimeGenerated >= start
| where TimeGenerated <= end
{add_query_items}
""",
        description="Retrieves processes for a host.",
        data_source="process_create",
        data_families=[DataFamily.WindowsSecurity, DataFamily.LinuxSecurity],
        data_environments=[DataEnvironment.LogAnalytics],
        optional_params=["add_query_items"],
    )
)

_add_query(
    KqlQuery(
        name="get_process_parent",
        query="""
let start = datetime({start});
let end = datetime({end});
let sourceProcessId = \'{process_id}\';
let sourceLogonId = \'{logon_session_id}\';
let sourceProcess =
materialize(
    {table}
github microsoft / msticpy / msticpy / nbtools / query_mgr.py View on Github external
Parameters
    ----------
    params : Mapping[str, Any]
        Input dictionary

    Returns
    -------
    Tuple[Optional[DataFamily],Optional[DataEnvironment]]
        Tuple of family and environment, if found.

    """
    family = None
    environment = None

    if _DATA_FAMILY_NAME in params:
        family = DataFamily.parse(params[_DATA_FAMILY_NAME])
    if _DATA_ENVIRONMENT_NAME in params:
        environment = DataEnvironment.parse(params[_DATA_ENVIRONMENT_NAME])
    return family, environment
github microsoft / msticpy / msticpy / nbtools / query_mgr.py View on Github external
Add a query to the current set.

    Parameters
    ----------
    kql_query : KqlQuery, optional
        KqlQuery object to add
        (the default is None, which prints help)
    kwargs : Mapping[str, Any]
        If kql_query is not supplied the kwargs must
        include `name`, `query` and `data_source`
        keyword parameters.

    """
    if kql_query is None:
        def_data_families = [DataEnvironment.LogAnalytics]
        def_data_environments = [DataFamily.WindowsSecurity, DataFamily.LinuxSecurity]
        if "name" not in kwargs or "query" not in kwargs or "data_source" not in kwargs:
            raise ValueError(
                "If kql_query is not supplied the kwargs",
                " must include name, query and data_source.",
            )
        kql_query = KqlQuery(
            name=kwargs["name"],
            query=kwargs["query"],
            description=kwargs.get("description", None),
            data_source=kwargs["data_source"],
            data_families=kwargs.get("data_families", def_data_families),
            data_environments=kwargs.get("data_environments", def_data_environments),
        )
    query_definitions[kql_query.name] = kql_query
    _add_queries_to_module(__name__)
github microsoft / msticpy / msticpy / nbtools / query_schema.py View on Github external
def get_data_families(cls):
        """Return the data families in the schema."""
        families = set()
        for _, data in cls.DATA_MAPPINGS.items():
            for data_map in data:
                families.add(DataFamily(data_map).name)

        return list(families)
github microsoft / msticpy / msticpy / nbtools / query_mgr.py View on Github external
candidate_families = set()
    candidate_environments = set()
    for provider in providers:
        family, env = _get_env_and_family(provider.query_params)
        if family != DataFamily.Unknown:
            candidate_families.add(family)
        if env != DataEnvironment.Unknown:
            candidate_environments.add(env)

    if custom_params:
        # If we haven't yet worked out the data family and environment
        # try to get this from one of custom_params
        family, env = _get_env_and_family(custom_params)

        if family != DataFamily.Unknown:
            candidate_families.add(family)
        if env != DataEnvironment.Unknown:
            candidate_environments.add(env)

    # get the intersection of families and environments that we found and those
    # supported by the query. If it is 1 item we are good to go.
    usable_families = candidate_families & set(kql_query.data_families)
    if len(usable_families) == 1:
        data_family = usable_families.pop()
    usable_environments = candidate_environments & set(kql_query.data_environments)
    if len(usable_environments) == 1:
        data_environment = usable_environments.pop()

    return data_family, data_environment
github microsoft / msticpy / msticpy / nbtools / query_mgr.py View on Github external
data_environment = None  # type: Optional[DataEnvironment]

    # If there is only one data family for this query, then use that
    if len(kql_query.data_families) == 1:
        data_family = kql_query.data_families[0]
    if len(kql_query.data_environments) == 1:
        data_environment = kql_query.data_environments[0]

    if data_family and data_environment:
        return data_family, data_environment

    candidate_families = set()
    candidate_environments = set()
    for provider in providers:
        family, env = _get_env_and_family(provider.query_params)
        if family != DataFamily.Unknown:
            candidate_families.add(family)
        if env != DataEnvironment.Unknown:
            candidate_environments.add(env)

    if custom_params:
        # If we haven't yet worked out the data family and environment
        # try to get this from one of custom_params
        family, env = _get_env_and_family(custom_params)

        if family != DataFamily.Unknown:
            candidate_families.add(family)
        if env != DataEnvironment.Unknown:
            candidate_environments.add(env)

    # get the intersection of families and environments that we found and those
    # supported by the query. If it is 1 item we are good to go.
github microsoft / msticpy / msticpy / nbtools / security_base.py View on Github external
def data_family(self) -> DataFamily:
        """Return the data family of the alert for subsequent queries."""
        if self.os_family == 'Linux':
            return DataFamily.LinuxSecurity
        if self.os_family == 'Windows':
            return DataFamily.WindowsSecurity
        raise ValueError('Unknown Data family.')