Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@celery.task(base=WebDriverTask, soft_time_limit=300, time_limit=600)
def get_shodan_result(query, page=1):
logger.info("Fetching shodan results query: %s page: %s", query, page)
api = shodan.WebAPI(API_KEY)
try:
res = api.search(query, page=page)
except shodan.api.WebAPIError:
logger.info('Finished shodan results with %s page(s).', page - 1)
else:
if res:
get_shodan_result.apply_async(args=[query],
kwargs={'page': page + 1},
queue='get_shodan_result')
for r in res.get('matches', []):
get_screenshot.apply_async(args=[r], queue='get_screenshot')
return res
@celery.task(soft_time_limit=300, time_limit=600)
def get_shodan_results(page=1):
logger.info("Fetching shodan results page: %s", page)
api = shodan.WebAPI(API_KEY)
try:
res = api.search(QUERY, page=page)
except shodan.api.WebAPIError:
logger.info('Finished shodan results with %s page(s).', page -1)
else:
get_shodan_results.delay(page=page+1)
for r in res.get('matches', []):
get_screenshot.delay(r)
return res
@celery.task
def get_shodan_results(page=1):
print "getting results: %s" % page
api = shodan.WebAPI(API_KEY)
try:
res = api.search(QUERY, page=page)
except shodan.api.WebAPIError:
logger.info('Finished shodan results with %s page(s).', page -1)
else:
print group(get_screenshot.s(x) for x in res['matches']).apply_async()
get_shodan_results.delay(page=page+1)
return res