Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:param mod: 模式类型 verify|attack|shell
:return:
"""
# 设置全局参数
if self.current_module.current_protocol == POC_CATEGORY.PROTOCOL.HTTP:
target = self.current_module.getg_option("target")
else:
rhost = self.current_module.getg_option("rhost")
rport = self.current_module.getg_option("rport")
ssl = self.current_module.getg_option("ssl")
scheme = "http"
if ssl:
scheme = "https"
target = "{scheme}://{rhost}:{rport}".format(scheme=scheme, rhost=rhost, rport=rport)
conf.mode = mod
kb.task_queue.put((target, self.current_module))
try:
start()
except PocsuiteShellQuitException:
pass
kb.results = []
def single_time_log_message(message, level=logging.INFO, flag=None):
if flag is None:
flag = hash(message)
if flag not in kb.single_log_flags:
kb.single_log_flags.add(flag)
logger.log(level, message)
def run_threads(num_threads, thread_function, args: tuple = (), forward_exception=True, start_msg=True):
threads = []
kb.multi_thread_mode = True
kb.thread_continue = True
kb.thread_exception = False
try:
if num_threads > 1:
if start_msg:
info_msg = "staring {0} threads".format(num_threads)
logger.info(info_msg)
if num_threads > MAX_NUMBER_OF_THREADS:
warn_msg = ""
logger.warn(warn_msg)
else:
thread_function()
return
def _set_multiple_targets():
# set multi targets to kb
if conf.url:
targets = set()
for url in conf.url:
parsed = parse_target(url)
if parsed:
targets.add(parsed)
if not targets:
err_msg = "incorrect target url or ip format!"
logger.error(err_msg)
for target in targets:
kb.targets.add(target)
if conf.url_file:
for line in get_file_items(conf.url_file, lowercase=False, unique=True):
kb.targets.add(line)
if conf.dork:
# enable plugin 'target_from_zoomeye' by default
if 'target_from_shodan' not in conf.plugins and 'target_from_fofa' not in conf.plugins:
conf.plugins.append('target_from_zoomeye')
if conf.dork_zoomeye:
conf.plugins.append('target_from_zoomeye')
if conf.dork_shodan:
conf.plugins.append('target_from_shodan')
logger.debug(debug_msg)
kb.abs_file_paths = set()
kb.os = None
kb.os_version = None
kb.arch = None
kb.dbms = None
kb.auth_header = None
kb.counters = {}
kb.multi_thread_mode = False
kb.thread_continue = True
kb.thread_exception = False
kb.word_lists = None
kb.single_log_flags = set()
kb.cache = AttribDict()
kb.cache.addrinfo = {}
kb.cache.content = {}
kb.cache.regex = {}
kb.data = AttribDict()
kb.data.local_ips = []
kb.data.connect_back_ip = None
kb.data.connect_back_port = DEFAULT_LISTENER_PORT
kb.data.clients = []
kb.targets = OrderedSet()
kb.plugins = AttribDict()
kb.plugins.targets = AttribDict()
kb.plugins.pocs = AttribDict()
kb.plugins.results = AttribDict()
kb.results = []
kb.current_poc = None
def get_poc_options(poc_obj=None):
poc_obj = poc_obj or kb.current_poc
return poc_obj.get_options()
def _set_task_queue():
if kb.registered_pocs and kb.targets:
for poc_module in kb.registered_pocs:
for target in kb.targets:
kb.task_queue.put((target, poc_module))
def runtime_check():
if not kb.registered_pocs:
error_msg = "no PoC loaded, please check your PoC file"
logger.error(error_msg)
raise PocsuiteSystemException(error_msg)
kb.cache = AttribDict()
kb.cache.addrinfo = {}
kb.cache.content = {}
kb.cache.regex = {}
kb.data = AttribDict()
kb.data.local_ips = []
kb.data.connect_back_ip = None
kb.data.connect_back_port = DEFAULT_LISTENER_PORT
kb.data.clients = []
kb.targets = OrderedSet()
kb.plugins = AttribDict()
kb.plugins.targets = AttribDict()
kb.plugins.pocs = AttribDict()
kb.plugins.results = AttribDict()
kb.results = []
kb.current_poc = None
kb.registered_pocs = AttribDict()
kb.task_queue = Queue()
kb.cmd_line = DIY_OPTIONS or []
kb.comparison = None
def _set_task_queue():
if kb.registered_pocs and kb.targets:
for poc_module in kb.registered_pocs:
for target in kb.targets:
kb.task_queue.put((target, poc_module))