Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
#!/usr/bin/env python3
import censys.query
import configparser
import censysobject
config = configparser.ConfigParser()
config.read("config.ini")
CENSYS_API_ID = (config['SectionOne']['CENSYS_API_ID'])
CENSYS_API_KEY = (config['SectionOne']['CENSYS_API_KEY'])
nrOfResults = 0
c = censys.export.CensysExport(api_id=CENSYS_API_ID, api_secret=CENSYS_API_KEY)
censys_object = censysobject.CensysObject()
query = "select * from ipv4." + censys_object.get_latest_ipv4_tables(censys_object) + " where ip = \"8.8.8.8\""
# query = ''
print("Executing query: " + query)
# Start new Job
res = c.new_job(query)
job_id = res["job_id"]
result = c.check_job_loop(job_id)
if result['status'] == 'success':
print(result)
for path in result['download_paths']:
print('all paths:')
print(path)
else:
def __init__(self, api_id: str = "", api_secret: str = ""):
self.api_id = self.api_secret = ""
if api_id:
self.api_id = api_id
self.api_secret = api_secret
self.ipv4 = ipv4.CensysIPv4(self.api_id, self.api_secret)
self.websites = websites.CensysWebsites(self.api_id, self.api_secret)
self.certificates = certificates.CensysCertificates(self.api_id, self.api_secret)
self.export = export.CensysExport(self.api_id, self.api_secret)
def new_api_obj(str_type):
"""Returns initialised Censys SQL query API object"""
config = configparser.ConfigParser()
config.read(os.path.dirname(os.path.realpath(__file__)) + "/config.ini")
censys_id = (config['osint_sources']['CENSYS_API_ID'])
censys_key = (config['osint_sources']['CENSYS_API_KEY'])
if str_type == 'SQL_QUERY':
return censys.query.CensysQuery(api_id=censys_id, api_secret=censys_key)
elif str_type == 'SQL_EXPORT':
return censys.export.CensysExport(api_id=censys_id, api_secret=censys_key)
def new_api_obj(str_type):
"""Returns initialised Censys SQL query API object"""
config = configparser.ConfigParser()
config.read(os.path.dirname(os.path.realpath(__file__)) + "/config.ini")
censys_id = (config['osint_sources']['CENSYS_API_ID'])
censys_key = (config['osint_sources']['CENSYS_API_KEY'])
if str_type == 'SQL_QUERY':
return censys.query.CensysQuery(api_id=censys_id, api_secret=censys_key)
elif str_type == 'SQL_EXPORT':
return censys.export.CensysExport(api_id=censys_id, api_secret=censys_key)
config.read("keys.ini")
CENSYS_API_ID = (config['SectionOne']['CENSYS_API_ID'])
CENSYS_API_KEY = (config['SectionOne']['CENSYS_API_KEY'])
nrOfResults = 0
path_outputfile = 'outputfiles/censys/censys.json'
asn = -1
valid_asn = False
while not valid_asn:
asn = input("Enter ASN:")
if asn.isnumeric():
asn = int(asn)
if 0 <= asn <= 4294967295:
valid_asn = True
c = censys.export.CensysExport(api_id=CENSYS_API_ID, api_secret=CENSYS_API_KEY)
query = "select * from ipv4.20161112 where autonomous_system.asn = 1104" + 'limit 1'
# Start new Job
res = c.new_job(query)
job_id = res["job_id"]
result = c.check_job_loop(job_id)
if result['status'] == 'success':
for path in result['download_paths']:
with urllib.request.urlopen(path) as response, open(path_outputfile, 'wb') as out_file:
shutil.copyfileobj(response, out_file)
print('Got file from URL: ' + response.geturl())
else:
print('Censys job failed.' + '\n' + str(result))