Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def subscribe_tags(self, alertid, montags, toaddr):
self.alertid = alertid
self.montags = montags
self.toaddr = toaddr
try:
monitemlist = montags.split(" ")
self.api.stream.base_url = "https://stream.shodan.io"
if str(alertid) == "all":
self.alertid = None
else:
self.alertid = alertid
for banner in self.api.stream.alert(self.alertid):
for m in monitemlist:
if 'tags' in banner and str(m) in banner['tags']:
ip = str(get_ip(banner))
data = 'Hostname: ' + ip + ' Tag: ' + str(m)
self.log('Alert: ', data)
if (toaddr) is not None:
self.send_mail('[Alert] Tag detected', data, toaddr, None, None)
except APIError as e:
self.log("Error", e.value)
self.log("newline")
sys.exit(1)
except requests.exceptions.ChunkedEncodingError:
self.subscribe_tags(self.alertid, self.montags, self.toaddr)
col += 1
row += 1
total = 0
ports = defaultdict(int)
for banner in iterate_files(files):
try:
# Build the list that contains all the relevant values
data = []
for field in self.fields:
value = self.banner_field(banner, field)
data.append(value)
# Write those values to the main workbook
# Starting off w/ the special "IP" property
main_sheet.write_string(row, 0, get_ip(banner))
col = 1
for value in data:
main_sheet.write(row, col, value)
col += 1
row += 1
except Exception:
pass
# Aggregate summary information
total += 1
ports[banner['port']] += 1
summary_sheet = workbook.add_worksheet('Summary')
summary_sheet.write(0, 0, 'Total', bold)
summary_sheet.write(0, 1, total)
def set_data(self, data):
"""
Set / convert internal data.
For now it just selects a random set to show.
"""
entries = []
# Grab 5 random banners to display
for banner in random.sample(data, min(len(data), 5)):
desc = '{} -> {} / {}'.format(get_ip(banner), banner['port'], banner['location']['country_code'])
if banner['location']['city']:
# Not all cities can be encoded in ASCII so ignore any errors
try:
desc += ' {}'.format(banner['location']['city'])
except:
pass
if 'tags' in banner and banner['tags']:
desc += ' / {}'.format(','.join(banner['tags']))
entry = (
float(banner['location']['latitude']),
float(banner['location']['longitude']),
'*',
desc,
curses.A_BOLD,
click.echo('Saving results to file: {0}'.format(filename))
# Start listening for results
done = False
# Keep listening for results until the scan is done
click.echo('Waiting for data, please stand by...')
while not done:
try:
for banner in api.stream.ports([port], timeout=90):
counter += 1
helpers.write_banner(fout, banner)
if not quiet:
click.echo('{0:<40} {1:<20} {2}'.format(
click.style(helpers.get_ip(banner), fg=COLORIZE_FIELDS['ip_str']),
click.style(str(banner['port']), fg=COLORIZE_FIELDS['port']),
';'.join(banner['hostnames']))
)
except shodan.APIError:
# We stop waiting for results if the scan has been processed by the crawlers and
# there haven't been new results in a while
if done:
break
scan = api.scan_status(scan['id'])
if scan['status'] == 'DONE':
done = True
except socket.timeout:
# We stop waiting for results if the scan has been processed by the crawlers and
# there haven't been new results in a while
if done:
def set_data(self, data):
"""
Set / convert internal data.
For now it just selects a random set to show.
"""
entries = []
# Grab 5 random banners to display
for banner in random.sample(data, min(len(data), 5)):
desc = '{} -> {} / {}'.format(get_ip(banner), banner['port'], banner['location']['country_code'])
if banner['location']['city']:
# Not all cities can be encoded in ASCII so ignore any errors
try:
desc += ' {}'.format(banner['location']['city'])
except Exception:
pass
if 'tags' in banner and banner['tags']:
desc += ' / {}'.format(','.join(banner['tags']))
entry = (
float(banner['location']['latitude']),
float(banner['location']['longitude']),
'*',
desc,
curses.A_BOLD,
def write(self, host):
try:
ip = get_ip(host)
lat, lon = host['location']['latitude'], host['location']['longitude']
feature = """{
"type": "Feature",
"id": "{}",
"properties": {
"name": "{}"
},
"geometry": {
"type": "Point",
"coordinates": [{}, {}]
}
}""".format(ip, ip, lat, lon)
self.fout.write(feature)
except Exception:
# Now wait a few seconds for items to get returned
hosts = collections.defaultdict(dict)
done = False
scan_start = time.time()
cache = {}
while not done:
try:
for banner in api.stream.alert(aid=alert['id'], timeout=wait):
ip = banner.get('ip', banner.get('ipv6', None))
if not ip:
continue
# Don't show duplicate banners
cache_key = '{}:{}'.format(ip, banner['port'])
if cache_key not in cache:
hosts[helpers.get_ip(banner)][banner['port']] = banner
cache[cache_key] = True
# If we've grabbed data for more than 60 seconds it might just be a busy network and we should move on
if time.time() - scan_start >= 60:
scan = api.scan_status(scan['id'])
if verbose:
click.echo('# Scan status: {}'.format(scan['status']))
if scan['status'] == 'DONE':
done = True
break
except shodan.APIError:
# If the connection timed out before the timeout, that means the streaming server
# that the user tried to reach is down. In that case, lets wait briefly and try