Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _import_extras(nm_spc: Dict[str, Any], extra_imports: List[str]):
for imp_spec in extra_imports:
params: List[Optional[str]] = [None, None, None]
for idx, param in enumerate(imp_spec.split(",")):
params[idx] = param.strip() or None
if params[0] is None:
raise MsticpyException(
f"First parameter in extra_imports is mandatory: {imp_spec}"
)
_imp_from_package(nm_spc=nm_spc, pkg=params[0], tgt=params[1], alias=params[2])
session = self.sessions[0]
cmd = session[0]
if isinstance(cmd, str):
self.session_type = SessionType.cmds_only
elif self._check_cmd_type():
if isinstance(cmd.params, set):
self.session_type = SessionType.cmds_params_only
elif isinstance(cmd.params, dict):
self.session_type = SessionType.cmds_params_values
else:
raise MsticpyException(
"Params attribute of Cmd data structure should "
+ "be either a set or a dict"
)
else:
raise MsticpyException(
"Each element of 'sessions' should be a list of either "
+ "strings, or Cmd data types"
)
Parameters
----------
window_len: int
length of sliding window for likelihood calculations
use_start_end_tokens: bool
if True, then `start_token` and `end_token` will be prepended
and appended to each
session respectively before the calculations are done
use_geo_mean: bool
if True, then each of the likelihoods of the sliding windows
will be raised to the power
of (1/`window_len`)
"""
if self.prior_probs is None:
raise MsticpyException(
"please train the model first before using this method"
)
if self.session_type == SessionType.cmds_only:
rare_tuples = [
cmds_only.rarest_window_session(
session=ses,
prior_probs=self.prior_probs,
trans_probs=self.trans_probs,
window_len=window_len,
use_start_end_tokens=use_start_end_tokens,
start_token=self.start_token,
end_token=self.end_token,
use_geo_mean=use_geo_mean,
)
for ses in self.sessions
speed of execution against
(Defaults to 10)
Returns
-------
risky suspicious_actions: list
A list of commands that match a risky pattern
Raises
------
AttributeError
If cmd_field is not in supplied data set or TimeGenerated note datetime format
"""
if cmd_field not in cmd_events.columns:
raise MsticpyException(f"Dataframe does not contain {cmd_field} column")
if isinstance(cmd_events["TimeGenerated"].iloc[0], dt.datetime) is False:
raise MsticpyException("TimeGenerated is not a datetime format")
suspicious_actions = []
cmd_events[cmd_field].replace("", np.nan, inplace=True)
# Only focus on logs that contain comand line activity
actions = cmd_events.dropna(subset=[cmd_field]).reset_index()
df_len = len(actions.index) - (events + 1)
while df_len >= 0:
delta = (
actions["TimeGenerated"][(df_len + events)]
- actions["TimeGenerated"][df_len]
)
if delta < dt.timedelta(seconds=time):
suspicious_actions.append(
use_end_token: bool
if set to True, the end_token will be appended to the window
before the likelihood calculation is done
start_token: str
dummy command to signify the start of the session (e.g. "##START##")
end_token: str
dummy command to signify the end of the session (e.g. "##END##")
Returns
-------
likelihood of the window
"""
if use_start_token:
if start_token is None:
raise MsticpyException(
"start_token should not be None, when use_start_token is True"
)
if use_end_token:
if end_token is None:
raise MsticpyException(
"end_token should not be None, when use_end_token is True"
)
w_len = len(window)
if w_len == 0:
return np.nan
prob = 1
cur = window[0]
if use_start_token:
modellable_params: set, optional
set of params which you deem to have categorical values which are suitable
for modelling.
Note this argument will only have an effect if your sessions include commands,
params and values. If your sessions include commands, params and values and
this argument is not set, then some rough heuristics will be used to determine
which params have values which are suitable for modelling.
"""
if not isinstance(sessions, list):
raise MsticpyException("`sessions` should be a list")
if len(sessions) == 0:
raise MsticpyException("`sessions` should not be an empty list")
for i, ses in enumerate(sessions):
if not isinstance(ses, list):
raise MsticpyException("each session in `sessions` should be a list")
if len(ses) == 0:
raise MsticpyException(
"session at index {} of `sessions` is empty. Each session "
"should contain at least one command".format(i)
)
self.start_token = "##START##"
self.end_token = "##END##"
self.unk_token = "##UNK##"
self.sessions = sessions
self.session_type = None
self._asses_input()
# non laplace smoothed counts
self._seq1_counts = None
def _compute_probs_params(self):
"""Compute the individual param probs and param conditional on command probs."""
if self.param_counts is None:
raise MsticpyException("param_counts attribute should not be None")
if self.cmd_param_counts is None:
raise MsticpyException("cmd_param_counts attribute should not be None")
param_probs, param_cond_cmd_probs = probabilities.compute_params_probs(
param_counts=self.param_counts,
cmd_param_counts=self.cmd_param_counts,
seq1_counts=self.seq1_counts,
unk_token=self.unk_token,
)
self.param_probs = param_probs
self.param_cond_cmd_probs = param_cond_cmd_probs
def _compute_probs_values(self):
"""Compute the individual value probs and value conditional on param probs."""
if self.value_counts is None:
raise MsticpyException("value_counts attribute should not be None")
if self.param_value_counts is None:
raise MsticpyException("param_value_counts attribute should not be None")
value_probs, value_cond_param_probs = probabilities.compute_values_probs(
value_counts=self.value_counts,
param_value_counts=self.param_value_counts,
unk_token=self.unk_token,
)
self.value_probs = value_probs
self.value_cond_param_probs = value_cond_param_probs
def _compute_probs_cmds(self):
"""Compute the individual and transition command probabilties."""
if self.seq1_counts is None:
raise MsticpyException("seq1_counts attribute should not be None")
if self.seq2_counts is None:
raise MsticpyException("seq2_counts attribute should not be None")
prior_probs, trans_probs = probabilities.compute_cmds_probs(
seq1_counts=self.seq1_counts,
seq2_counts=self.seq2_counts,
unk_token=self.unk_token,
)
self.prior_probs = prior_probs
self.trans_probs = trans_probs