Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def is_ip_address(text):
if validators.ip_address.ipv4(text):
return True
return validators.ip_address.ipv6(text)
def shodan_search(target, api_key):
print ("\n-> Shodan Results")
api = shodan.Shodan(api_key)
print ('\n[@] Target: ' + target + '\n')
if validators.ip_address.ipv4(target):
host = api.host(target)
print ("""
[*] City: {}
[*] Country: {}
[*] Postal Code: {}
[*] Longitude: {}
[*] Latitude: {}
[*] Operation System: {}
[*] Organization: {}
[*] ISP: {}""".format(host['city'], host['country_name'], host['postal_code'], host.get('longitude', 'N/A'), host.get('latitude', 'N/A'), host['os'], host['org'], host['isp']))
if len(host['ports']) >= 1:
for port in host['ports']:
print ('[*] Port: ' + str(port))
def format_message_for_slack(response, indicator):
results = response.get("results", [])
if not results:
return f"No results for indicator {indicator}", None
attachments = []
for result in results:
indicator = result['value']
resolve = result['resolve']
if (url_validator(indicator) or domain_validator(indicator)) and not ipv4_validator(indicator):
indicator = defang(indicator, all_dots=True)
if (url_validator(resolve) or domain_validator(resolve)) and not ipv4_validator(resolve):
resolve = defang(resolve, all_dots=True)
attachments.append({
"color": "#36a64f",
"author_name": f"PassiveTotal Results for {result['value']}",
"author_link": f"https://community.riskiq.com/search/{result['value']}",
"author_icon": "https://cdn.riskiq.com/wp-content/themes/riskiq/media/gradient-logo.png",
"title": f"Query Results for {result['resolve']}",
"title_link": f"https://community.riskiq.com/search/{result['resolve']}",
"fields": [
{
"title": "Resolve",
"value": f"{resolve}",
"short": False
},
{
def format_message_for_slack(response, indicator):
results = response.get("results", [])
if not results:
return f"No results for indicator {indicator}", None
attachments = []
for result in results:
indicator = result['value']
resolve = result['resolve']
if (url_validator(indicator) or domain_validator(indicator)) and not ipv4_validator(indicator):
indicator = defang(indicator, all_dots=True)
if (url_validator(resolve) or domain_validator(resolve)) and not ipv4_validator(resolve):
resolve = defang(resolve, all_dots=True)
attachments.append({
"color": "#36a64f",
"author_name": f"PassiveTotal Results for {result['value']}",
"author_link": f"https://community.riskiq.com/search/{result['value']}",
"author_icon": "https://cdn.riskiq.com/wp-content/themes/riskiq/media/gradient-logo.png",
"title": f"Query Results for {result['resolve']}",
"title_link": f"https://community.riskiq.com/search/{result['resolve']}",
"fields": [
{
"title": "Resolve",
"value": f"{resolve}",
"short": False
}
}
try:
validate(instance=data[i], schema=prop_schema)
except jsonschema.exceptions.ValidationError:
return {'message': f'Something went wrong, the supplied input doesn\'t seem to be valid in [`ip_props`][{int(i)-1}]'}, 500
"""
Check if supplied record type matches ip
So 127.0.0.1 can't be CNAME
And google.com can't be answer for A :D
"""
record_funcs = {
"CNAME": checkDomain,
"A": ipv4,
"AAAA": ipv6
}
if not record_funcs[data[i]['type']](data[i]['ip']):
return {'message': f"data[{int(i)-1}]['ip'] has to be in {data[{int(i)-1}]['type']} format"}, 500
"""
Then put the data together
Generate new uuid4
Put it in database and redis
Then return the whole domain
"""
# rbnd_json does not need name parameter - it's meant to be stored in redis and in props column in database
rbnd_json = {
'ip_props': data,
def CheckDomainOrIP(value):
if not validators.domain(value) and not validators.ip_address.ipv4(value):
raise argparse.ArgumentTypeError('Invalid domain or ip address ({}).'.format(value))
return value