Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
api = censys.ipv4.CensysIPv4(api_id=self.api_id, api_secret=self.secret)
self.utility.print_message(OK, 'Check open web ports.')
# Extract search result.
for result in api.search('ip:{}'.format(ip_addr)):
for idx, items in enumerate(result['protocols']):
# Get port number and protocol type.
server_info.append({'Open Port': items.split('/')[0], 'Protocol': items.split('/')[1]})
self.utility.print_message(WARNING, 'Open web port {}: {}'.format(idx+1, items))
if items.split('/')[1] == 'https':
is_https = True
# Check certification.
if is_https is True:
self.utility.print_message(OK, 'Check certification.')
api = censys.certificates.CensysCertificates(api_id=self.api_id, api_secret=self.secret)
fields = ['parsed.subject_dn', 'parsed.validity', 'parsed.signature_algorithm', 'parsed.subject']
# Extract search result.
for cert in api.search('tags: trusted and parsed.names: {}'.format(fqdn), fields=fields):
# Get signature algorithm.
sig_alg = cert['parsed.signature_algorithm.name']
self.utility.print_message(WARNING, 'Signature Algorithm: {}'.format(sig_alg))
# Get common name.
common_names = []
for idx, common_name in enumerate(cert['parsed.subject.common_name']):
common_names.append(common_name)
self.utility.print_message(WARNING, 'Common Name {}: {}'.format(idx+1, common_name))
# Get validity start and end date.
valid_start = cert['parsed.validity.start']
def get_certificates(domain):
    """Return the Censys certificate search results for *domain*.

    Uses the module-level ``CENSYS_API_ID`` / ``CENSYS_API_SECRET`` credentials
    to run a paged certificate search for ``domain`` and returns the
    ``'results'`` entry of the response.

    Exits the process with status 1 (failure) when the credentials are missing
    or when the search yields no certificates.
    """
    if not CENSYS_API_ID or not CENSYS_API_SECRET:
        # This is an error condition, so log it as one and exit non-zero.
        logging.error("\033[1;31m[!] API KEY or Secret for Censys not provided.\033[1;m"
                      "\nYou'll have to provide them in the script")
        sys.exit(1)

    logging.info("[+] Extracting certificates for {} using Censys".format(domain))
    c = censys.certificates.CensysCertificates(CENSYS_API_ID, CENSYS_API_SECRET)
    search_results = c.paged_search(domain)
    certificates = search_results['results']

    if not certificates:
        print("\033[1;31m[!] No matching certificates found!\033[1;m")
        sys.exit(1)
    return certificates
def find_subdomains(domain, api_id, api_secret):
    """Return the set of host names listed on Censys certificates for *domain*.

    Searches the certificates index with the query ``parsed.names: <domain>``,
    requesting only the ``parsed.names`` field, and merges the names of every
    hit into one de-duplicated set.

    Exits the process with status 1 when Censys rejects the credentials or
    reports that the account's rate limit was exceeded.
    """
    try:
        censys_certificates = censys.certificates.CensysCertificates(api_id=api_id, api_secret=api_secret)
        certificate_query = 'parsed.names: %s' % domain
        certificates_search_results = censys_certificates.search(certificate_query, fields=['parsed.names'])

        # Flatten the per-certificate name lists, dropping duplicates as we go.
        subdomains = set()
        for search_result in certificates_search_results:
            subdomains.update(search_result['parsed.names'])
        return subdomains
    except censys.base.CensysUnauthorizedException:
        sys.stderr.write('[-] Your Censys credentials look invalid.\n')
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to exist (e.g. under ``python -S``).
        sys.exit(1)
    except censys.base.CensysRateLimitExceededException:
        sys.stderr.write('[-] Looks like you exceeded your Censys account limits rate. Exiting\n')
        sys.exit(1)
