Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_certificates():
    """Create a Censys certificates client from the module-level credentials.

    Reads the ``CENSYS_API_ID`` and ``CENSYS_API_SECRET`` globals. The progress
    message also formats the module-level name ``domain``, so that name must
    exist before this function is called.

    Returns:
        The ``censys.certificates.CensysCertificates`` instance built from the
        configured credentials.

    Raises:
        SystemExit: with status 1 when the credentials are missing, rejected
            by Censys, or the account rate limit has been exceeded.
    """
    if not CENSYS_API_ID or not CENSYS_API_SECRET:
        logging.info("\033[1;31m[!] API KEY or Secret for Censys not provided.\033[1;m"
                     "\nYou'll have to provide them in the script")
        # A missing configuration is a failure; report it with a non-zero
        # status, matching the other error paths below.
        sys.exit(1)

    logging.info("[+] Extracting certificates for {} using Censys".format(domain))
    try:
        return censys.certificates.CensysCertificates(CENSYS_API_ID, CENSYS_API_SECRET)
    except censys.base.CensysUnauthorizedException:
        logging.info('[!] Your Censys credentials look invalid.\n')
        # sys.exit rather than the interactive-only ``exit`` helper from ``site``.
        sys.exit(1)
    except censys.base.CensysRateLimitExceededException:
        logging.info('[!] Looks like you exceeded your Censys account limits rate. Exiting\n')
        sys.exit(1)
def find_subdomains(domain, api_id, api_secret):
    """Collect the names found on Censys certificates matching ``domain``.

    Runs the certificate query ``parsed.names: <domain>`` and requests only the
    ``parsed.names`` field of each result.

    Args:
        domain: Value interpolated into the ``parsed.names`` query.
        api_id: Censys API ID passed to the client.
        api_secret: Censys API secret passed to the client.

    Returns:
        A set with every entry of ``parsed.names`` across all search results,
        duplicates removed.

    Raises:
        SystemExit: with status 1 when Censys rejects the credentials or the
            account rate limit has been exceeded.
    """
    try:
        censys_certificates = censys.certificates.CensysCertificates(api_id=api_id, api_secret=api_secret)
        certificate_query = 'parsed.names: %s' % domain
        certificates_search_results = censys_certificates.search(certificate_query, fields=['parsed.names'])

        # Flatten the per-certificate name lists straight into a set.
        subdomains = set()
        for search_result in certificates_search_results:
            subdomains.update(search_result['parsed.names'])
        return subdomains
    except censys.base.CensysUnauthorizedException:
        sys.stderr.write('[-] Your Censys credentials look invalid.\n')
        # sys.exit rather than the interactive-only ``exit`` helper from ``site``.
        sys.exit(1)
    except censys.base.CensysRateLimitExceededException:
        sys.stderr.write('[-] Looks like you exceeded your Censys account limits rate. Exiting\n')
        sys.exit(1)
def get_certificates(domain, api_id, api_secret):
    """Return the SHA-256 fingerprints of Censys certificates matching ``domain``.

    The query keeps certificates whose ``parsed.names`` match ``domain`` and
    that carry the ``trusted`` tag, and excludes any certificate whose names
    match ``cloudflaressl.com``.

    Args:
        domain: Value interpolated into the ``parsed.names`` query term.
        api_id: Censys API ID passed to the client.
        api_secret: Censys API secret passed to the client.

    Returns:
        A set of the ``parsed.fingerprint_sha256`` values of all results.

    Raises:
        SystemExit: with status 1 when Censys rejects the credentials or the
            account rate limit has been exceeded.
    """
    try:
        censys_certificates = censys.certificates.CensysCertificates(api_id=api_id, api_secret=api_secret)
        requested_fields = [
            'parsed.names',
            'parsed.fingerprint_sha256',
        ]
        certificate_query = 'parsed.names: %s AND tags.raw: trusted AND NOT parsed.names: cloudflaressl.com' % domain
        certificates_search_results = censys_certificates.search(certificate_query, fields=requested_fields)
        return {cert['parsed.fingerprint_sha256'] for cert in certificates_search_results}
    except censys.base.CensysUnauthorizedException:
        sys.stderr.write('[-] Your Censys credentials look invalid.\n')
        # sys.exit rather than the interactive-only ``exit`` helper from ``site``.
        sys.exit(1)
    except censys.base.CensysRateLimitExceededException:
        sys.stderr.write('[-] Looks like you exceeded your Censys account limits rate. Exiting\n')
        sys.exit(1)
