Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_ipv4_https(self):
try:
logger.info('Test https server in ipv4')
PHTTPServer._instance = None
httpd = PHTTPServer(bind_ip='0.0.0.0', bind_port=666, use_https=True,
requestHandler=BaseRequestHandler)
httpd.start()
url = '{}://{}:{}/'.format('https', get_host_ip(), 666)
requests.get(url)
except requests.exceptions.SSLError:
url = '{}://{}:{}/'.format('https', get_host_ip(), 666)
resp = requests.get(url, verify=False)
self.assertEqual(resp.status_code, 200)
except Exception:
assert False
finally:
httpd.stop()
def check_requires(data):
requires = get_poc_requires(data)
requires = [i.strip().strip('"').strip("'") for i in requires.split(',')] if requires else ['']
if requires[0]:
poc_name = get_poc_name(data)
info_msg = 'PoC script "{0}" requires "{1}" to be installed'.format(poc_name, ','.join(requires))
logger.info(info_msg)
try:
for r in requires:
if ":" in r:
r, module = r.split(":")
__import__(module)
else:
__import__(r)
except ImportError:
err_msg = 'try install with "python -m pip install {0}"'.format(r)
logger.error(err_msg)
raise SystemExit
def run_threads(num_threads, thread_function, args: tuple = (), forward_exception=True, start_msg=True):
threads = []
kb.multi_thread_mode = True
kb.thread_continue = True
kb.thread_exception = False
try:
if num_threads > 1:
if start_msg:
info_msg = "staring {0} threads".format(num_threads)
logger.info(info_msg)
if num_threads > MAX_NUMBER_OF_THREADS:
warn_msg = ""
logger.warn(warn_msg)
else:
thread_function()
return
# Start the threads
for num_threads in range(num_threads):
thread = threading.Thread(target=exception_handled_function, name=str(num_threads),
args=(thread_function, args))
thread.setDaemon(True)
try:
thread.start()
def start(self, daemon=True):
# Http server can only allow start once in pocsuite3, avoid muti-threading start muti-times
if self.server_locked:
logger.info(
'Httpd serve has been started on {}://{}:{}, '.format(self.scheme, self.bind_ip, self.bind_port))
return
if check_port(self.host_ip, self.bind_port):
logger.error('Port {} has been occupied, start Httpd serve failed!'.format(self.bind_port))
return
self.server_locked = True
self.setDaemon(daemon)
threading.Thread.start(self)
# Detect http server is started or not
detect_count = 10
while detect_count:
try:
logger.info('Detect {} server is runing or not...'.format(self.scheme))
if check_port(self.host_ip, self.bind_port):
poc_name, poc_ext = os.path.splitext(poc)
if poc_ext in ['.py', '.pyc']:
file_path = os.path.join(paths.POCSUITE_POCS_PATH, poc)
else:
file_path = os.path.join(paths.POCSUITE_POCS_PATH, poc + exists_pocs.get(poc))
if file_path:
info_msg = "loading PoC script '{0}'".format(file_path)
logger.info(info_msg)
load_poc_sucess = load_file_to_module(file_path)
# step2. load poc from given file path
try:
if not load_poc_sucess:
if not poc.startswith('ssvid-') and check_file(poc):
info_msg = "loading PoC script '{0}'".format(poc)
logger.info(info_msg)
load_poc_sucess = load_file_to_module(poc)
except PocsuiteSystemException:
logger.error('PoC file "{0}" not found'.format(repr(poc)))
continue
# step3. load poc from seebug website using plugin 'poc_from_seebug'
if not load_poc_sucess:
if poc.startswith('ssvid-'):
info_msg = "loading Poc script 'https://www.seebug.org/vuldb/{0}'".format(poc)
logger.info(info_msg)
if "poc_from_seebug" not in conf.plugins:
conf.plugins.append('poc_from_seebug')
load_keyword_poc_sucess = False
if conf.vul_keyword:
# step4. load poc with vul_keyword search seebug website
def start():
runtime_check()
tasks_count = kb.task_queue.qsize()
info_msg = "pocsusite got a total of {0} tasks".format(tasks_count)
logger.info(info_msg)
logger.debug("pocsuite will open {} threads".format(conf.threads))
try:
run_threads(conf.threads, task_run)
logger.info("Scan completed,ready to print")
finally:
task_done()
if conf.mode == "shell" and not conf.api:
info_msg = "connect back ip: {0} port: {1}".format(
desensitization(conf.connect_back_host) if conf.ppt else conf.connect_back_host, conf.connect_back_port)
logger.info(info_msg)
info_msg = "watting for shell connect to pocsuite"
logger.info(info_msg)
if conf.console_mode:
handle_listener_connection_for_console()
def start(self):
""" Routersploit main entry point. Starting interpreter loop. """
while True:
try:
command, args = self.parse_line(input(self.prompt))
command = command.lower()
if not command:
continue
command_handler = self.get_command_handler(command)
command_handler(args)
except PocsuiteBaseException as err:
logger.error(err)
except EOFError:
logger.info("Pocsuite stopped")
break
except KeyboardInterrupt:
logger.info("User Quit")
break
def update():
if not conf.update_all:
return
success = False
if not os.path.exists(os.path.join(paths.POCSUITE_ROOT_PATH, "../", ".git")):
warn_msg = "not a git repository. It is recommended to clone the 'knownsec/pocsuite3' repository "
warn_msg += "from GitHub (e.g. 'git clone --depth 1 {} pocsuite3')".format(GIT_REPOSITORY)
logger.warn(warn_msg)
if VERSION == get_latest_revision():
logger.info("already at the latest revision '{}'".format(get_revision_number()))
return
else:
info_msg = "updating pocsuite3 to the latest development revision from the "
info_msg += "GitHub repository"
logger.info(info_msg)
debug_msg = "pocsuite3 will try to update itself using 'git' command"
logger.debug(debug_msg)
data_to_stdout("\r[{0}] [INFO] update in progress ".format(time.strftime("%X")))
cwd_path = os.path.join(paths.POCSUITE_ROOT_PATH, "../")
try:
process = subprocess.Popen("git checkout . && git pull %s HEAD" % GIT_REPOSITORY,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=cwd_path.encode(
sys.getfilesystemencoding() or UNICODE_ENCODING))
poll_process(process, True)
stdout, stderr = process.communicate()
success = not process.returncode
except (IOError, OSError) as ex:
success = False