def test_query_cbanalyticsalert_invalid_not_blocked_threat_categories():
    """Reject an unknown value passed to ``not_blocked_threat_categories``.

    ``"MINOR"`` is not an accepted threat category, so the query builder is
    expected to raise ``ApiError`` on input validation rather than accept it.
    """
    connection_args = {
        "url": "https://example.com",
        "token": "ABCD/1234",  # placeholder credential, only meaningful to the test
        "org_key": "Z100",
        "ssl_verify": True,
    }
    client = CbPSCBaseAPI(**connection_args)
    with pytest.raises(ApiError):
        client.select(CBAnalyticsAlert).not_blocked_threat_categories(["MINOR"])
def test_query_basealert_invalid_types():
    """Reject an unknown alert type passed to ``types``.

    ``"ERBOSOFT"`` is not an accepted alert type, so building the query must
    raise ``ApiError``.
    """
    connection_args = {
        "url": "https://example.com",
        "token": "ABCD/1234",  # placeholder credential, only meaningful to the test
        "org_key": "Z100",
        "ssl_verify": True,
    }
    client = CbPSCBaseAPI(**connection_args)
    with pytest.raises(ApiError):
        client.select(BaseAlert).types(["ERBOSOFT"])
def test_BaseAlert_undismiss(monkeypatch):
    """``BaseAlert.update`` re-opens a dismissed alert through the workflow endpoint.

    Despite the test name, the call under test is ``alert.update("Fixed", "NoSir")``
    on an alert whose current workflow state is ``DISMISS``. The stubbed POST
    checks the exact endpoint and body (state ``OPEN``, remediation ``Fixed``,
    comment ``NoSir``) and returns a workflow record that must then be visible
    on ``alert.workflow_``.
    """
    posted_bodies = []

    def _record_workflow_post(url, body, **kwargs):
        assert url == "/appservices/v6/orgs/Z100/alerts/ESD14U2C/workflow"
        assert body == {"state": "OPEN", "remediation_state": "Fixed", "comment": "NoSir"}
        posted_bodies.append(body)
        return StubResponse({"state": "OPEN", "remediation": "Fixed", "comment": "NoSir",
                             "changed_by": "Robocop", "last_update_time": "2019-10-31T16:03:13.951Z"})

    client = CbPSCBaseAPI(url="https://example.com", token="ABCD/1234", org_key="Z100", ssl_verify=True)
    patch_cbapi(monkeypatch, client, POST=_record_workflow_post)

    alert = BaseAlert(client, "ESD14U2C", {"id": "ESD14U2C", "workflow": {"state": "DISMISS"}})
    alert.update("Fixed", "NoSir")

    assert posted_bodies  # the workflow POST actually happened
    # Every field of the stubbed response must be reflected on the workflow object.
    expected_workflow = {
        "changed_by": "Robocop",
        "state": "OPEN",
        "remediation": "Fixed",
        "comment": "NoSir",
        "last_update_time": "2019-10-31T16:03:13.951Z",
    }
    for attr, value in expected_workflow.items():
        assert getattr(alert.workflow_, attr) == value
def test_query_device_last_contact_time_start_specified_bad():
    """Reject ``last_contact_time`` when both ``start`` and ``range`` are given.

    Supplying a fixed ``start`` timestamp together with a relative ``range``
    must raise ``ApiError`` (the two presumably are alternative ways to bound
    the window -- confirm against ``Device`` query validation).
    """
    connection_args = {
        "url": "https://example.com",
        "token": "ABCD/1234",  # placeholder credential, only meaningful to the test
        "org_key": "Z100",
        "ssl_verify": True,
    }
    client = CbPSCBaseAPI(**connection_args)
    with pytest.raises(ApiError):
        client.select(Device).last_contact_time(
            start="2019-09-30T12:34:56",
            range="-3w",
        )
