Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
r['ip'],
r['location.country'],
" ".join(r['protocols'])
)
)
else:
try:
ip = api.view(args.IP)
print(json.dumps(ip, sort_keys=True, indent=4, separators=(',', ': ')))
except censys.base.CensysNotFoundException:
print('IP not found')
elif args.subcommand == 'cert':
try:
c = certificates.CensysCertificates(conf['Censys']['id'], conf['Censys']['secret'])
res = c.view(args.ID)
except censys.base.CensysNotFoundException:
print("Certificate not found")
else:
print(json.dumps(res, sort_keys=True, indent=4, separators=(',', ': ')))
elif args.subcommand == 'subdomains':
subdomains = self.get_subdomains(conf, args.DOMAIN, args.verbose)
for d in subdomains:
print(d)
else:
self.parser.print_help()
else:
self.parser.print_help()
"""
This function is used to search hosts in Censys with initialized API
:param query: query that you want to use for your search
:param max_records: quantity of hosts that you want to get with query
:return: None
"""
try:
self.results = list(
self.api.search(
query, fields=self.search_fields, max_records=max_records
)
)
except (
CensysRateLimitExceededException,
CensysJSONDecodeException,
CensysNotFoundException,
CensysUnauthorizedException,
) as api_error:
print(f"Censys API error: {api_error}")
except AttributeError as api_not_defined:
print(f"Censys API was not initialized: {api_not_defined}")
except CensysException as too_much_results_required:
if "Only the first 1,000 search results are available" in str(
too_much_results_required
):
print(
"Only the first 1,000 search results are available. Retry search with 1,000 results limit."
)
self.search(
query, max_records=DefaultValues.CENSYS_FREE_PLAN_RESULTS_QUANTITY
)
else:
new_ports += 1
except:
print("Failed to add port to port docs:\n", traceback.format_exc())
time.sleep(.5)
except KeyboardInterrupt:
raise
except CensysRateLimitExceededException:
# TODO: Need to wait for more API calls
print("YOU STILL NEED TO IMPLEMENT BETTER API WAITING")
print("Sleeping for 3 minutes to wait for API permission...\n")
time.sleep(180)
# print(traceback.format_exc())
except CensysNotFoundException:
no_result += 1
except:
print(traceback.format_exc())
print("{} Completed | {} New Ports | {} No Results".format(completed, new_ports, no_result))
class CensysUnauthorizedException(CensysException):
    """Censys API error for unauthorized requests.

    ``CensysAPIBase.EXCEPTIONS`` associates this class with HTTP status 403.
    It adds no behavior of its own beyond what ``CensysException`` provides.
    """
class CensysJSONDecodeException(CensysException):
    """Censys API error related to JSON decoding.

    It adds no behavior of its own beyond what ``CensysException`` provides.

    NOTE(review): presumably raised when a response body cannot be decoded
    as JSON; the raise site is not visible here -- confirm.
    """
class CensysAPIBase(object):
DEFAULT_URL = "https://www.censys.io/api/v1"
DEFAULT_TIMEOUT = 30
EXCEPTIONS = {
403: CensysUnauthorizedException,
404: CensysNotFoundException,
429: CensysRateLimitExceededException
}
def __init__(self, api_id=None, api_secret=None, url=None, timeout=None):
self.api_id = api_id or os.environ.get("CENSYS_API_ID", None)
self.api_secret = api_secret or os.environ.get("CENSYS_API_SECRET", None)
if not self.api_id or not self.api_secret:
raise CensysException(401, "No API ID or API secret configured.")
timeout = timeout or self.DEFAULT_TIMEOUT
self._api_url = url or os.environ.get("CENSYS_API_URL", None) or self.DEFAULT_URL
# create a session that we'll use for making requests
self._session = requests.Session()
self._session.auth = (self.api_id, self.api_secret)
self._session.timeout = timeout
self._session.headers.update({"accept": "text/json, application/json, */8"})
# test that everything works by requesting the users account information