def get_certificates():
    """Create a Censys certificates client from the module-level credentials.

    Reads the ``CENSYS_API_ID`` and ``CENSYS_API_SECRET`` globals.

    Returns:
        The ``censys.certificates.CensysCertificates`` instance built from the
        configured credentials.

    Raises:
        SystemExit: with status 1 when the credentials are missing, rejected
            by Censys, or the account rate limit has been exceeded.
    """
    if not CENSYS_API_ID or not CENSYS_API_SECRET:
        logging.info("\033[1;31m[!] API KEY or Secret for Censys not provided.\033[1;m"
                     "\nYou'll have to provide them in the script")
        # A missing configuration is a failure; report it with a non-zero
        # status, matching the other error paths below.
        sys.exit(1)

    logging.info("[+] Extracting certificates using Censys")
    try:
        return censys.certificates.CensysCertificates(CENSYS_API_ID, CENSYS_API_SECRET)
    except censys.base.CensysUnauthorizedException:
        logging.info('\033[93m[!] Your Censys credentials look invalid.\n\033[1;m')
        sys.exit(1)
    except censys.base.CensysRateLimitExceededException:
        logging.info('\033[93m[!] Looks like you exceeded your Censys account limits rate. Exiting\n\033[1;m')
        sys.exit(1)
def CensysSearch(value, api_id, api_secret):
    """Collect the names found on Censys certificates matching ``value``.

    Runs the certificate query ``parsed.names: <value>`` and requests only the
    ``parsed.names`` field of each result.

    Args:
        value: Value interpolated into the ``parsed.names`` query.
        api_id: Censys API ID passed to the client.
        api_secret: Censys API secret passed to the client.

    Returns:
        A set with every entry of ``parsed.names`` across all search results,
        duplicates removed.

    Raises:
        SystemExit: with status 1 when Censys rejects the credentials or the
            account rate limit has been exceeded.
    """
    try:
        censys_certificates = censys.certificates.CensysCertificates(api_id=api_id, api_secret=api_secret)
        certificate_query = 'parsed.names: %s' % value
        certificates_search_results = censys_certificates.search(certificate_query, fields=['parsed.names'])

        # Flatten the per-certificate name lists straight into a set.
        subdomains = set()
        for search_result in certificates_search_results:
            subdomains.update(search_result['parsed.names'])
        return subdomains
    except censys.base.CensysUnauthorizedException:
        sys.stderr.write('[-] Your Censys credentials look invalid.\n')
        # sys.exit rather than the interactive-only ``exit`` helper from ``site``.
        sys.exit(1)
    except censys.base.CensysRateLimitExceededException:
        sys.stderr.write('[-] Looks like you exceeded your Censys account limits rate. Exiting\n')
        sys.exit(1)
