provided_ioc = provided_ioc.replace("[D]", ".")
if provided_ioc in empty_files:
splunk_table.append({"invalid": provided_ioc})
continue
if validators.url(provided_ioc) or validators.domain(provided_ioc) or \
validators.ipv4(provided_ioc):
analysis_dicts = get_analysis(provided_ioc)
if isinstance(analysis_dicts, dict) or analysis_dicts == None:
splunk_table.append({"invalid": provided_ioc})
continue
ioc_dicts = get_payloads(analysis_dicts)
elif validators.md5(provided_ioc) or validators.sha256(provided_ioc):
ioc_dicts = get_urls(provided_ioc)
else:
splunk_table.append({"invalid": provided_ioc})
continue
for ioc_dict in ioc_dicts:
ioc_dict = lower_keys(ioc_dict)
splunk_table.append(ioc_dict)
return splunk_table
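def checkType(self, argument):
    """
    Identify observable type

    Classifies one observable string purely by its syntax, using the
    ``validators`` package. The result says what the value looks like,
    not whether it is malicious or even in use.

    No normalisation is done here: the value is validated exactly as
    passed (it is not stripped, lower-cased or re-fanged), so callers
    that accept defanged indicators must restore them first.

    :param argument: observable text to classify; may be None or empty.
    :return: "URL", "MD5", "SHA1", "SHA256", "SHA512", "IPv4", "IPv6"
        or "domain" for the first validator that accepts the value;
        None for empty input, input starting with '#', or input that no
        validator accepts.
    """
    # Nothing to classify: None, empty string, or whitespace only.
    if not argument or not argument.strip():
        return None

    # Values starting with '#' are skipped (presumably comment lines in
    # an observable list). Compare by value, not with `is`: identity on a
    # str literal depends on interpreter caching and triggers a
    # SyntaxWarning on Python 3.8+.
    if argument.startswith("#"):
        return None

    # Order is significant and kept as-is: the first validator that
    # accepts the value decides the type, so URL is tried before the
    # hash, address and domain checks.
    checks = (
        ("URL", validators.url),
        ("MD5", validators.md5),
        ("SHA1", validators.sha1),
        ("SHA256", validators.sha256),
        ("SHA512", validators.sha512),
        ("IPv4", validators.ipv4),
        ("IPv6", validators.ipv6),
        ("domain", validators.domain),
    )
    for label, accepts in checks:
        if accepts(argument):
            return label

    # Unrecognised syntax: the caller decides whether to report or drop it.
    return None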
splunk_table.append(rate_limit)
return splunk_table
empty_files = ["d41d8cd98f00b204e9800998ecf8427e",
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"]
splunk_table = []
for provided_ioc in set(provided_iocs):
provided_ioc = commons.deobfuscate_string(provided_ioc)
if provided_ioc in empty_files:
splunk_table.append({"invalid": provided_ioc})
continue
if validators.url(provided_ioc) or validators.domain(provided_ioc) or \
validators.ipv4(provided_ioc) or validators.md5(provided_ioc) or \
validators.sha256(provided_ioc) or \
len(provided_ioc) > 2 and len(provided_ioc) <= 140:
ioc_dicts = query_twitter(session, provided_ioc)
else:
splunk_table.append({"invalid": provided_ioc})
continue
for ioc_dict in ioc_dicts:
ioc_dict = commons.lower_keys(ioc_dict)
splunk_table.append(ioc_dict)
return splunk_table
if results != None:
provided_iocs = [y for x in results for y in x.values()]
else:
provided_iocs = sys.argv[1:]
session = commons.create_session()
splunk_table = []
for provided_ioc in provided_iocs:
provided_ioc = commons.deobfuscate_string(provided_ioc)
if validators.ipv4(provided_ioc):
ioc_type = "ip"
elif validators.domain(provided_ioc):
ioc_type = "domain"
elif validators.md5(provided_ioc):
ioc_type = "md5"
elif validators.sha256(provided_ioc):
ioc_type = "sha256"
else:
splunk_table.append({"invalid": provided_ioc})
continue
ioc_dicts = query_cymon(ioc_type, session, provided_ioc)
if isinstance(ioc_dicts, dict):
splunk_table.append(ioc_dicts)
continue
for ioc_dict in ioc_dicts:
ioc_dict = commons.lower_keys(ioc_dict)
splunk_table.append(ioc_dict)
def read_iocs(cb, file=sys.stdin):
iocs = defaultdict(list)
report_id = hashlib.md5()
report_id.update(str(time.time()).encode("utf-8"))
for idx, line in enumerate(sys.stdin):
line = line.rstrip("\r\n")
report_id.update(line.encode("utf-8"))
if validators.md5(line):
iocs["md5"].append(line)
elif validators.sha256(line):
eprint("line {}: sha256 provided but not yet supported by backend".format(idx + 1))
iocs["sha256"].append(line)
elif validators.ipv4(line):
iocs["ipv4"].append(line)
elif validators.ipv6(line):
iocs["ipv6"].append(line)
elif validators.domain(line):
iocs["dns"].append(line)
else:
if cb.validate_query(line):
query_ioc = {"search_query": line}
iocs["query"].append(query_ioc)
else:
eprint("line {}: invalid query".format(idx + 1))