else:
return None
@property
def process_pids(self):
    """Return the PIDs associated with this process.

    :return: A list of PIDs
    :rtype: list of ints
    """
    # The API delivers this list under the singular name "process_pid";
    # this property only re-exposes the same value under a plural name.
    pids = self.process_pid
    return pids
class Event(UnrefreshableModel):
    """Events can be queried for via ``CbThreatHunterAPI.select``
    or through an already selected process with ``Process.events()``.
    """
    primary_key = "process_guid"
    default_sort = 'last_update desc'
    urlobject = '/threathunter/search/v1/orgs/{}/events/_search'
    validation_url = '/threathunter/search/v1/orgs/{}/events/search_validation'

    @classmethod
    def _query_implementation(cls, cb):
        """Return a ``Query`` built from this model class and ``cb``."""
        query = Query(cls, cb)
        return query

    def __init__(self, cb, model_unique_id=None, initial_data=None, force_init=False, full_doc=True):
        """Forward every argument unchanged to the base-model constructor."""
        base_kwargs = dict(
            model_unique_id=model_unique_id,
            initial_data=initial_data,
            force_init=force_init,
            full_doc=full_doc,
        )
        super(Event, self).__init__(cb, **base_kwargs)
class ReportSeverity(FeedModel):
    """Represents severity information for a watchlist report."""
    swagger_meta_file = "psc/threathunter/models/report_severity.yaml"
    primary_key = "report_id"

    def __init__(self, cb, initial_data=None):
        """Build the model from ``initial_data`` only.

        :raises ApiError: when ``initial_data`` is falsy (``None`` or empty)
        """
        if initial_data:
            # The unique id is read from the payload; .get() yields None when
            # the "report_id" key is absent.
            report_id = initial_data.get(self.primary_key)
            super(ReportSeverity, self).__init__(
                cb,
                model_unique_id=report_id,
                initial_data=initial_data,
                force_init=False,
                full_doc=True,
            )
            return
        raise ApiError("ReportSeverity can only be initialized from initial_data")
class Binary(UnrefreshableModel):
"""Represents a retrievable binary.
"""
primary_key = "sha256"
swagger_meta_file = "psc/threathunter/models/binary.yaml"
urlobject_single = "/ubs/v1/orgs/{}/sha256/{}/metadata"
class Summary(UnrefreshableModel):
"""Represents a summary of organization-specific information
for a retrievable binary.
"""
primary_key = "sha256"
urlobject_single = "/ubs/v1/orgs/{}/sha256/{}/summary/device"
def __init__(self, cb, model_unique_id):
if not validators.sha256(model_unique_id):
raise ApiError("model_unique_id must be a valid SHA256")
force_init=False, full_doc=True)
@classmethod
def _query_implementation(cls, cb):
    """Return a ``RunHistoryQuery`` built from this model class and ``cb``."""
    history_query = RunHistoryQuery(cls, cb)
    return history_query
class Result(UnrefreshableModel):
"""
Represents a single result from a LiveQuery ``Run``.
"""
primary_key = "id"
swagger_meta_file = "psc/livequery/models/result.yaml"
urlobject = "/livequery/v1/orgs/{}/runs/{}/results/_search"
class Device(UnrefreshableModel):
    """Device information carried by a single result."""
    primary_key = "id"

    def __init__(self, cb, initial_data):
        """Wrap ``initial_data``, using its ``"id"`` entry as the unique id."""
        # Indexing (not .get) means a payload without "id" raises KeyError.
        device_id = initial_data["id"]
        super(Result.Device, self).__init__(
            cb, model_unique_id=device_id, initial_data=initial_data,
            force_init=False, full_doc=True,
        )