assert t["os"] == [ "LINUX" ]
assert t["policy_id"] == [ 8675309 ]
assert t["status"] == [ "ALL" ]
assert t["target_priority"] == [ "HIGH" ]
t = body.get("exclusions", {})
assert t["sensor_version"] == [ "0.1" ]
t = body.get("sort", [])
t2 = t[0]
assert t2["field"] == "name"
assert t2["order"] == "DESC"
_was_called = True
body = { "id": 6023, "organization_name": "thistestworks" }
envelope = { "results": [ body ], "num_found": 1 }
return MockResponse(envelope)
api = CbPSCBaseAPI(url="https://example.com", token="ABCD/1234",
org_key="Z100", ssl_verify=True)
monkeypatch.setattr(api, "get_object", ConnectionMocks.get("GET"))
monkeypatch.setattr(api, "post_object", mock_post_object)
monkeypatch.setattr(api, "put_object", ConnectionMocks.get("PUT"))
monkeypatch.setattr(api, "delete_object", ConnectionMocks.get("DELETE"))
query = api.select(Device).where("foobar").ad_group_ids([ 14, 25 ]) \
.os([ "LINUX" ]).policy_ids([ 8675309 ]).status([ "ALL" ]) \
.target_priorities(["HIGH"]).exclude_sensor_versions(["0.1"]) \
.sort_by("name", "DESC")
d = query.one()
assert _was_called
assert d.id == 6023
assert d.organization_name == "thistestworks"
def test_query_basealert_invalid_workflows():
    """Reject an unknown workflow state passed to ``workflows``.

    ``"IN_LIMBO"`` is not an accepted workflow state, so building the query
    must raise ``ApiError``.
    """
    connection_args = {
        "url": "https://example.com",
        "token": "ABCD/1234",  # placeholder credential, only meaningful to the test
        "org_key": "Z100",
        "ssl_verify": True,
    }
    client = CbPSCBaseAPI(**connection_args)
    with pytest.raises(ApiError):
        client.select(BaseAlert).workflows(["IN_LIMBO"])
def test_Device_lr_session(monkeypatch):
    """``Device.lr_session`` goes through the API's ``_lr_scheduler``.

    The device GET is stubbed to answer only the device-detail endpoint, and a
    ``StubScheduler`` built with the device id is installed as
    ``api._lr_scheduler``. The session object the stub hands back must be
    returned to the caller, and the scheduler must report that it was used.
    """
    device_id = 6023

    def _fetch_device(url, parms=None, default=None):
        # The only GET expected during the call is the device-detail lookup.
        assert url == "/appservices/v6/orgs/Z100/devices/6023"
        return {"id": device_id}

    client = CbPSCBaseAPI(url="https://example.com", token="ABCD/1234", org_key="Z100", ssl_verify=True)
    scheduler = StubScheduler(device_id)
    client._lr_scheduler = scheduler
    patch_cbapi(monkeypatch, client, GET=_fetch_device)

    device = Device(client, device_id, {"id": device_id})
    session = device.lr_session()

    assert session["itworks"]
    assert scheduler.was_called
