Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Load the entities JSON fixture, build an entity object from every entry via
# Entity.instantiate_entity, and check the resulting class and key attribute
# for each entity 'Type'.
# NOTE(review): this excerpt has lost its indentation and appears truncated.
def entity_creation(self):
# NOTE(review): no matching except/finally clause is visible in this excerpt.
try:
# Relative path, so it is resolved against the current working directory.
file = './msticpy/tests/testdata/entities.json'
with open(file, 'r') as file_handle:
txt = file_handle.read()
entity_dict = json.loads(txt)
# NOTE(review): nothing is appended to parsed_entities in the visible lines.
parsed_entities = []
# Only the values of the fixture dict are used; the keys are ignored.
for _, entity in entity_dict.items():
e = Entity.instantiate_entity(entity)
self.assertIsInstance(e, Entity)
# Per-type checks: expected subclass, key present, key value non-empty.
if e['Type'] == 'account':
self.assertIsInstance(e, Account)
self.assertTrue('Name' in e)
self.assertGreater(len(e.Name), 0)
elif e['Type'] == 'host':
self.assertIsInstance(e, Host)
self.assertTrue('HostName' in e)
self.assertGreater(len(e.HostName), 0)
elif e['Type'] == 'process':
self.assertIsInstance(e, Process)
self.assertTrue('ProcessId' in e)
# len() is applied to ProcessId, so it is presumably a string - TODO confirm.
self.assertGreater(len(e.ProcessId), 0)
elif e['Type'] == 'file':
self.assertIsInstance(e, File)
self.assertTrue('Name' in e)
self.assertRaises(MsticpyException, lambda: StateMatrix(dict(), UNK_TOKEN))
states = {"haha": {"lol": 1, UNK_TOKEN: 1}, UNK_TOKEN: {"hehe": 1}}
)
# Populate self.data2: every entry wraps a probability table in a StateMatrix,
# with UNK_TOKEN passed as the second positional argument.
# NOTE(review): the enclosing method header is not visible in this excerpt.
self.data2["prior_probs"] = StateMatrix(
{START_TOKEN: 0.3, END_TOKEN: 0.3, UNK_TOKEN: 0.4}, UNK_TOKEN
)
# Nested table: outer key -> inner key -> probability. Each inner row sums to 1.
self.data2["trans_probs"] = StateMatrix(
{
START_TOKEN: {
END_TOKEN: 0.6666666666666666,
UNK_TOKEN: 0.3333333333333333,
},
UNK_TOKEN: {END_TOKEN: 0.5, UNK_TOKEN: 0.5},
},
UNK_TOKEN,
)
# Only the unknown token appears as a param key in this fixture.
self.data2["param_probs"] = StateMatrix({UNK_TOKEN: 0.3}, UNK_TOKEN)
# Nested table keyed by START/END/UNK token, then by UNK_TOKEN
# (presumably command -> param; verify against the model code).
self.data2["param_cond_cmd_probs"] = StateMatrix(
{
START_TOKEN: {UNK_TOKEN: 0.3333333333333333},
END_TOKEN: {UNK_TOKEN: 0.3333333333333333},
UNK_TOKEN: {UNK_TOKEN: 0.25},
},
UNK_TOKEN,
)
# Only the unknown token appears as a value key in this fixture.
self.data2["value_probs"] = StateMatrix({UNK_TOKEN: 1}, UNK_TOKEN)
self.data2["value_cond_param_probs"] = StateMatrix(
{UNK_TOKEN: {UNK_TOKEN: 1}}, UNK_TOKEN
)
# populate data3
cmd = "Set-User"
self.data3["sessions"] = [
[
"Set-User": 0.3333333333333333,
"##END##": 0.19047619047619047,
"##UNK##": 0.2857142857142857,
}
# Wrap the prior_probs mapping in a StateMatrix stored on the test instance.
# NOTE(review): the start of the prior_probs literal is not visible in this excerpt.
self.prior_probs = StateMatrix(states=prior_probs, unk_token=UNK_TOKEN)
# Nested table: outer key -> inner key -> probability. Each inner row sums to 1.
# The "##START##"/"##END##"/"##UNK##" literals presumably match the
# START_TOKEN/END_TOKEN/UNK_TOKEN constants - TODO confirm.
trans_probs = {
"##START##": {"Set-User": 0.5, "##END##": 0.25, "##UNK##": 0.25},
"Set-User": {"##END##": 0.5, "Set-User": 0.25, "##UNK##": 0.25},
"##UNK##": {
"Set-User": 0.3333333333333333,
"##END##": 0.3333333333333333,
"##UNK##": 0.3333333333333333,
},
}
self.trans_probs = StateMatrix(states=trans_probs, unk_token=UNK_TOKEN)
def test__init__(self):
    """Verify that ``Model`` refuses malformed ``sessions`` arguments.

    The first four inputs must raise ``MsticpyException``; the final
    input (a session holding a dict whose value is a set) only needs to
    raise some ``Exception``.
    """
    # Checked in order; the first input that fails to raise aborts the test.
    rejected_sessions = (
        [],
        [[]],
        ["Set-User"],
        [["Set-User"], []],
    )
    for bad_sessions in rejected_sessions:
        with self.assertRaises(MsticpyException):
            Model(sessions=bad_sessions)

    with self.assertRaises(Exception):
        Model(sessions=[[{"Set-User": {"Identity"}}]])
def test_risky_sudo_sessions():
    """Check ``ls.risky_sudo_sessions`` against ``sudo_session_test.csv``.

    Risky command lines, fast-command events and clustered sudo sessions
    are derived from the same fixture and passed to
    ``risky_sudo_sessions``; the result must be a dict with exactly two
    entries.  Calling it with only ``sudo_sessions`` must raise
    ``MsticpyException``.
    """
    input_file = os.path.join(_TEST_DATA, "sudo_session_test.csv")
    sudo_events = pd.read_csv(input_file, parse_dates=["TimeGenerated"])
    risky_actions = cl.risky_cmd_line(events=sudo_events, log_type="Syslog")
    suspicious_events = cl.cmd_speed(
        cmd_events=sudo_events, cmd_field="Command", time=60, events=2
    )
    sudo_sessions = ls.cluster_syslog_logons_df(logon_events=sudo_events)
    output = ls.risky_sudo_sessions(
        risky_actions=risky_actions,
        suspicious_actions=suspicious_events,
        sudo_sessions=sudo_sessions,
    )
    # isinstance is the idiomatic type check (and also accepts dict subclasses).
    assert isinstance(output, dict)  # nosec
    assert len(output) == 2  # nosec
    with raises(MsticpyException):
        ls.risky_sudo_sessions(sudo_sessions=sudo_sessions)
def test_risky_cmd_line():
    """Check ``cl.risky_cmd_line`` against the ``sudo_data.csv`` fixture.

    The result must be a non-empty dict in which the key
    ``2019-07-05T18:19:52.873Z`` maps to ``/bin/bash``.  Passing
    ``cmd_field="Test"`` must raise ``MsticpyException``.
    """
    input_file = os.path.join(_TEST_DATA, "sudo_data.csv")
    input_df = pd.read_csv(input_file)
    output = cl.risky_cmd_line(events=input_df, log_type="Syslog")
    # isinstance is the idiomatic type check (and also accepts dict subclasses).
    assert isinstance(output, dict)  # nosec
    assert len(output) >= 1  # nosec
    assert output["2019-07-05T18:19:52.873Z"] == "/bin/bash"  # nosec
    with raises(MsticpyException):
        cl.risky_cmd_line(events=input_df, log_type="Syslog", cmd_field="Test")
def test_cmd_speed():
    """Check ``cl.cmd_speed`` against the ``sudo_data_speed.csv`` fixture.

    The result must be a non-empty sequence whose first element is a
    dict.  Passing ``cmd_field="Test"`` must raise ``MsticpyException``.
    """
    input_file = os.path.join(_TEST_DATA, "sudo_data_speed.csv")
    input_df = pd.read_csv(input_file, parse_dates=["TimeGenerated"])
    output = cl.cmd_speed(cmd_events=input_df, cmd_field="Command")
    assert len(output) >= 1  # nosec
    # isinstance is the idiomatic type check (and also accepts dict subclasses).
    assert isinstance(output[0], dict)  # nosec
    with raises(MsticpyException):
        # The call is expected to raise, so its result is not bound.
        cl.cmd_speed(cmd_events=input_df, cmd_field="Test")
def test__init__(self):
    """Ensure ``Model`` construction fails for invalid ``sessions`` values.

    An empty list, a list holding an empty session, a flat list of
    strings, and a list mixing a valid and an empty session must each
    raise ``MsticpyException``.  A session containing a dict whose value
    is a set must raise some ``Exception``.
    """
    # assertRaises(exc, callable, **kwargs) invokes the callable itself.
    expect_failure = self.assertRaises
    expect_failure(MsticpyException, Model, sessions=[])
    expect_failure(MsticpyException, Model, sessions=[[]])
    expect_failure(MsticpyException, Model, sessions=["Set-User"])
    expect_failure(MsticpyException, Model, sessions=[["Set-User"], []])
    expect_failure(Exception, Model, sessions=[[{"Set-User": {"Identity"}}]])
# Exercise TISeverity.parse and the TISeverity comparison operators.
# NOTE(review): this excerpt has lost its indentation and may continue past
# the last visible line.
def test_tiseverity(self):
# parse() is called with a name string, an int, and an existing member.
sev_inf = TISeverity.parse("information")
self.assertEqual(sev_inf, TISeverity.information)
sev_warn = TISeverity.parse(1)
self.assertEqual(sev_warn, TISeverity.warning)
sev_warn2 = TISeverity.parse(sev_warn)
self.assertEqual(sev_warn2, TISeverity.warning)
sev_unknown = TISeverity.unknown
# NOTE(review): sev_high is assigned but not used in the visible lines.
sev_high = TISeverity.high
# Comparisons are made against TISeverity members, strings, and ints.
self.assertTrue(sev_inf == TISeverity.information)
self.assertTrue(sev_inf <= "information")
self.assertTrue(sev_inf < 1)
self.assertTrue(sev_warn > TISeverity.information)
self.assertFalse(sev_unknown > "high")