Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setup_class(self):
self.patcher = patch('passivetotal.api.Client._get', fake_request)
self.patcher.start()
self.client = EnrichmentRequest('--No-User--', '--No-Key--')
for sample in results[domain]["results"]:
print("%s|%s|%s|%s|%s" % (
domain,
sample["collectionDate"],
sample["sample"],
sample["source"],
sample["sourceUrl"]
)
)
else:
self.parser.print_help()
elif args.subcommand == "osint":
# FIXME: add research of projects
client = EnrichmentRequest(conf["PassiveTotal"]["username"], conf["PassiveTotal"]['key'])
if args.domain:
raw_results = client.get_osint(query=args.domain)
print(json.dumps(raw_results, sort_keys=True, indent=4, separators=(',', ': ')))
elif args.file:
with open(args.file, 'r') as infile:
data = infile.read().split()
domain_list = list(set([a.strip() for a in data]))
if len(domain_list) < 51:
raw_results = client.get_bulk_osint(query=domain_list)
if "results" not in raw_results or not raw_results["success"]:
print("Request failed")
print(json.dumps(raw_results, sort_keys=True, indent=4, separators=(',', ': ')))
sys.exit(1)
else:
results = raw_results["results"]
else:
elif args.email:
raw_results = client.search_whois_by_field(
query=args.email.strip(),
field="email"
)
print(json.dumps(raw_results, sort_keys=True, indent=4, separators=(',', ': ')))
else:
self.parser.print_help()
elif args.subcommand == "dns":
client = DnsRequest(conf['PassiveTotal']['username'], conf['PassiveTotal']['key'])
raw_results = client.get_passive_dns(
query=unbracket(args.DOMAIN),
)
print(json.dumps(raw_results, sort_keys=True, indent=4, separators=(',', ': ')))
elif args.subcommand == "malware":
client = EnrichmentRequest(conf["PassiveTotal"]["username"], conf["PassiveTotal"]['key'])
if args.domain:
raw_results = client.get_malware(query=args.domain)
print(json.dumps(raw_results, sort_keys=True, indent=4, separators=(',', ': ')))
elif args.file:
with open(args.file, 'r') as infile:
data = infile.read().split()
domain_list = list(set([a.strip() for a in data]))
if len(domain_list) < 51:
raw_results = client.get_bulk_malware(query=domain_list)
if "results" not in raw_results or not raw_results["success"]:
print("Request failed")
print(json.dumps(raw_results, sort_keys=True, indent=4, separators=(',', ': ')))
sys.exit(1)
else:
results = raw_results["results"]
else:
def __init__(self, *args, **kwargs):
"""Setup the primary client instance."""
super(EnrichmentRequest, self).__init__(*args, **kwargs)
client = DnsRequest(conf['PassiveTotal']['username'], conf['PassiveTotal']['key'])
raw_results = client.get_passive_dns(query=unbracket(args.DOMAIN))
if "results" in raw_results:
for res in raw_results["results"]:
passive_dns.append({
"first": parse(res["firstSeen"]).astimezone(pytz.utc),
"last": parse(res["lastSeen"]).astimezone(pytz.utc),
"ip": res["resolve"],
"source": "PT"
})
if "message" in raw_results:
if "quota_exceeded" in raw_results["message"]:
print("PT quota exceeded")
ptout = True
if not ptout:
client2 = EnrichmentRequest(conf["PassiveTotal"]["username"], conf["PassiveTotal"]['key'])
# Get OSINT
# TODO: add PT projects here
pt_osint = client2.get_osint(query=unbracket(args.DOMAIN))
# Get malware
raw_results = client2.get_malware(query=unbracket(args.DOMAIN))
if "results" in raw_results:
for r in raw_results["results"]:
malware.append({
'hash': r["sample"],
'date': parse(r['collectionDate']),
'source' : 'PT (%s)' % r["source"]
})
except requests.exceptions.ReadTimeout:
print("PT: Time Out")
# VT
vt_e = plugins['vt'].test_config(conf)
passive_dns.append({
"first": parse(res["firstSeen"]).astimezone(pytz.utc),
"last": parse(res["lastSeen"]).astimezone(pytz.utc),
"domain": res["resolve"],
"source": "PT"
})
if "message" in raw_results:
if "quota_exceeded" in raw_results["message"]:
print("Quota exceeded for Passive Total")
out_pt = True
pt_osint = {}
except requests.exceptions.ReadTimeout:
print("Timeout on Passive Total requests")
if not out_pt:
try:
client2 = EnrichmentRequest(conf["PassiveTotal"]["username"], conf["PassiveTotal"]['key'])
# Get OSINT
# TODO: add PT projects here
pt_osint = client2.get_osint(query=unbracket(args.IP))
# Get malware
raw_results = client2.get_malware(query=unbracket(args.IP))
if "results" in raw_results:
for r in raw_results["results"]:
malware.append({
'hash': r["sample"],
'date': parse(r['collectionDate']),
'source' : 'PT (%s)' % r["source"]
})
except requests.exceptions.ReadTimeout:
print("Timeout on Passive Total requests")
# VT
vt_e = plugins['vt'].test_config(conf)