assert t["policy_applied"] == ["APPLIED"]
assert t["reason_code"] == ["ATTACK_VECTOR"]
assert t["run_state"] == ["RAN"]
assert t["sensor_action"] == ["DENY"]
assert t["threat_cause_vector"] == ["WEB"]
t = body["sort"]
t2 = t[0]
assert t2["field"] == "name"
assert t2["order"] == "DESC"
_was_called = True
body = {"id": "S0L0", "org_key": "Z100", "threat_id": "B0RG", "workflow": {"state": "OPEN"}}
envelope = { "results": [ body ], "num_found": 1 }
return MockResponse(envelope)
api = CbPSCBaseAPI(url="https://example.com", token="ABCD/1234",
org_key="Z100", ssl_verify=True)
monkeypatch.setattr(api, "get_object", ConnectionMocks.get("GET"))
monkeypatch.setattr(api, "post_object", mock_post_object)
monkeypatch.setattr(api, "put_object", ConnectionMocks.get("PUT"))
monkeypatch.setattr(api, "delete_object", ConnectionMocks.get("DELETE"))
query = api.select(CBAnalyticsAlert).where("Blort").categories(["SERIOUS", "CRITICAL"]).device_ids([6023]) \
.device_names(["HAL"]).device_os(["LINUX"]).device_os_versions(["0.1.2"]).device_username(["JRN"]) \
.group_results(True).alert_ids(["S0L0"]).legacy_alert_ids(["S0L0_1"]).minimum_severity(6) \
.policy_ids([8675309]).policy_names(["Strict"]).process_names(["IEXPLORE.EXE"]) \
.process_sha256(["0123456789ABCDEF0123456789ABCDEF"]).reputations(["SUSPECT_MALWARE"]) \
.tags(["Frood"]).target_priorities(["HIGH"]).threat_ids(["B0RG"]).types(["WATCHLIST"]) \
.workflows(["OPEN"]).blocked_threat_categories(["RISKY_PROGRAM"]).device_locations(["ONSITE"]) \
.kill_chain_statuses(["EXECUTE_GOAL"]).not_blocked_threat_categories(["NEW_MALWARE"]) \
.policy_applied(["APPLIED"]).reason_code(["ATTACK_VECTOR"]).run_states(["RAN"]) \
.sensor_actions(["DENY"]).threat_cause_vectors(["WEB"]).sort_by("name", "DESC")
a = query.one()
_was_called = False
def mock_post_object(url, body, **kwargs):
nonlocal _was_called
assert url == "/appservices/v6/orgs/Z100/alerts/_search"
assert body["query"] == "Blort"
t = body["criteria"]
t2 = t.get("create_time", {})
assert t2["start"] == "2019-09-30T12:34:56"
assert t2["end"] == "2019-10-01T12:00:12"
_was_called = True
body = {"id": "S0L0", "org_key": "Z100", "threat_id": "B0RG", "workflow": {"state": "OPEN"}}
envelope = { "results": [ body ], "num_found": 1 }
return MockResponse(envelope)
api = CbPSCBaseAPI(url="https://example.com", token="ABCD/1234",
org_key="Z100", ssl_verify=True)
monkeypatch.setattr(api, "get_object", ConnectionMocks.get("GET"))
monkeypatch.setattr(api, "post_object", mock_post_object)
monkeypatch.setattr(api, "put_object", ConnectionMocks.get("PUT"))
monkeypatch.setattr(api, "delete_object", ConnectionMocks.get("DELETE"))
query = api.select(BaseAlert).where("Blort") \
.create_time(start="2019-09-30T12:34:56", end="2019-10-01T12:00:12")
a = query.one()
assert _was_called
assert a.id == "S0L0"
assert a.org_key == "Z100"
assert a.threat_id == "B0RG"
assert a.workflow_.state == "OPEN"
def test_Device_uninstall_sensor(monkeypatch):
    """``Device.uninstall_sensor`` POSTs an ``UNINSTALL_SENSOR`` device action.

    The device GET is stubbed; the POST stub checks the ``device_actions``
    endpoint and the exact request body, then answers with an empty 204. The
    test only verifies that the POST was issued -- the 204 carries no payload
    to inspect.
    """
    device_id = 6023
    posted_actions = []

    def _fetch_device(url, parms=None, default=None):
        assert url == "/appservices/v6/orgs/Z100/devices/6023"
        return {"id": device_id}

    def _record_device_action(url, body, **kwargs):
        assert url == "/appservices/v6/orgs/Z100/device_actions"
        assert body == {"action_type": "UNINSTALL_SENSOR", "device_id": [6023]}
        posted_actions.append(body)
        return StubResponse(None, 204)

    client = CbPSCBaseAPI(url="https://example.com", token="ABCD/1234", org_key="Z100", ssl_verify=True)
    patch_cbapi(monkeypatch, client, GET=_fetch_device, POST=_record_device_action)

    device = Device(client, device_id, {"id": device_id})
    device.uninstall_sensor()

    assert posted_actions  # the device-action POST actually happened