def get_certificates(domain, api_id, api_secret):
    """Return the SHA-256 fingerprints of trusted certificates naming *domain*.

    The search is restricted to certificates tagged ``trusted`` and excludes
    certificates whose names include ``cloudflaressl.com``. Only the
    ``parsed.names`` and ``parsed.fingerprint_sha256`` fields are requested.

    Returns a set of fingerprint strings. Exits the process with status 1 when
    Censys rejects the credentials or reports that the rate limit was exceeded.
    """
    try:
        censys_certificates = censys.certificates.CensysCertificates(api_id=api_id, api_secret=api_secret)

        requested_fields = [
            'parsed.names',
            'parsed.fingerprint_sha256',
        ]

        certificate_query = 'parsed.names: %s AND tags.raw: trusted AND NOT parsed.names: cloudflaressl.com' % domain
        certificates_search_results = censys_certificates.search(certificate_query, fields=requested_fields)
        return {cert['parsed.fingerprint_sha256'] for cert in certificates_search_results}
    except censys.base.CensysUnauthorizedException:
        sys.stderr.write('[-] Your Censys credentials look invalid.\n')
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to exist (e.g. under ``python -S``).
        sys.exit(1)
    except censys.base.CensysRateLimitExceededException:
        sys.stderr.write('[-] Looks like you exceeded your Censys account limits rate. Exiting\n')
        sys.exit(1)
# Constructor: records the Censys credentials on the instance and builds one
# client object per Censys endpoint (IPv4, websites, certificates, export).
def __init__(self, api_id: str = "", api_secret: str = ""):
# Both stored credentials start out as empty strings.
self.api_id = self.api_secret = ""
# The caller's credentials are only stored when api_id is truthy.
if api_id:
self.api_id = api_id
self.api_secret = api_secret
# NOTE(review): indentation was lost in this copy of the file, so it is not
# visible whether the four client constructions below belong to the `if`
# body or run unconditionally - confirm against the original source.
self.ipv4 = ipv4.CensysIPv4(self.api_id, self.api_secret)
self.websites = websites.CensysWebsites(self.api_id, self.api_secret)
self.certificates = certificates.CensysCertificates(self.api_id, self.api_secret)
self.export = export.CensysExport(self.api_id, self.api_secret)
else:
print("[+] %s\t\t[Location: %s] [Ports: %s]" % (
r['ip'],
r['location.country'],
" ".join(r['protocols'])
)
)
else:
try:
ip = api.view(args.IP)
print(json.dumps(ip, sort_keys=True, indent=4, separators=(',', ': ')))
except censys.base.CensysNotFoundException:
print('IP not found')
elif args.subcommand == 'cert':
try:
c = certificates.CensysCertificates(conf['Censys']['id'], conf['Censys']['secret'])
res = c.view(args.ID)
except censys.base.CensysNotFoundException:
print("Certificate not found")
else:
print(json.dumps(res, sort_keys=True, indent=4, separators=(',', ': ')))
elif args.subcommand == 'subdomains':
subdomains = self.get_subdomains(conf, args.DOMAIN, args.verbose)
for d in subdomains:
print(d)
else:
self.parser.print_help()
else:
self.parser.print_help()
def initialize(self, base_dir):
    """Create the Censys certificates client and persist the credentials used.

    Credentials come from ``self.censys_struct`` (``'uid'`` / ``'secret'``)
    when both are not ``None``; otherwise the values of the first two entries
    of the ``"censys"`` section of ``self.conf`` are used. The credentials are
    then stored back into that section and the configuration is written to
    ``<base_dir>/key.ini``.

    Returns ``True`` on success, ``False`` when the configured credentials are
    empty or when any step raises.

    NOTE(review): the nesting below was reconstructed from a copy of the file
    that had lost its indentation; the credentials are assumed to be persisted
    for both credential sources - confirm against the original.
    """
    uid = self.censys_struct['uid']
    secret = self.censys_struct['secret']

    try:
        if uid is not None and secret is not None:
            self.certificates = censys.certificates.CensysCertificates(uid, secret)
        else:
            uid = self.conf.items("censys")[0][1]
            secret = self.conf.items("censys")[1][1]
            if uid != '' and secret != '':
                self.certificates = censys.certificates.CensysCertificates(uid, secret)
            else:
                return False

        self.conf.set("censys", "UID", uid)
        self.conf.set("censys", "SECRET", secret)
        # Close the file deterministically so the written config is flushed
        # and the handle is not leaked.
        with open(base_dir + "/key.ini", "w") as key_file:
            self.conf.write(key_file)
    except Exception:
        # Best-effort setup: any failure is reported to the caller as False.
        return False

    return True
