Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Collect the dependency list declared for the PoC: a comma-separated string
# whose entries may be wrapped in single or double quotes. A falsy value is
# normalised to [''] so the "nothing required" test below stays a simple
# truthiness check on the first entry.
requires = get_poc_requires(data)
if requires:
    requires = [item.strip().strip('"').strip("'") for item in requires.split(',')]
else:
    requires = ['']

if requires[0]:
    poc_name = get_poc_name(data)
    logger.info('PoC script "{0}" requires "{1}" to be installed'.format(poc_name, ','.join(requires)))
    try:
        for r in requires:
            if ":" not in r:
                # Plain entry: the same name is imported and suggested to pip.
                __import__(r)
                continue
            # "name:module" entry: import `module`, but keep `name` in `r`
            # so the install hint below reports the left-hand part.
            r, module = r.split(":")
            __import__(module)
    except ImportError:
        logger.error('try install with "python -m pip install {0}"'.format(r))
        raise SystemExit
# NOTE(review): excerpt of a method body. The enclosing `def` is not visible here and
# the original indentation was lost, so the nesting described below is inferred from syntax.
# Bail out when check_port() returns a truthy value (logged as the port being occupied).
if check_port(self.host_ip, self.bind_port):
logger.error('Port {} has been occupied, start Httpd serve failed!'.format(self.bind_port))
return
# Flag the server as locked, apply the daemon setting and start this object as a thread.
# NOTE(review): `daemon` is not defined in this excerpt; presumably a parameter. Confirm.
self.server_locked = True
self.setDaemon(daemon)
threading.Thread.start(self)
# Detect whether the HTTP server has started: probe check_port() at most 10 times,
# leaving the loop as soon as it returns a truthy value.
detect_count = 10
while detect_count:
try:
logger.info('Detect {} server is runing or not...'.format(self.scheme))
if check_port(self.host_ip, self.bind_port):
break
except Exception as ex:
# Errors raised while probing are logged, not propagated.
logger.error(str(ex))
# Pause for a random interval below one second, then use up one attempt.
# NOTE(review): presumably these two lines sit in the while body, after the try/except. Confirm.
time.sleep(random.random())
detect_count -= 1
def new_token(self):
    """Log in to the ZoomEye API and store the returned access token.

    Posts ``self.username`` and ``self.password`` as a JSON document to the
    login endpoint. When the response is not a 401 and its JSON body carries
    an ``access_token``, the token is saved on ``self.token`` and
    ``self.headers`` is set to the matching ``Authorization`` header.

    Returns:
        bool: True when a token was obtained, False otherwise. Any exception
        raised while building the request, sending it or decoding the
        response is logged and reported as False.
    """
    # Function-scope import keeps this block self-contained.
    import json

    try:
        # json.dumps escapes quotes, backslashes and control characters in
        # the credentials; plain string formatting produced invalid JSON for
        # such values.
        data = json.dumps({"username": self.username, "password": self.password})
        resp = requests.post('https://api.zoomeye.org/user/login', data=data)
        if resp.status_code != 401:
            # Decode the body once and reuse it for the check and the read.
            content = resp.json()
            if "access_token" in content:
                self.token = content['access_token']
                self.headers = {'Authorization': 'JWT %s' % self.token}
                return True
    except Exception as ex:
        logger.error(str(ex))
    return False
def load_string_to_module(code_string, fullname=None):
    """Build and execute a module from source code held in a string.

    Args:
        code_string: Source text handed to ``PocLoader.set_data``.
        fullname: Module name to use. When None, a name of the form
            ``pocs_<md5 of code_string>`` is derived via ``get_md5``.

    Returns:
        The module object created from the spec after the loader's
        ``exec_module`` has run on it.

    Raises:
        ImportError: Re-raised after being logged when loading fails.
    """
    # Resolve the name before the try block so the failure message can report
    # the name actually used; formatting `fullname` printed 'None' whenever
    # the caller relied on the default.
    module_name = 'pocs_{0}'.format(get_md5(code_string)) if fullname is None else fullname
    try:
        # Pseudo path used as the spec's file location.
        file_path = 'pocsuite://{0}'.format(module_name)
        poc_loader = PocLoader(module_name, file_path)
        poc_loader.set_data(code_string)
        spec = importlib.util.spec_from_file_location(module_name, file_path, loader=poc_loader)
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)
        return mod
    except ImportError:
        error_msg = "load module '{0}' failed!".format(module_name)
        logger.error(error_msg)
        raise
