Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_api(api_key):
    """Create a Shodan API client and check it with one request before use.

    Prints a progress banner, builds ``shodan.Shodan(api_key)`` and calls
    ``api.info()`` once, so that an ``APIError`` (presumably caused by a bad
    key or an unreachable service) is reported here rather than on the
    caller's first query.

    :param api_key: key handed to ``shodan.Shodan``
    :return: the ``shodan.Shodan`` instance
    :raises SystemExit: with status 1 when Shodan raises ``APIError``
    """
    rule = '---------------------------------'
    print('\n' + rule + '\n' + Color.green('Connecting To Shodan API...') + '\n' + rule)
    try:
        api = shodan.Shodan(api_key)
        # Short pause kept from the original implementation before the
        # first request is sent.
        sleep(1)
        api.info()
    except shodan.exception.APIError as e:
        print(Color.red('Error:') + ' %s\n' % (e,) + rule + '\n\n')
        # Exit with a non-zero status so the calling process can tell the
        # connection failed; SystemExit does not depend on the ``exit``
        # helper that the ``site`` module injects.
        raise SystemExit(1)
    print(Color.green('Created New Api Instance!') + '\n' + rule + '\n\n\n')
    return api
def run(self, conf, args, plugins):
if 'subcommand' in args:
if 'Shodan' not in conf and 'key' not in conf['Shodan']:
print('Bad configuration for Shodan, quitting...')
sys.exit(1)
api = shodan.Shodan(conf['Shodan']['key'])
if args.subcommand == 'ip':
try:
res = api.host(args.IP, history=args.history)
except shodan.exception.APIError:
print("IP not found in Shodan")
else:
if args.verbose:
print(json.dumps(res, sort_keys=True, indent=4))
else:
if args.summary:
for d in res['data']:
if d['port'] == 22:
print("%s - port 22 ssh - %s" % (
d['timestamp'][:19],
d['data'].split("\n")[0]
)
)
elif d['port'] == 80:
print("%s - port 80 http - Server \"%s\"" % (
d['timestamp'][:19],
servicesports = []
for result in results['data']:
try:
for key in result['http']['components'].keys():
technologies.append(key)
except KeyError:
pass
port = str(result.get('port'))
product = str(result.get('product'))
servicesports.append(str(product)+':'+str(port))
technologies = list(set(technologies))
self.hostdatarow = [
str(results.get('ip_str')), str(results.get('hostnames')).strip('[]\''),
str(results.get('org')), str(servicesports).replace('\'', '').strip('[]'),
str(technologies).replace('\'', '').strip('[]')]
except exception.APIError:
print(f'{ipaddress}: Not in Shodan')
self.hostdatarow = [ipaddress, "Not in Shodan", "Not in Shodan", "Not in Shodan", "Not in Shodan"]
except Exception as e:
print(f'Error occurred in the Shodan IP search module: {e}')
finally:
return self.hostdatarow
def host_vulners_scan(api, ip):
"""
Initiate vulnerabilities scan with shodan api host method

Sleeps for REQUEST_DELAY_SLEEP_TIME, then requests the host record with
``api.host(ip)`` and passes it to ``snmp_checker``.

:param api: shodan api instance
:param ip: host ip (str)
:return: list of host vulnerabilities (list) according to the original
    author; the statement that returns it is not part of this excerpt.
    The generic-error path visible here returns None.
"""
# Pause before every lookup; the delay constant is defined outside this excerpt.
time.sleep(REQUEST_DELAY_SLEEP_TIME)
try:
host_data = api.host(ip)
except shodan.exception.APIError as rate_limit_err:
# Every APIError is reported as a request-limit error, whatever its actual cause.
print(
"{color}Request limit error (vulnerabilities): {error_info}{reset}".format(
error_info=rate_limit_err,
color=ERROR_COLOR,
reset=RESET_COLOR))
# Wait, then retry the same host by calling this function again.
# NOTE(review): the retry is recursive and has no attempt limit, so an
# APIError that never clears would keep recursing until RecursionError.
time.sleep(REQUEST_LIMIT_SLEEP_TIME)
return host_vulners_scan(api, ip)
except Exception as unknown_error:
# Any other failure is printed and the scan for this host is abandoned.
print(
"{color}Error: {error_info}{reset}".format(
error_info=unknown_error, color=ERROR_COLOR,
reset=RESET_COLOR))
return
# The host record and the ip go to snmp_checker (defined elsewhere); the
# remainder of the function is not visible in this excerpt.
snmp_vulner = snmp_checker(host_data, ip)
def setup_platform(hass, config, add_entities, discovery_info=None):
    """Create the Shodan sensor and register it through ``add_entities``.

    Reads the API key, sensor name and query from ``config``, wraps a
    ``shodan.Shodan`` client in ``ShodanData`` and performs one initial
    ``update()``. Returns ``False`` when that update raises ``APIError``;
    otherwise the sensor is added and the function returns ``None``.
    """
    import shodan

    key = config.get(CONF_API_KEY)
    sensor_name = config.get(CONF_NAME)
    search_query = config.get(CONF_QUERY)

    client = shodan.Shodan(key)
    shodan_data = ShodanData(client, search_query)

    try:
        shodan_data.update()
    except shodan.exception.APIError as error:
        _LOGGER.warning("Unable to connect to Shodan.io: %s", error)
        return False
    else:
        sensor = ShodanSensor(shodan_data, sensor_name)
        add_entities([sensor], True)
def gather(self, all_ips):
    """Fill in missing Shodan host information for the IP objects in ``all_ips``.

    Only the first element of each value is examined. An entry is queried
    when its ``shodan_info`` is still the empty string and its
    ``ip_address`` is not empty. The result of ``self.api_object.host`` is
    stored in ``shodan_info``; when Shodan raises ``APIError`` a
    "No available information" message is stored instead.

    :param all_ips: mapping exposing ``iteritems()``; each value is a
        sequence whose first element has ``shodan_info`` and ``ip_address``
        attributes
    :return: None
    """
    for _path, incoming_ip_obj in all_ips.iteritems():
        target = incoming_ip_obj[0]

        # Skip entries that already carry Shodan data or have no address.
        if target.shodan_info != "" or target.ip_address == "":
            continue

        # Compare by value: ``is ""`` tests object identity, which only
        # matches by interpreter accident and is flagged with a
        # SyntaxWarning on current Python versions.
        if self.api_key == "":
            print(helpers.color("[*] Error: You didn't provide a Shodan API Key!", warning=True))
            print(helpers.color("[*] Please edit Shodan module and add in your API Key.", warning=True))
            continue

        print("Querying Shodan for information about " + target.ip_address)
        try:
            target.shodan_info = self.api_object.host(target.ip_address)
        except shodan.exception.APIError:
            target.shodan_info = "No available information within Shodan about " + target.ip_address
        except simplejson.decoder.JSONDecodeError:
            # Best effort, as in the original: an undecodable response
            # leaves shodan_info empty so the entry stays eligible for a
            # later attempt.
            pass
    return
for host in results['matches']:
rec_count += 1
try:
for hostname in host['hostnames']:
self.insert_ports(host=hostname, ip_address=host['ip_str'], port=host['port'],
protocol=host['transport'])
self.insert_hosts(host=hostname, ip_address=host['ip_str'])
except KeyError:
self.insert_ports(ip_address=ipaddr, port=host['port'], protocol=host['transport'])
self.insert_host(ip_address=host['ip_str'])
page += 1
time.sleep(limit)
except shodan.exception.APIError:
pass
"lng": result_field["location"]["longitude"],
"country": location["country_name"],
"vulnerabilities": host_vulners,
"additional_info": parsed_additional_info
})
# Calculate vulnerabilities quantity after new query
if vuln_scan:
print(
"{color}[!] vulnerabilities found: {count}{reset}".format(
color=ADD_VULNERABILITIES_COLOR,
count=(len(all_vulners) - prev_vuln_counter),
reset=RESET_COLOR
))
except shodan.exception.APIError as rate_limit_err:
print("{color}Request limit error: {error_info}{reset}".format(
color=ERROR_COLOR,
error_info=rate_limit_err,
reset=RESET_COLOR))
# Default timer = 30 sec.
time.sleep(REQUEST_LIMIT_SLEEP_TIME)
except Exception as unknown_error:
print("{color}Error: {error_info}{reset}".format(
color=ERROR_COLOR,
error_info=unknown_error,
reset=RESET_COLOR))
if not result:
break
print("{color}Final result (unique hosts): {result_count}{reset}".format(
color=ADD_VULNERABILITIES_COLOR,
#Shodan repeat IP entries for each port Open on an IP, the below code is for that
if previous_ip==result['ip_str']:
continue
else:
previous_ip=result['ip_str']
#print "Reached here"
#print result
temp = {}
temp["Query"] = term
time.sleep(1)
#Fetch details of each of IP one by one
try:
host = api.host('%s' %result['ip'])
except Shodan_exception.APIError, e:
#No results found, print no 'matches'
print "No "+result['ip_str']+' %s\r' %e
continue
ip = '%s' %host.get('ip_str', None)
#IP Stored as string
temp["IP"] = ip.encode('ascii', 'replace')
#Hostname also as string
hostnames = s = ''.join(host.get('hostnames', None))
temp["Hostnames"] = hostnames.encode('ascii', 'replace')
#String as array of ports
ports = '%s' %host.get('ports', None)
while rec_count <= total_results:
results = api.search(query, page=page)
total_results = results['total']
for host in results['matches']:
rec_count += 1
if len(host['hostnames']) > 0:
for hostname in host['hostnames']:
self.insert_pushpins(*prep_host(host, hostname))
else:
self.insert_pushpins(*prep_host(host, 'None'))
page += 1
time.sleep(limit)
except shodan.exception.APIError:
pass