Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_tiseverity(self):
    """Exercise TISeverity.parse and the enum's comparison operators."""
    # parse() is called with a name, an integer and an existing member.
    info_level = TISeverity.parse("information")
    self.assertEqual(info_level, TISeverity.information)

    warn_level = TISeverity.parse(1)
    self.assertEqual(warn_level, TISeverity.warning)

    reparsed_warn = TISeverity.parse(warn_level)
    self.assertEqual(reparsed_warn, TISeverity.warning)

    unknown_level = TISeverity.unknown
    sev_high = TISeverity.high

    # Comparisons are made against enum members, names and plain integers.
    self.assertTrue(info_level == TISeverity.information)
    self.assertTrue(info_level <= "information")
    self.assertTrue(info_level < 1)
    self.assertTrue(warn_level > TISeverity.information)
    self.assertFalse(unknown_level > "high")
Parameters
----------
response : LookupResult
The returned data response
Returns
-------
Tuple[bool, TISeverity, Any]
bool = positive or negative hit
TISeverity = enumeration of severity
Object with match details
"""
if self._failed_response(response) or not isinstance(response.raw_result, dict):
return False, TISeverity.information, "Not found."
if "pulse_info" in response.raw_result:
pulses = response.raw_result["pulse_info"].get("pulses", {})
pulse_count = len(pulses)
if pulse_count == 0:
severity = TISeverity.information
return (
True,
severity,
{
"pulse_count": pulse_count,
"sections_available": response.raw_result["sections"],
},
)
if pulse_count == 1:
severity = TISeverity.warning
else:
{
"rank": dom_record.get("rank", "0"),
"page_rank": dom_record.get("page_rank_decimal", 0),
"error": dom_record.get("error", ""),
},
)
if record_status == 404:
return (
True,
TISeverity.warning,
{
"rank": dom_record.get("rank", "0"),
"error": dom_record.get("error", ""),
},
)
return False, TISeverity.information, {}
if d_frame["Action"].str.lower().isin(["alert", "block"]).any():
severity = TISeverity.high
return (
True,
severity,
{
"Action": self._series_to_list(d_frame["Action"]),
"ThreatType": self._series_to_list(d_frame["ThreatType"]),
"ThreatSeverity": self._series_to_list(d_frame["ThreatSeverity"]),
"Active": self._series_to_list(d_frame["Active"]),
"Description": self._series_to_list(d_frame["Description"]),
"ConfidenceScore": self._series_to_list(d_frame["ConfidenceScore"]),
},
)
return False, TISeverity.information, "No data"
Return the details of the response.
Parameters
----------
response : LookupResult
The returned data response
Returns
-------
Tuple[bool, TISeverity, Any]
bool = positive or negative hit
TISeverity = enumeration of severity
Object with match details
"""
severity = TISeverity.information
if self._failed_response(response) or not isinstance(response.raw_result, dict):
return False, severity, "Not found."
result = True
result_dict = {}
if (
response.ioc_type in ["ipv4", "ipv6", "url", "dns"]
and not response.query_subtype
):
score = response.raw_result.get("score", 0)
result_dict.update(
{
"score": response.raw_result.get("score", 0),
"cats": response.raw_result.get("cats"),
"categoryDescriptions": response.raw_result.get(
"categoryDescriptions"
),
Returns
-------
Tuple[bool, TISeverity, Any]
bool = positive or negative hit
TISeverity = enumeration of severity
Object with match details
"""
if self._failed_response(response) or not isinstance(response.raw_result, dict):
return False, TISeverity.information, "Not found."
if "pulse_info" in response.raw_result:
pulses = response.raw_result["pulse_info"].get("pulses", {})
pulse_count = len(pulses)
if pulse_count == 0:
severity = TISeverity.information
return (
True,
severity,
{
"pulse_count": pulse_count,
"sections_available": response.raw_result["sections"],
},
)
if pulse_count == 1:
severity = TISeverity.warning
else:
severity = TISeverity.high
return (
True,
severity,
{
)
if verb == "GET":
response = self._requests_session.get(**req_params)
else:
raise NotImplementedError(f"Unsupported verb {verb}")
result.status = response.status_code
result.reference = req_params["url"]
if result.status == 200:
try:
result.raw_result = response.json()
result.result, severity, result.details = self.parse_results(result)
except JSONDecodeError:
result.raw_result = f"""There was a problem parsing results from this lookup:
{response.text}"""
result.result = False
severity = TISeverity.information
result.details = {}
result.set_severity(severity)
result.status = TILookupStatus.ok.value
else:
result.raw_result = str(response)
result.result = False
result.details = self._response_message(result.status)
return result
except ( # pylint: disable=duplicate-code
LookupError,
JSONDecodeError,
NotImplementedError,
ConnectionError,
) as err:
self._err_to_results(result, err)
if not isinstance(err, LookupError):
)
else:
print("Unknown response from provider: " + str(data_result))
src_ioc_frame = pd.DataFrame(obs_set, columns=["Ioc"])
src_ioc_frame["IocType"] = ioc_type
src_ioc_frame["QuerySubtype"] = query_type
src_ioc_frame["Reference"] = query_obj("query", **query_params)
# If no results, add the empty dataframe to the combined results
# and continue
if not isinstance(data_result, pd.DataFrame):
src_ioc_frame["Result"] = False
src_ioc_frame["Details"] = "Query failure"
src_ioc_frame["Status"] = TILookupStatus.query_failed.value
src_ioc_frame["Severity"] = TISeverity.information.value
all_results.append(src_ioc_frame)
continue
if data_result.empty:
src_ioc_frame["Result"] = False
src_ioc_frame["Details"] = "Not found."
src_ioc_frame["Status"] = TILookupStatus.ok.value
src_ioc_frame["Severity"] = TISeverity.information.value
all_results.append(src_ioc_frame)
continue
# Create our results columns
data_result["Result"] = True
data_result["Status"] = TILookupStatus.ok.value
data_result["Severity"] = self._get_severity(data_result)
data_result["Details"] = self._get_detail_summary(data_result)
data_result["RawResult"] = data_result.apply(lambda x: x.to_dict(), axis=1)
Parameters
----------
response : LookupResult
The returned data response
Returns
-------
Tuple[bool, TISeverity, Any]
bool = positive or negative hit
TISeverity = enumeration of severity
Object with match details
"""
if self._failed_response(response) or not isinstance(response.raw_result, dict):
return False, TISeverity.information, "Not found."
severity = TISeverity.information
if "response" in response.raw_result:
dom_records = response.raw_result["response"]
dom_record = dom_records[0]
return self._parse_one_record(dom_record)
return True, severity, {}
def _parse_multi_results(self, response: LookupResult) -> Iterable[LookupResult]:
    """
    Parse details of batch response.

    Parameters
    ----------
    response : LookupResult
        The returned data response; its `raw_result` is inspected.

    Yields
    ------
    LookupResult
        A single negative "Not found." result when `raw_result` is not
        a dict; otherwise one result for each entry under the
        "response" key. Nothing is yielded if that key is absent.

    """
    raw_data = response.raw_result
    if not isinstance(raw_data, dict):
        # Unusable payload: copy the incoming result and mark it as a miss.
        not_found = LookupResult(**attr.asdict(response))
        not_found.result = False
        not_found.set_severity(TISeverity.information)
        not_found.details = "Not found."
        yield not_found
        return
    if "response" not in raw_data:
        return
    for record in raw_data["response"]:
        hit, severity, summary = self._parse_one_record(record)
        domain = record["domain"]
        item = LookupResult(ioc=domain, ioc_type="dns")
        item.ioc = domain
        item.provider = self._provider_name
        item.result = hit
        item.set_severity(severity)
        item.details = summary
        item.raw_result = record
        # Reference points at a single-domain form of the original query.
        item.reference = f"{response.reference}?domains[0]={domain}"
        yield item