def __init__(self):
    """Build the Censys certificate search client from the "Censys" config section.

    On success ``self.censys_cert_search`` holds the client. If Censys rejects
    the credentials, or anything else goes wrong, it is set to ``None`` and a
    yellow notice is printed.
    """
    try:
        api_id = helpers.config_section_map("Censys")["api_id"]
        api_secret = helpers.config_section_map("Censys")["api_secret"]
        self.censys_cert_search = censys.certificates.CensysCertificates(
            api_id=api_id,
            api_secret=api_secret,
        )
    except censys.base.CensysUnauthorizedException:
        self.censys_cert_search = None
        supplied = (api_id, api_secret)
        click.secho("[!] Censys reported your API information is invalid, so Censys searches will be skipped.", fg="yellow")
        # NOTE(review): this echoes the API secret to the terminal in clear text.
        click.secho("L.. You provided ID %s & Secret %s" % supplied, fg="yellow")
    except Exception as exc:
        self.censys_cert_search = None
        click.secho("[!] Did not find a Censys API ID/secret.", fg="yellow")
        click.secho("L.. Details: {}".format(exc), fg="yellow")
def search_censys(self, target):
    """Print every Censys certificate search hit for ``"." + target``.

    The API credentials are imported from ``anubis.API`` at call time. When
    that import fails, or either credential is empty, a red notice is printed
    and no search is performed.
    """
    missing_keys_notice = "To run a Censys scan, you must add your API keys to anubis/API.py"

    print("Searching Censys")

    try:
        from anubis.API import CENSYS_ID, CENSYS_SECRET
    except ImportError:
        ColorPrint.red(missing_keys_notice)
        return

    if not (CENSYS_SECRET and CENSYS_ID):
        ColorPrint.red(missing_keys_notice)
        return

    # Dump the raw certificate records returned for the domain.
    client = censys.certificates.CensysCertificates(CENSYS_ID, CENSYS_SECRET)
    for hit in client.search("." + target):
        print(hit)
def execute(self):
    """Search Censys certificates for ``self.target`` and report matching names.

    Does nothing when ``self.api`` or ``self.secret`` is empty. Otherwise every
    name in the ``parsed.names`` field of each hit that is the target itself
    or a subdomain of it is passed to ``self.handler.sub_handler``.

    Errors are not raised: an authentication failure and any other error are
    reported on stdout, while a ``400 (max_results)`` error is silently
    ignored.
    """
    if not self.api or not self.secret:
        return
    try:
        target = self.target
        # A bare suffix test would also accept unrelated hosts such as
        # "notexample.com" for "example.com"; require a label boundary.
        dotted_target = "." + target.lstrip(".")

        certs = censys.certificates.CensysCertificates(api_id=self.api, api_secret=self.secret)
        resp = certs.search("parsed.names: {}".format(target), fields=['parsed.names'])
        for line in resp:
            for sub in line['parsed.names']:
                if sub == target or sub.endswith(dotted_target):
                    self.handler.sub_handler({'Name': sub, 'Source': 'Censys.io'})
    except Exception as e:
        message = str(e)
        if message.startswith('403 (unauthorized):'):
            stdout.write("\033[1;33m[!]\033[1;m \033[1;30mCensys.IO Authentication Failed: verify API Key/Secret\033[1;m\n")
        elif '400 (max_results)' in message:
            # Hitting the result cap is expected; the results already
            # handled are kept.
            pass
        else:
            stdout.write("\033[1;33m[!]\033[1;m \033[1;30mCensys.IO Error: {}\033[1;m\n".format(message))