How to use the cbapi.psc.livequery.rest_api.CbLiveQueryAPI function in cbapi

To help you get started, we’ve selected a few cbapi examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github carbonblack / cbapi-python / test / cbapi / psc / livequery / test_rest_api.py View on Github external
def test_query(monkeypatch):
    _was_called = False

    def mock_post_object(url, body, **kwargs):
        nonlocal _was_called
        assert url == "/livequery/v1/orgs/Z100/runs"
        assert body["sql"] == "select * from whatever;"
        _was_called = True
        return MockResponse({"org_key": "Z100", "name": "FoobieBletch", "id": "abcdefg"})

    api = CbLiveQueryAPI(url="https://example.com", token="ABCD/1234",
                         org_key="Z100", ssl_verify=True)
    monkeypatch.setattr(api, "get_object", ConnectionMocks.get("GET"))
    monkeypatch.setattr(api, "post_object", mock_post_object)
    monkeypatch.setattr(api, "put_object", ConnectionMocks.get("PUT"))
    monkeypatch.setattr(api, "delete_object", ConnectionMocks.get("DELETE"))
    query = api.query("select * from whatever;")
    assert isinstance(query, RunQuery)
    run = query.submit()
    assert _was_called
    assert run.org_key == "Z100"
    assert run.name == "FoobieBletch"
    assert run.id == "abcdefg"
github carbonblack / cbapi-python / test / cbapi / psc / livequery / test_rest_api.py View on Github external
_was_called = False

    def mock_post_object(url, body, **kwargs):
        nonlocal _was_called
        assert url == "/livequery/v1/orgs/Z100/runs"
        assert body["sql"] == "select * from whatever;"
        assert body["name"] == "AmyWasHere"
        assert body["notify_on_finish"]
        df = body["device_filter"]
        assert df["device_ids"] == [1, 2, 3]
        assert df["device_types"] == ["Alpha", "Bravo", "Charlie"]
        assert df["policy_ids"] == [16, 27, 38]
        _was_called = True
        return MockResponse({"org_key": "Z100", "name": "FoobieBletch", "id": "abcdefg"})

    api = CbLiveQueryAPI(url="https://example.com", token="ABCD/1234",
                         org_key="Z100", ssl_verify=True)
    monkeypatch.setattr(api, "get_object", ConnectionMocks.get("GET"))
    monkeypatch.setattr(api, "post_object", mock_post_object)
    monkeypatch.setattr(api, "put_object", ConnectionMocks.get("PUT"))
    monkeypatch.setattr(api, "delete_object", ConnectionMocks.get("DELETE"))
    query = api.query("select * from whatever;").device_ids([1, 2, 3])
    query = query.device_types(["Alpha", "Bravo", "Charlie"])
    query = query.policy_ids([16, 27, 38])
    query = query.name("AmyWasHere").notify_on_finish()
    assert isinstance(query, RunQuery)
    run = query.submit()
    assert _was_called
    assert run.org_key == "Z100"
    assert run.name == "FoobieBletch"
    assert run.id == "abcdefg"
github carbonblack / cbapi-python / test / cbapi / psc / livequery / test_rest_api.py View on Github external
def test_query_history(monkeypatch):
    _was_called = False

    def mock_post_object(url, body, **kwargs):
        nonlocal _was_called
        assert url == "/livequery/v1/orgs/Z100/runs/_search"
        assert body["query"] == "xyzzy"
        _was_called = True
        run1 = {"org_key": "Z100", "name": "FoobieBletch", "id": "abcdefg"}
        run2 = {"org_key": "Z100", "name": "Aoxomoxoa", "id": "cdefghi"}
        run3 = {"org_key": "Z100", "name": "Read_Me", "id": "efghijk"}
        return MockResponse({"org_key": "Z100", "num_found": 3, "results": [run1, run2, run3]})

    api = CbLiveQueryAPI(url="https://example.com", token="ABCD/1234",
                         org_key="Z100", ssl_verify=True)
    monkeypatch.setattr(api, "get_object", ConnectionMocks.get("GET"))
    monkeypatch.setattr(api, "post_object", mock_post_object)
    monkeypatch.setattr(api, "put_object", ConnectionMocks.get("PUT"))
    monkeypatch.setattr(api, "delete_object", ConnectionMocks.get("DELETE"))
    query = api.query_history("xyzzy")
    assert isinstance(query, RunHistoryQuery)
    count = 0
    for item in query.all():
        assert item.org_key == "Z100"
        if item.id == "abcdefg":
            assert item.name == "FoobieBletch"
        elif item.id == "cdefghi":
            assert item.name == "Aoxomoxoa"
        elif item.id == "efghijk":
            assert item.name == "Read_Me"
