Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
click.echo('Total number of results:\t%s' % total)
click.echo('Query credits left:\t\t%s' % info['unlocked_left'])
click.echo('Output file:\t\t\t%s' % filename)
if limit > total:
limit = total
# A limit of -1 means that we should download all the data
if limit <= 0:
limit = total
# Adjust the total number of results we should expect to download if the user is skipping results
if skip > 0:
limit -= skip
with helpers.open_file(filename, 'w') as fout:
count = 0
try:
cursor = api.search_cursor(query, minify=False, skip=skip)
with click.progressbar(cursor, length=limit) as bar:
for banner in bar:
helpers.write_banner(fout, banner)
count += 1
if count >= limit:
break
except Exception:
pass
# Let the user know we're done
if count < limit:
click.echo(click.style('Notice: fewer results were saved than requested', 'yellow'))
# Return immediately
if wait <= 0:
click.echo('Exiting now, not waiting for results. Use the API or website to retrieve the results of the scan.')
else:
# Setup an alert to wait for responses
alert = api.create_alert('Scan: {}'.format(', '.join(netblocks)), netblocks)
# Create the output file if necessary
filename = filename.strip()
fout = None
if filename != '':
# Add the appropriate extension if it's not there atm
if not filename.endswith('.json.gz'):
filename += '.json.gz'
fout = helpers.open_file(filename, 'w')
# Start a spinner
finished_event = threading.Event()
progress_bar_thread = threading.Thread(target=async_spinner, args=(finished_event,))
progress_bar_thread.start()
# Now wait a few seconds for items to get returned
hosts = collections.defaultdict(dict)
done = False
scan_start = time.time()
cache = {}
while not done:
try:
for banner in api.stream.alert(aid=alert['id'], timeout=wait):
ip = banner.get('ip', banner.get('ipv6', None))
if not ip:
host = api.host(ip, history=history)
# Print the host information to the terminal using the user-specified format
HOST_PRINT[format](host, history=history)
# Store the results
if filename or save:
if save:
filename = '{}.json.gz'.format(ip)
# Add the appropriate extension if it's not there atm
if not filename.endswith('.json.gz'):
filename += '.json.gz'
# Create/ append to the file
fout = helpers.open_file(filename)
for banner in sorted(host['data'], key=lambda k: k['port']):
if 'placeholder' not in banner:
helpers.write_banner(fout, banner)
except shodan.APIError as e:
raise click.ClickException(e.value)