class Fields(UnrefreshableModel):
"""
"""
Returns the reified ``Result.Metrics`` for this result.
"""
return self._metrics
def query_device_summaries(self):
    """Select ``DeviceSummary`` through ``self._cb``, narrowed to ``self._run_id``."""
    summaries = self._cb.select(DeviceSummary)
    return summaries.run_id(self._run_id)
def query_result_facets(self):
    """Select ``ResultFacet`` through ``self._cb``, narrowed to ``self._run_id``."""
    facets = self._cb.select(ResultFacet)
    return facets.run_id(self._run_id)
def query_device_summary_facets(self):
    """Select ``DeviceSummaryFacet`` through ``self._cb``, narrowed to ``self._run_id``."""
    summary_facets = self._cb.select(DeviceSummaryFacet)
    return summary_facets.run_id(self._run_id)
class DeviceSummary(UnrefreshableModel):
"""
Represents the summary of results from a single device during a single LiveQuery ``Run``.
"""
primary_key = "id"
swagger_meta_file = "psc/livequery/models/device_summary.yaml"
urlobject = "/livequery/v1/orgs/{}/runs/{}/results/device_summaries/_search"
class Metrics(UnrefreshableModel):
"""
Represents the metrics for a result.
"""
def __init__(self, cb, initial_data):
super(DeviceSummary.Metrics, self).__init__(
cb,
model_unique_id=None,
initial_data=initial_data,
if self.sha256 in downloads.not_found:
return None
elif self.sha256 in downloads.error:
raise InvalidObjectError("{} should be retried".format(self.sha256))
else:
return next((item.url
for item in downloads.found
if self.sha256 == item.sha256), None)
class Downloads(UnrefreshableModel):
"""Represents download information for a list of process hashes.
"""
urlobject = "/ubs/v1/orgs/{}/file/_download"
class FoundItem(UnrefreshableModel):
    """Represents the download URL and process hash for a successfully
    located binary.
    """
    primary_key = "sha256"

    def __init__(self, cb, item):
        """Wrap ``item``, using its ``"sha256"`` entry as the unique id."""
        # NOTE(review): the hash is taken from ``item`` as given; this
        # constructor performs no format check on it. A missing key raises
        # KeyError.
        digest = item["sha256"]
        super(Downloads.FoundItem, self).__init__(
            cb,
            model_unique_id=digest,
            initial_data=item,
            force_init=False,
            full_doc=True,
        )
def __init__(self, cb, shas, expiration_seconds=3600):
body = {
"sha256": shas,
"expiration_seconds": expiration_seconds,
}
)
class Fields(UnrefreshableModel):
    """The fields of a result."""

    def __init__(self, cb, initial_data):
        """Wrap ``initial_data``; this model is built without a unique id."""
        super(Result.Fields, self).__init__(
            cb, model_unique_id=None, initial_data=initial_data,
            force_init=False, full_doc=True,
        )
class Metrics(UnrefreshableModel):
    """The metrics for a result."""

    def __init__(self, cb, initial_data):
        """Wrap ``initial_data``; this model is built without a unique id."""
        super(Result.Metrics, self).__init__(
            cb, model_unique_id=None, initial_data=initial_data,
            force_init=False, full_doc=True,
        )
@classmethod
def _query_implementation(cls, cb):
    """Return a ``ResultQuery`` built from this model class and ``cb``."""
    result_query = ResultQuery(cls, cb)
    return result_query
class FeedModel(UnrefreshableModel, CreatableModelMixin, MutableBaseModel):
    """A common base class for models used by the Feed and Watchlist APIs.

    Adds no members of its own; it only combines the three listed bases.
    """
class Process(UnrefreshableModel):
"""Represents a process retrieved by one of the CbTH endpoints.
"""
default_sort = 'last_update desc'
primary_key = "process_guid"
validation_url = "/threathunter/search/v1/orgs/{}/processes/search_validation"
class Summary(UnrefreshableModel):
"""Represents a summary of organization-specific information for
a process.
"""
default_sort = "last_update desc"
primary_key = "process_guid"
urlobject_single = "/threathunter/search/v1/orgs/{}/processes/summary"
def __init__(self, cb, model_unique_id):
url = self.urlobject_single.format(cb.credentials.org_key)
summary = cb.get_object(url, query_parameters={"process_guid": model_unique_id})
while summary["incomplete_results"]:
log.debug("summary incomplete, requesting again")
summary = self._cb.get_object(
url, query_parameters={"process_guid": self.process_guid}
)
"""
urlobject_history = "/livequery/v1/orgs/{}/runs/_search"
def __init__(self, cb, initial_data=None):
    """Build the model from ``initial_data``, using its ``"id"`` entry as the unique id."""
    # NOTE(review): ``initial_data`` defaults to None but .get() is called on
    # it unconditionally, so omitting the argument raises AttributeError.
    run_id = initial_data.get("id")
    super(Run, self).__init__(
        cb, run_id, initial_data=initial_data, force_init=False, full_doc=True
    )
@classmethod
def _query_implementation(cls, cb):
    """Return a ``RunHistoryQuery`` built from this model class and ``cb``."""
    history_query = RunHistoryQuery(cls, cb)
    return history_query
class Result(UnrefreshableModel):
"""
Represents a single result from a LiveQuery ``Run``.
"""
primary_key = "id"
swagger_meta_file = "psc/livequery/models/result.yaml"
urlobject = "/livequery/v1/orgs/{}/runs/{}/results/_search"
class Device(UnrefreshableModel):
"""
Represents device information for a result.
"""
primary_key = "id"
def __init__(self, cb, initial_data):
super(Result.Device, self).__init__(
cb,
def metrics_(self):
    """Return the reified ``DeviceSummary.Metrics`` stored in ``self._metrics``."""
    reified = self._metrics
    return reified
class ResultFacet(UnrefreshableModel):
"""
Represents the summary of results for a single field in a LiveQuery ``Run``.
"""
primary_key = "field"
swagger_meta_file = "psc/livequery/models/facet.yaml"
urlobject = "/livequery/v1/orgs/{}/runs/{}/results/_facet"
class Values(UnrefreshableModel):
    """The values associated with a field."""

    def __init__(self, cb, initial_data):
        """Wrap ``initial_data``; this model is built without a unique id."""
        super(ResultFacet.Values, self).__init__(
            cb, model_unique_id=None, initial_data=initial_data,
            force_init=False, full_doc=True,
        )
@classmethod
def _query_implementation(cls, cb):
    """Return a ``FacetQuery`` built from this model class and ``cb``."""
    facet_query = FacetQuery(cls, cb)
    return facet_query
class Device(UnrefreshableModel):
    """Device information carried by a single result."""
    primary_key = "id"

    def __init__(self, cb, initial_data):
        """Wrap ``initial_data``, using its ``"id"`` entry as the unique id."""
        # Indexing (not .get) means a payload without "id" raises KeyError.
        device_id = initial_data["id"]
        super(Result.Device, self).__init__(
            cb, model_unique_id=device_id, initial_data=initial_data,
            force_init=False, full_doc=True,
        )
class Fields(UnrefreshableModel):
    """The fields of a result."""

    def __init__(self, cb, initial_data):
        """Wrap ``initial_data``; this model is built without a unique id."""
        super(Result.Fields, self).__init__(
            cb, model_unique_id=None, initial_data=initial_data,
            force_init=False, full_doc=True,
        )
class Metrics(UnrefreshableModel):
"""
Represents the metrics for a result.
"""