def token_is_available(self):
    """Check the stored token against the Shodan account-profile endpoint.

    Returns:
        bool: True when ``self.token`` is set and the profile request yields
        a truthy response with status 200 whose JSON body contains
        ``"member"``. False when no token is set, when the check fails, or
        when the request or decoding raises (the error is logged).
    """
    # Nothing to verify without a token; no request is made.
    if not self.token:
        return False

    try:
        profile_url = 'https://api.shodan.io/account/profile?key={0}'.format(self.token)
        resp = requests.get(profile_url)
        if resp and resp.status_code == 200 and "member" in resp.json():
            return True
    except Exception as ex:
        logger.error(str(ex))
    return False
# NOTE(review): excerpt from the middle of a method. The enclosing `def`, the `if` that
# pairs with the `else` below, the loop targeted by `break` and the `try` that pairs with
# the `except` are all outside this view, and the original indentation was lost.
# Wrap the server's listening socket for server-side TLS using the configured certfile.
# NOTE(review): ssl.wrap_socket() was deprecated in Python 3.7 and removed in 3.12;
# SSLContext.wrap_socket() is the documented replacement. Confirm supported versions.
self.httpd.socket = ssl.wrap_socket(self.httpd.socket, certfile=self.certfile,
server_side=True)
else:
logger.error("You must provide certfile to use https")
break
# Run serve_forever() on a separate daemon thread, then record that the server started.
thread = threading.Thread(target=self.httpd.serve_forever)
thread.setDaemon(True)
thread.start()
self.server_started = True
# Shut the server down, close it and log the stop.
# NOTE(review): the condition under which this shutdown path is reached is not visible here.
self.httpd.shutdown()
self.httpd.server_close()
logger.info('Stop httpd server on {}://{}:{}'.format(self.scheme, self.bind_ip, self.bind_port))
except Exception as ex:
# On any error, shut down and close the server the same way, then log the exception text.
self.httpd.shutdown()
self.httpd.server_close()
logger.error(str(ex))
# NOTE(review): excerpt of exception-handling code. The enclosing function, the `try`
# these handlers belong to and the `except ... as ex` clause that binds `ex` for the
# first lines are outside this view, and indentation was lost, so nesting is inferred.
# Record on the shared `kb` object that an exception occurred.
kb.thread_exception = True
if num_threads > 1:
logger.info("waiting for threads to finish{0}".format(
" (Ctrl+C was pressed)" if isinstance(ex, KeyboardInterrupt) else ""))
try:
# Spin until at most one thread is still alive.
# NOTE(review): this is a busy-wait with no sleep; activeCount() is a deprecated
# alias of threading.active_count().
while threading.activeCount() > 1:
pass
except KeyboardInterrupt:
# A further Ctrl+C during the wait is turned into a PocsuiteThreadException.
raise PocsuiteThreadException("user aborted (Ctrl+C was pressed multiple times)")
# Re-raise the exception currently being handled when forwarding was requested.
if forward_exception:
raise
except (PocsuiteConnectionException, PocsuiteValueException) as ex:
# Connection/value errors: log with the thread name; traceback only when conf.verbose > 1.
kb.thread_exception = True
logger.error("thread {0}: {1}".format(threading.currentThread().getName(), str(ex)))
if conf.verbose > 1:
traceback.print_exc()
except Exception as ex:
# Any other error: log with the thread name and always print the traceback.
kb.thread_exception = True
logger.error("thread {0}: {1}".format(threading.currentThread().getName(), str(ex)))
traceback.print_exc()
finally:
# Reset the shared thread-state flags whatever the outcome.
kb.multi_thread_mode = False
kb.thread_continue = True
kb.thread_exception = False
def write_conf(self):
    """Persist the current token to the configuration file.

    Ensures the ``Telnet404`` section exists on ``self.parser``, stores
    ``self.token`` under the ``Jwt token`` option and writes the whole parser
    state to ``self.conf_path``. Any exception raised while setting the
    option or writing the file is logged rather than propagated.
    """
    if not self.parser.has_section("Telnet404"):
        self.parser.add_section("Telnet404")
    try:
        # Set the option first so an invalid token never truncates the file.
        self.parser.set("Telnet404", "Jwt token", self.token)
        # The context manager closes (and therefore flushes) the handle even
        # if write() fails; the bare open() call previously leaked it.
        with open(self.conf_path, "w") as conf_file:
            self.parser.write(conf_file)
    except Exception as ex:
        logger.error(str(ex))