# Add ports to port list
for port in protocols:
try:
port = int(port.split("/")[0])
newport = elastic_bounty_tools.add_port(ip['key'], port, source, args.workspace)
if newport:
new_ports += 1
except:
print("Failed to add port to port docs:\n", traceback.format_exc())
time.sleep(.5)
except KeyboardInterrupt:
raise
except CensysRateLimitExceededException:
# TODO: Need to wait for more API calls
print("YOU STILL NEED TO IMPLEMENT BETTER API WAITING")
print("Sleeping for 3 minutes to wait for API permission...\n")
time.sleep(180)
# print(traceback.format_exc())
except CensysNotFoundException:
no_result += 1
except:
print(traceback.format_exc())
print("{} Completed | {} New Ports | {} No Results".format(completed, new_ports, no_result))
Parameters
target The domain name, e.g. apple.com, to be looked up on Censys.
"""
if self.censys_cert_search is None:
pass
else:
try:
# Use the `parsed.names` filter to avoid unwanted domains
query = "parsed.names: %s" % target
results = self.censys_cert_search.search(query,fields=['parsed.names',
'parsed.signature_algorithm.name','parsed.signature.self_signed',
'parsed.validity.start','parsed.validity.end','parsed.fingerprint_sha256',
'parsed.subject_dn','parsed.issuer_dn'])
return results
except censys.base.CensysRateLimitExceededException:
click.secho("\n[!] Censys reports your account has run out of API credits.",fg="red")
return None
except Exception as error:
click.secho("\n[!] Error collecting Censys certificate data for {}.".format(target),fg="red")
click.secho("L.. Details: {}".format(error),fg="red")
return None
A Censys API key is required.
"""
if self.censys_cert_search is None:
pass
else:
try:
# click.secho("[+] Performing Censys certificate search for {}".format(target), fg="green")
query = "parsed.names: %s" % target
results = self.censys_cert_search.search(query, fields=['parsed.names',
'parsed.signature_algorithm.name','parsed.signature.self_signed',
'parsed.validity.start','parsed.validity.end','parsed.fingerprint_sha256',
'parsed.subject_dn','parsed.issuer_dn'])
return results
except censys.base.CensysRateLimitExceededException:
click.secho("\n[!] Censys reports your account has run out of API credits.", fg="red")
return None
except Exception as error:
click.secho("\n[!] Error collecting Censys certificate data for {}.".format(target), fg="red")
click.secho("L.. Details: {}".format(error), fg="red")
return None
max_records: int = DefaultValues.CENSYS_DEFAULT_RESULTS_QUANTITY
) -> None:
"""
This function is used to search hosts in Censys with initialized API
:param query: query that you want to use for your search
:param max_records: quantity of hosts that you want to get with query
:return: None
"""
try:
self.results = list(
self.api.search(
query, fields=self.search_fields, max_records=max_records
)
)
except (
CensysRateLimitExceededException,
CensysJSONDecodeException,
CensysNotFoundException,
CensysUnauthorizedException,
) as api_error:
print(f"Censys API error: {api_error}")
except AttributeError as api_not_defined:
print(f"Censys API was not initialized: {api_not_defined}")
except CensysException as too_much_results_required:
if "Only the first 1,000 search results are available" in str(
too_much_results_required
):
print(
"Only the first 1,000 search results are available. Retry search with 1,000 results limit."
)
self.search(
query, max_records=DefaultValues.CENSYS_FREE_PLAN_RESULTS_QUANTITY
pass
class CensysJSONDecodeException(CensysException):
    """CensysException subclass with no behaviour of its own.

    NOTE(review): presumably raised when an API response body cannot be
    decoded as JSON -- confirm against the raise site.
    """
class CensysAPIBase(object):
DEFAULT_URL = "https://www.censys.io/api/v1"
DEFAULT_TIMEOUT = 30
EXCEPTIONS = {
403: CensysUnauthorizedException,
404: CensysNotFoundException,
429: CensysRateLimitExceededException
}
def __init__(self, api_id=None, api_secret=None, url=None, timeout=None):
    """Configure credentials, base URL and HTTP session, then verify them.

    Args:
        api_id: Censys API ID; falls back to the ``CENSYS_API_ID``
            environment variable when falsy.
        api_secret: Censys API secret; falls back to the
            ``CENSYS_API_SECRET`` environment variable when falsy.
        url: Base API URL; falls back to the ``CENSYS_API_URL`` environment
            variable, then to ``DEFAULT_URL``.
        timeout: Timeout value stored on the session; falls back to
            ``DEFAULT_TIMEOUT`` when falsy.

    Raises:
        CensysException: with code 401 when no API ID or secret is available
            from either the arguments or the environment.
    """
    self.api_id = api_id or os.environ.get("CENSYS_API_ID")
    self.api_secret = api_secret or os.environ.get("CENSYS_API_SECRET")
    if not self.api_id or not self.api_secret:
        raise CensysException(401, "No API ID or API secret configured.")

    timeout = timeout or self.DEFAULT_TIMEOUT
    self._api_url = url or os.environ.get("CENSYS_API_URL") or self.DEFAULT_URL

    # create a session that we'll use for making requests
    self._session = requests.Session()
    self._session.auth = (self.api_id, self.api_secret)
    # NOTE(review): requests does not apply a Session-level ``timeout``
    # attribute on its own; presumably the request helper of this class
    # reads it back -- confirm.
    self._session.timeout = timeout
    # "*/*" is the wildcard media range; the previous "*/8" was a typo and
    # not a valid Accept entry.
    self._session.headers.update({"accept": "text/json, application/json, */*"})

    # test that everything works by requesting the users account information
    self.account()