github carbonblack / cbapi-python / test / cbapi / psc / livequery / test_rest_api.py View on Github external
_was_called = False

    def mock_post_object(url, body, **kwargs):
        nonlocal _was_called
        assert url == "/livequery/v1/orgs/Z100/runs/_search"
        assert body["query"] == "xyzzy"
        t = body["sort"][0]
        assert t["field"] == "id"
        assert t["order"] == "ASC"
        _was_called = True
        run1 = {"org_key": "Z100", "name": "FoobieBletch", "id": "abcdefg"}
        run2 = {"org_key": "Z100", "name": "Aoxomoxoa", "id": "cdefghi"}
        run3 = {"org_key": "Z100", "name": "Read_Me", "id": "efghijk"}
        return MockResponse({"org_key": "Z100", "num_found": 3, "results": [run1, run2, run3]})

    api = CbLiveQueryAPI(url="https://example.com", token="ABCD/1234",
                         org_key="Z100", ssl_verify=True)
    monkeypatch.setattr(api, "get_object", ConnectionMocks.get("GET"))
    monkeypatch.setattr(api, "post_object", mock_post_object)
    monkeypatch.setattr(api, "put_object", ConnectionMocks.get("PUT"))
    monkeypatch.setattr(api, "delete_object", ConnectionMocks.get("DELETE"))
    query = api.query_history("xyzzy").sort_by("id")
    assert isinstance(query, RunHistoryQuery)
    count = 0
    for item in query.all():
        assert item.org_key == "Z100"
        if item.id == "abcdefg":
            assert item.name == "FoobieBletch"
        elif item.id == "cdefghi":
            assert item.name == "Aoxomoxoa"
        elif item.id == "efghijk":
            assert item.name == "Read_Me"
github carbonblack / cbapi-python / test / cbapi / psc / livequery / test_rest_api.py View on Github external
def test_query_policy_ids_broken():
    api = CbLiveQueryAPI(url="https://example.com", token="ABCD/1234",
                         org_key="Z100", ssl_verify=True)
    query = api.query("select * from whatever;")
    with pytest.raises(ApiError):
        query = query.policy_ids(["Bogus"])
github carbonblack / cbapi-python / test / cbapi / psc / livequery / test_models.py View on Github external
def test_run_stop_failed(monkeypatch):
    _was_called = False

    def mock_put_object(url, body, **kwargs):
        nonlocal _was_called
        assert url == "/livequery/v1/orgs/Z100/runs/abcdefg/status"
        assert body["status"] == "CANCELLED"
        _was_called = True
        return MockResponse({"error_message": "The query is not presently running."}, 409)

    api = CbLiveQueryAPI(url="https://example.com", token="ABCD/1234",
                         org_key="Z100", ssl_verify=True)
    run = Run(api, "abcdefg", {"org_key": "Z100", "name": "FoobieBletch",
                               "id": "abcdefg", "status": "CANCELLED"})
    monkeypatch.setattr(api, "get_object", ConnectionMocks.get("GET"))
    monkeypatch.setattr(api, "post_object", ConnectionMocks.get("POST"))
    monkeypatch.setattr(api, "put_object", mock_put_object)
    monkeypatch.setattr(api, "delete_object", ConnectionMocks.get("DELETE"))
    rc = run.stop()
    assert _was_called
    assert not rc
github carbonblack / cbapi-python / test / cbapi / psc / livequery / test_models.py View on Github external
def test_run_stop(monkeypatch):
    _was_called = False

    def mock_put_object(url, body, **kwargs):
        nonlocal _was_called
        assert url == "/livequery/v1/orgs/Z100/runs/abcdefg/status"
        assert body["status"] == "CANCELLED"
        _was_called = True
        return MockResponse({"org_key": "Z100", "name": "FoobieBletch",
                             "id": "abcdefg", "status": "CANCELLED"})

    api = CbLiveQueryAPI(url="https://example.com", token="ABCD/1234",
                         org_key="Z100", ssl_verify=True)
    run = Run(api, "abcdefg", {"org_key": "Z100", "name": "FoobieBletch",
                               "id": "abcdefg", "status": "ACTIVE"})
    monkeypatch.setattr(api, "get_object", ConnectionMocks.get("GET"))
    monkeypatch.setattr(api, "post_object", ConnectionMocks.get("POST"))
    monkeypatch.setattr(api, "put_object", mock_put_object)
    monkeypatch.setattr(api, "delete_object", ConnectionMocks.get("DELETE"))
    rc = run.stop()
    assert _was_called
    assert rc
    assert run.org_key == "Z100"
    assert run.name == "FoobieBletch"
    assert run.id == "abcdefg"
    assert run.status == "CANCELLED"
