Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from __future__ import print_function
import csv
import sys
import netaddr
from ..base import CensysAPIBase
class CensysAdminMaxmind(CensysAPIBase):
    """Admin client for the ``/admin/maxmind/<collection>/<version>`` endpoints.

    Each method builds the endpoint path from ``collection`` and ``version``
    and hands the request to ``self._post`` / ``self._delete``.
    """

    def upload(self, collection, version, records):
        """POST ``records`` for the given collection and version.

        ``version`` is interpolated with ``%i``, so it is expected to be
        numeric. Returns whatever ``self._post`` returns.
        """
        endpoint = "/admin/maxmind/%s/%i" % (collection, version)
        payload = {"records": records}
        return self._post(endpoint, data=payload)

    def delete(self, collection, version):
        """Issue a DELETE for the given collection and version.

        ``version`` is interpolated with ``%i``, so it is expected to be
        numeric. Returns whatever ``self._delete`` returns.
        """
        endpoint = "/admin/maxmind/%s/%i" % (collection, version)
        return self._delete(endpoint)
def main():
    """Command-line entry point: validate the argument count.

    When fewer than four arguments follow the program name, a usage line is
    written to stderr and the process exits with status 1.

    NOTE(review): only the argument check is visible in this excerpt; the
    rest of the function appears to be cut off -- confirm against the full
    source.
    """
    # sys.argv[0] is the program name, so four arguments means length 5.
    if len(sys.argv) < 5:
        usage = "USAGE: %s collection version locations.csv blocks.csv\n"
        sys.stderr.write(usage % sys.argv[0])
        sys.exit(1)
from __future__ import print_function
import unittest
import time
import json
from .base import CensysAPIBase
class CensysQuery(CensysAPIBase):
    """Client for the ``query`` and ``query_definitions`` endpoints.

    Every method returns the result of ``self._post`` or ``self._get``
    unchanged.
    """

    def new_job(self, query):
        """POST ``query`` to the ``query`` endpoint."""
        payload = {"query": query}
        return self._post("query", data=payload)

    def get_series(self):
        """GET the ``/query_definitions`` endpoint."""
        # NOTE(review): this path carries a leading slash while the one built
        # in get_series_details does not -- confirm which form is intended.
        return self._get("/query_definitions")

    def get_series_details(self, series):
        """GET ``query_definitions/<series>``; ``series`` must be a string."""
        segments = ["query_definitions", series]
        return self._get("/".join(segments))

    def check_job(self, job_id):
        """GET ``query/<job_id>``; ``job_id`` must be a string."""
        segments = ["query", job_id]
        return self._get("/".join(segments))
const=const)
def _get(self, endpoint, args=None):
    """Run a GET against ``endpoint`` through ``self._make_call``.

    ``args`` is forwarded unchanged; its meaning is defined by
    ``_make_call``. Returns whatever ``_make_call`` returns.
    """
    http_get = self._session.get
    return self._make_call(http_get, endpoint, args)
def _post(self, endpoint, args=None, data=None):
    """Run a POST against ``endpoint`` through ``self._make_call``.

    ``args`` and ``data`` are forwarded positionally and unchanged; their
    meaning is defined by ``_make_call``. Returns whatever ``_make_call``
    returns.
    """
    http_post = self._session.post
    return self._make_call(http_post, endpoint, args, data)
def _delete(self, endpoint, args=None):
    """Run a DELETE against ``endpoint`` through ``self._make_call``.

    ``args`` is forwarded unchanged; its meaning is defined by
    ``_make_call``. Returns whatever ``_make_call`` returns.
    """
    http_delete = self._session.delete
    return self._make_call(http_delete, endpoint, args)
def account(self):
    """GET the ``account`` endpoint and return the ``self._get`` result."""
    endpoint = "account"
    return self._get(endpoint)
class CensysIndex(CensysAPIBase):
    """Client whose endpoint paths are derived from ``INDEX_NAME``."""

    # None here; presumably overridden per index by subclasses -- TODO confirm.
    INDEX_NAME = None

    def __init__(self, *args, **kwargs):
        """Initialise the base client, then derive the per-index paths.

        All arguments are passed straight through to
        ``CensysAPIBase.__init__``.
        """
        CensysAPIBase.__init__(self, *args, **kwargs)
        # generate concrete paths to be called
        index_name = self.INDEX_NAME
        self.search_path = "search/%s" % index_name
        self.view_path = "view/%s" % index_name
        self.report_path = "report/%s" % index_name

    def metadata(self, query):
        """Assemble a first-page, empty-field-list payload for ``query``.

        NOTE(review): in this excerpt the payload is built but never sent or
        returned, so the method yields ``None``; the remainder of the body
        appears to be cut off -- confirm against the full source.
        """
        payload = {"query": query, "page": 1, "fields": []}
def __init__(self, *args, **kwargs):
    """Initialise the base client, then derive the per-index endpoint paths.

    All arguments are passed straight through to ``CensysAPIBase.__init__``.
    ``search_path``, ``view_path`` and ``report_path`` are each built by
    interpolating ``self.INDEX_NAME`` into the matching template.
    """
    CensysAPIBase.__init__(self, *args, **kwargs)
    # generate concrete paths to be called
    path_templates = (
        ("search_path", "search/%s"),
        ("view_path", "view/%s"),
        ("report_path", "report/%s"),
    )
    for attribute, template in path_templates:
        setattr(self, attribute, template % self.INDEX_NAME)
from __future__ import print_function
import json
import sys
import unittest
import time
from .base import CensysAPIBase
class CensysExport(CensysAPIBase):
    """Client for the ``export`` endpoint."""

    def new_job(self, query, format="json", flatten=False, compress=False,
                delimiter=None, headers=None):
        """Submit an export job and return the ``self._post`` result.

        ``format`` must be ``"json"`` or ``"csv"``; ``flatten`` and
        ``compress`` must each compare equal to ``True`` or ``False``.
        These checks are ``assert`` statements, so they raise
        ``AssertionError`` and are skipped when Python runs with ``-O``.
        ``delimiter`` and ``headers`` are forwarded without validation.
        """
        assert format in ("json", "csv")
        for flag in (flatten, compress):
            assert flag in (True, False)
        payload = dict(
            query=query,
            format=format,
            flatten=flatten,
            compress=compress,
            delimiter=delimiter,
            headers=headers,
        )
        return self._post("export", data=payload)
def setUpClass(cls):
    """Create one ``CensysAPIBase`` instance and store it on ``cls._api``.

    NOTE(review): unittest expects ``setUpClass`` to be a classmethod; no
    decorator is visible in this excerpt -- confirm it exists in the full
    source.
    """
    api = CensysAPIBase()
    cls._api = api