Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@export
def risky_cmd_line(
    events: pd.DataFrame,
    log_type: str,
    detection_rules: str = os.path.join(
        os.path.join(os.path.dirname(os.path.dirname(__file__)), _DETECTIONS_DEF_DIR),
        "cmd_line_rules.json",
    ),
    cmd_field: str = "Command",
) -> dict:
    """
    Detect patterns of risky commands in syslog messages.

    Risky patterns are defined in a json format file.

    Parameters
    ----------
    events : pd.DataFrame
        Syslog events to scan for risky command-line patterns.
    log_type : str
        Log type of the input events (presumably selects which rule
        set applies — verify against the implementation).
    detection_rules : str, optional
        Path to the JSON rules file (the default is
        ``cmd_line_rules.json`` under the package detections folder).
    cmd_field : str, optional
        Name of the column holding the command line
        (the default is "Command")

    Returns
    -------
    dict
        Detection results (schema not visible in this extract).
    """
    # NOTE(review): the implementation body is not present in this
    # extract — only the signature and docstring were captured.
@export
@lru_cache(maxsize=1024)
def token_count(value: str, delimiter: str = " ") -> int:
    """
    Return the count of delimiter-separated tokens in a string value.

    Results are memoized via ``lru_cache`` since the same string may
    be processed repeatedly (e.g. when applied to a pd.Series column).

    Parameters
    ----------
    value : str
        Data to process
    delimiter : str, optional
        Delimiter used to split the column string.
        (the default is ' ')

    Returns
    -------
    int
        Number of delimiter-separated tokens in `value`.
    """
    # NOTE(review): the implementation body is not present in this
    # extract — only the signature and docstring were captured.
@export
def entity_distance(ip_src: IpAddress, ip_dest: IpAddress) -> float:
    """
    Return distance between two IP Entities.

    Parameters
    ----------
    ip_src : IpAddress
        Source/Origin IpAddress Entity
    ip_dest : IpAddress
        Destination IpAddress Entity

    Returns
    -------
    float
        Distance in kilometers.

    Notes
    -----
    NOTE(review): presumably computed from geo-location data attached
    to the entities — the implementation body is not present in this
    extract, so the required entity attributes cannot be confirmed.
    """
@export
def load_kql_magic():
    """Ensure the Kqlmagic extension is loaded, raising if it is not."""
    # A truthy result from the loader means Kqlmagic is available.
    if _KQL_LOADER():
        return
    # Loader reported failure - treat this as a fatal environment problem.
    raise EnvironmentError("Kqlmagic did not load correctly.")
@export
def create_host_record(
    syslog_df: pd.DataFrame, heartbeat_df: pd.DataFrame, az_net_df: pd.DataFrame = None
) -> Host:
    """
    Generate host_entity record for selected computer.

    Parameters
    ----------
    syslog_df : pd.DataFrame
        A dataframe of all syslog events for the host in the time window required
    heartbeat_df : pd.DataFrame
        A dataframe of heartbeat data for the host
    az_net_df : pd.DataFrame, optional
        Optional dataframe of Azure network data for the host
        (the default is None)

    Returns
    -------
    Host
        The populated Host entity record.
    """
    # NOTE(review): the implementation body is not present in this
    # extract — only the signature and docstring were captured.
@export
def get_items_from_gzip(binary: bytes) -> Tuple[str, Dict[str, bytes]]:
    """
    Return decompressed gzip contents.

    Parameters
    ----------
    binary : bytes
        byte array of gz file

    Returns
    -------
    Tuple[str, bytes]
        File type + decompressed file

    Notes
    -----
    NOTE(review): the annotation says ``Tuple[str, Dict[str, bytes]]``
    but the docstring says ``Tuple[str, bytes]`` — confirm which is
    correct against the full implementation.
    """
    archive_file = gzip.decompress(binary)
    # NOTE(review): the rest of the implementation (file-type detection
    # and the return statement) is not present in this extract.
@export
def replace_query_params(query_name: str, *args, **kwargs) -> str:
    """
    Return the parameterized query for query_name.

    Parameters
    ----------
    query_name : str
        The query to use

    Other Parameters
    ----------------
    args : Tuple[QueryParamProvider]
        objects that implement QueryParamProvider
        (from which query parameters can be extracted).
    provs : Iterable[QueryParamProvider]
        this should be a collection of objects that
        implement QueryParamProvider (presumably passed via
        ``kwargs`` — verify against the implementation).

    Returns
    -------
    str
        The query string with parameters substituted.
    """
    # NOTE(review): the implementation body is not present in this
    # extract — only the signature and docstring were captured.
@export
@lru_cache(maxsize=1024)
def crc32_hash(value: str) -> int:
    """
    Return the CRC32 hash of the input column.

    Results are memoized via ``lru_cache`` since the same string may
    be hashed repeatedly when applied across a column of data.

    Parameters
    ----------
    value : str
        Data to process

    Returns
    -------
    int
        CRC32 hash
    """
    # The extracted block had no body, so the function implicitly
    # returned None — contradicting the documented int return.
    # zlib.crc32 operates on bytes, so encode the string as UTF-8 first.
    import zlib

    return zlib.crc32(value.encode("utf-8"))
@export # noqa: C901, MC0001
def display_timeline_values(
    data: pd.DataFrame,
    y: str,
    time_column: str = "TimeGenerated",
    source_columns: list = None,
    **kwargs,
) -> figure:
    """
    Display a timeline of events.

    Parameters
    ----------
    data : pd.DataFrame
        DataFrame as a single data set or grouped into individual
        plot series using the `group_by` parameter
    y : str
        Name of the column holding the values to plot.
    time_column : str, optional
        Name of the event timestamp column
        (the default is "TimeGenerated")
    source_columns : list, optional
        Presumably the columns to include in the plot's source/hover
        data — verify against the implementation
        (the default is None)

    Returns
    -------
    figure
        The rendered plot figure (the ``figure`` type is imported
        elsewhere in the file).
    """
    # NOTE(review): the implementation body is not present in this
    # extract — only the signature and docstring were captured.
@export # noqa: C901, MC0001
def plot_cluster(
db_cluster: DBSCAN,
data: pd.DataFrame,
x_predict: np.ndarray,
plot_label: str = None,
plot_features: Tuple[int, int] = (0, 1),
verbose: bool = False,
cut_off: int = 3,
xlabel: str = None,
ylabel: str = None,
):
"""
Plot clustered data as scatter chart.
Parameters
----------