github carbonblack / cbapi-python / test / cbapi / psc / livequery / test_models.py View on Github external
assert t["device_name"] == ["AxCx", "A7X"]
        t = body["terms"]
        assert t["fields"] == ["alpha", "bravo", "charlie"]
        _was_called = True
        v1 = {"total": 1, "id": "alpha1", "name": "alpha1"}
        v2 = {"total": 2, "id": "alpha2", "name": "alpha2"}
        term1 = {"field": "alpha", "values": [v1, v2]}
        v1 = {"total": 1, "id": "bravo1", "name": "bravo1"}
        v2 = {"total": 2, "id": "bravo2", "name": "bravo2"}
        term2 = {"field": "bravo", "values": [v1, v2]}
        v1 = {"total": 1, "id": "charlie1", "name": "charlie1"}
        v2 = {"total": 2, "id": "charlie2", "name": "charlie2"}
        term3 = {"field": "charlie", "values": [v1, v2]}
        return MockResponse({"terms": [term1, term2, term3]})

    api = CbLiveQueryAPI(url="https://example.com", token="ABCD/1234",
                         org_key="Z100", ssl_verify=True)
    tmp_id = {"id": "abcdefg"}
    result = Result(api, {"id": "abcdefg", "device": tmp_id, "fields": {}, "metrics": {}})
    monkeypatch.setattr(api, "get_object", ConnectionMocks.get("GET"))
    monkeypatch.setattr(api, "post_object", mock_post_object)
    monkeypatch.setattr(api, "put_object", ConnectionMocks.get("PUT"))
    monkeypatch.setattr(api, "delete_object", ConnectionMocks.get("DELETE"))
    query = result.query_device_summary_facets().where("xyzzy")
    query = query.facet_field("alpha").facet_field(["bravo", "charlie"])
    query = query.criteria(device_name=["AxCx", "A7X"])
    assert isinstance(query, FacetQuery)
    count = 0
    for item in query.all():
        vals = item.values
        if item.field == "alpha":
            assert vals[0]["id"] == "alpha1"
github carbonblack / cbapi-python / test / cbapi / psc / livequery / test_models.py View on Github external
def test_run_refresh(monkeypatch):
    _was_called = False

    def mock_get_object(url, parms=None, default=None):
        nonlocal _was_called
        assert url == "/livequery/v1/orgs/Z100/runs/abcdefg"
        assert parms is None
        assert default is None
        _was_called = True
        return {"org_key": "Z100", "name": "FoobieBletch",
                "id": "abcdefg", "status": "COMPLETE"}

    api = CbLiveQueryAPI(url="https://example.com", token="ABCD/1234",
                         org_key="Z100", ssl_verify=True)
    run = Run(api, "abcdefg", {"org_key": "Z100", "name": "FoobieBletch",
                               "id": "abcdefg", "status": "ACTIVE"})
    monkeypatch.setattr(api, "get_object", mock_get_object)
    monkeypatch.setattr(api, "post_object", ConnectionMocks.get("POST"))
    monkeypatch.setattr(api, "put_object", ConnectionMocks.get("PUT"))
    monkeypatch.setattr(api, "delete_object", ConnectionMocks.get("DELETE"))
    rc = run.refresh()
    assert _was_called
    assert rc
    assert run.org_key == "Z100"
    assert run.name == "FoobieBletch"
    assert run.id == "abcdefg"
    assert run.status == "COMPLETE"
github carbonblack / cbapi-python / test / cbapi / psc / livequery / test_rest_api.py View on Github external
def test_query_device_ids_broken():
    api = CbLiveQueryAPI(url="https://example.com", token="ABCD/1234",
                         org_key="Z100", ssl_verify=True)
    query = api.query("select * from whatever;")
    with pytest.raises(ApiError):
        query = query.device_ids(["Bogus"])