Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
list has an entry for each provider result
"""
if not observable and "ioc" in kwargs:
observable = kwargs["ioc"]
if not observable:
raise ValueError("observable or ioc parameter must be supplied.")
result_list: List[Tuple[str, LookupResult]] = []
selected_providers = self._select_providers(providers, prov_scope)
if not selected_providers:
raise RuntimeError(_NO_PROVIDERS_MSSG)
ioc_type = ioc_type if ioc_type else TIProvider.resolve_ioc_type(observable)
for prov_name, provider in selected_providers.items():
provider_result: LookupResult = provider.lookup_ioc(
ioc=observable, ioc_type=ioc_type, query_type=ioc_query_type, **kwargs
)
result_list.append((prov_name, provider_result))
overall_result = any(res.result for _, res in result_list)
return overall_result, result_list
def _parse_multi_results(self, response: LookupResult) -> Iterable[LookupResult]:
"""Parse details of batch response."""
if not isinstance(response.raw_result, dict):
new_result = LookupResult(**attr.asdict(response))
new_result.result = False
new_result.set_severity(TISeverity.information)
new_result.details = "Not found."
yield new_result
elif "response" in response.raw_result:
dom_records = response.raw_result["response"]
for dom_record in dom_records:
result, sev, details = self._parse_one_record(dom_record)
domain_name = dom_record["domain"]
new_result = LookupResult(ioc=domain_name, ioc_type="dns")
new_result.ioc = domain_name
new_result.provider = self._provider_name
new_result.result = result
new_result.set_severity(sev)
new_result.details = details
new_result.raw_result = dom_record
new_result.reference = f"{response.reference}?domains[0]={domain_name}"
yield new_result
def _parse_multi_results(self, response: LookupResult) -> Iterable[LookupResult]:
"""Parse details of batch response."""
if not isinstance(response.raw_result, dict):
new_result = LookupResult(**attr.asdict(response))
new_result.result = False
new_result.set_severity(TISeverity.information)
new_result.details = "Not found."
yield new_result
elif "response" in response.raw_result:
dom_records = response.raw_result["response"]
for dom_record in dom_records:
result, sev, details = self._parse_one_record(dom_record)
domain_name = dom_record["domain"]
new_result = LookupResult(ioc=domain_name, ioc_type="dns")
new_result.ioc = domain_name
new_result.provider = self._provider_name
new_result.result = result
new_result.set_severity(sev)
new_result.details = details
new_result.raw_result = dom_record
ioc : str
IoC observable
ioc_type : str, optional
IoC type, by default None
query_subtype : str, optional
Query sub-type, if any, by default None
Returns
-------
LookupResult
Lookup result with resolved ioc_type and pre-processed
observable.
LookupResult.status is none-zero on failure.
"""
result = LookupResult(
ioc=ioc,
safe_ioc=ioc,
ioc_type=ioc_type if ioc_type else self.resolve_ioc_type(ioc),
query_subtype=query_subtype,
result=False,
details="",
raw_result=None,
reference=None,
)
if not self.is_supported_type(result.ioc_type):
result.details = f"IoC type {result.ioc_type} not supported."
result.status = TILookupStatus.not_supported.value
return result
clean_ioc = preprocess_observable(
bad_requests: List[pd.Series] = []
for ioc, ioc_type in generate_items(data, obs_col, ioc_type_col):
if not ioc:
continue
result = self._check_ioc_type(
ioc=ioc, ioc_type=ioc_type, query_subtype=query_type
)
if result.status == TILookupStatus.ok.value:
domain_list.add(result.ioc)
else:
bad_requests.append(pd.Series(attr.asdict(result)))
results: List[pd.Series] = []
if not domain_list:
return pd.DataFrame(columns=LookupResult.column_map())
for item_result in self._lookup_bulk_request(domain_list): # type: ignore
results.append(pd.Series(attr.asdict(item_result)))
all_results = results + bad_requests
return pd.DataFrame(data=all_results).rename(columns=LookupResult.column_map())
def _lookup_batch(self, ioc_list: list) -> Iterable[LookupResult]:
# build the query string manually - of the form domains[N]=domN&domains[N+1]...
qry_elements = []
for idx, dom in zip(range(0, len(ioc_list)), ioc_list):
qry_elements.append(f"domains[{idx}]={dom}")
qry_str = "&".join(qry_elements)
path = self._IOC_QUERIES["dns"].path
req_url = f"{self._BASE_URL}{path}?{qry_str}"
try:
_, req_params = self._substitute_parms("dummy", "dns", None)
response = self._requests_session.get(
url=req_url, headers=req_params["headers"]
)
result = LookupResult(ioc=",".join(ioc_list), ioc_type="dns")
if response.status_code == 200:
result.status = TILookupStatus.ok.value
result.reference = self._BASE_URL + path
result.raw_result = response.json()
for single_result in self._parse_multi_results(result):
yield single_result
else:
result.raw_result = str(response)
result.result = False
result.reference = req_url
result.status = response.status_code
result.details = "No response from provider."
yield result
except (
LookupError,
JSONDecodeError,
Returns
-------
pd.DataFrame
DataFrame of results.
"""
results = []
for observable, ioc_type in generate_items(data, obs_col, ioc_type_col):
if not observable:
continue
item_result = self.lookup_ioc(
ioc=observable, ioc_type=ioc_type, query_type=query_type
)
results.append(pd.Series(attr.asdict(item_result)))
return pd.DataFrame(data=results).rename(columns=LookupResult.column_map())