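def init(self):
    """Collect targets from a Shodan search and hand them to the framework.

    Initialises the Shodan API client, runs ``conf.dork`` through
    ``self.shodan.search`` and passes every returned target to
    ``self.add_target``, counting those for which it returns a truthy value.
    The count is logged at the end.
    """
    self.init_shodan_api()
    logger.info("[PLUGIN] try fetch targets from shodan with dork: {0}".format(conf.dork))

    targets = self.shodan.search(conf.dork, conf.max_page, resource=conf.search_type)

    # Initialise the counter before the branch so the summary line below is
    # valid even when the search returns nothing.
    count = 0
    if targets:
        for target in targets:
            if self.add_target(target):
                count += 1

    logger.info("[PLUGIN] get {0} target(s) from shodan".format(count))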
def init(self):
    """Gather CIDR strings and register each contained host as a target.

    The CIDR strings come from exactly one source, checked in this order:
    the ``CIDR`` environment variable, ``conf.url``, or an interactive prompt.
    """
    info_msg = "[PLUGIN] try fetch targets from CIDR..."
    logger.info(info_msg)
    cidr_set = set()
    if "CIDR" in os.environ:
        cidr_set.add(os.environ.get("CIDR"))
    elif conf.url:
        for i in conf.url:
            cidr_set.add(i)
        # The URL list is emptied once its entries have been taken as CIDRs.
        conf.url = []
    else:
        cidr_text = input("Please input CIDR address:")
        cidr_set.add(cidr_text)
    count = 0
    for i in cidr_set:
        try:
            # Assuming this is ipaddress.ip_network: strict=False accepts an
            # address that has host bits set instead of raising.
            network = ip_network(i, strict=False)
            # NOTE(review): hosts() yields every usable address in the network,
            # so a short prefix expands into a very large target list; no limit
            # on prefix length is visible in this excerpt.
            for host in network.hosts():
                self.add_target(host.exploded)
        # NOTE(review): the excerpt stops here; the handler that closes this
        # try block and any use of `count` are not shown.
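def ssh_burst(host, port, task_queue, result_queue):
    """Start the worker threads for an SSH task against ``host:port``.

    Returns early with a warning when ``port_check`` reports the endpoint as
    unreachable. Otherwise ``task_init`` is called with the two queues and
    four ``task_thread`` workers are started through ``run_threads``. An
    exception from either step is logged and not propagated. The function
    always returns ``None``.
    """
    # NOTE(review): if paramiko.util.logging is the stdlib logging module,
    # getLogger() without a name is the root logger, so this raises the
    # threshold for more than paramiko's own messages - confirm that is wanted.
    log = paramiko.util.logging.getLogger()
    log.setLevel(logging.CRITICAL)

    if not port_check(host, port):
        logger.warning("{}:{} is unreachable".format(host, port))
        return

    try:
        task_init(host, port, task_queue, result_queue)
        run_threads(4, task_thread, args=(task_queue, result_queue))
    except Exception as exc:
        # Still best-effort, but leave a trace instead of discarding the error.
        logger.warning("{}:{} task aborted: {}".format(host, port, exc))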
# One attempt from an enclosing loop that is not part of this excerpt (the
# `continue` and `return` below belong to that loop and method).
try:
    # Send the payload request.
    # (A local debugging proxy argument was left disabled here:
    #  proxies={'http': 'http://127.0.0.1:8087'})
    resp = req.post(target_url, json=j, headers=self.headers)
    # Check whether the relevant JSON field of the response shows that the
    # EL expression was evaluated.
    if self.test_EL(resp):
        result['VerifyInfo'] = {}
        result['VerifyInfo']['URL'] = target_url
        # NOTE(review): auth_header (presumably the authentication header used
        # for this attempt) is stored in the saved result, so the output holds
        # credential material and should be handled as sensitive.
        result['VerifyInfo']['Credentials'] = auth_header
        return self.save_output(result)
    return self.save_output(result)
except json.decoder.JSONDecodeError as e:
    if resp.status_code == 401:
        pass
        # (disabled) print: authentication failed
    else:
        logger.info("json解析失败")
    # A failure may only mean the password was wrong; go on to the next
    # password.
    continue
except Exception as e:
    # Any other error is logged and re-raised to the caller.
    logger.error(e)
    raise e
# One attempt from an enclosing loop that is not part of this excerpt (the
# `continue` and `return` below belong to that loop and method).
try:
    # Send the payload request.
    # (A local debugging proxy argument was left disabled here:
    #  proxies={'http': 'http://127.0.0.1:8087'})
    resp = req.post(target_url, json=j, headers=self.headers)
    # Check whether the relevant JSON field of the response shows that the
    # EL expression was evaluated.
    if self.test_EL(resp):
        result['VerifyInfo'] = {}
        result['VerifyInfo']['URL'] = target_url
        return self.save_output(result)
    return self.save_output(result)
except json.decoder.JSONDecodeError as e:
    if resp.status_code == 401:
        pass
        # (disabled) print: authentication failed
    else:
        logger.info("json解析失败")
    # A failure may only mean the password was wrong; go on to the next
    # password.
    continue
except Exception as e:
    # Any other error is logged and re-raised to the caller.
    logger.error(e)
    raise e
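def test_EL(self, p_resp):
    """Report whether the response contains the expected marker value.

    Parses ``p_resp`` as JSON, reads the ``message`` field of the first
    element, logs it, and checks whether ``str(self.ran_sum)`` occurs in it.

    Returns:
        ``True`` when the marker is present, ``False`` otherwise (including
        when the membership test itself raises).

    Raises:
        Whatever ``p_resp.json()`` or the ``d[0]['message']`` lookup raises;
        both run outside the ``try`` and are left to the caller.
    """
    d = p_resp.json()
    message = d[0]['message']
    logger.info(message)
    try:
        # Always hand back a bool, also on a miss.
        return str(self.ran_sum) in message
    except Exception:
        return False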
def task_thread():
    """Worker loop that drains ``task_queue`` and tries each entry with ``ftp_login``.

    ``task_queue`` and ``result_queue`` are not parameters; they come from an
    outer scope that is not shown in this excerpt. Each task is a
    ``(host, port, username, password)`` tuple.
    """
    while not task_queue.empty():
        host, port, username, password = task_queue.get()
        # NOTE(review): every candidate password is written to the log in clear
        # text, so the log output has to be treated as sensitive.
        logger.info('try burst {}:{} use username:{} password:{}'.format(
            host, port, username, password))
        if ftp_login(host, port, username, password):
            # On a truthy ftp_login result, drop all remaining tasks while
            # holding the queue's own mutex, then report the pair.
            with task_queue.mutex:
                task_queue.queue.clear()
            result_queue.put((username, password))
info_msg = "[PLUGIN] load PoC script {0} from seebug failed".format(_)
logger.info(info_msg)
# Keyword mode: search Seebug for PoCs matching conf.vul_keyword, then fetch
# each hit by its id and pass it to self.add_poc.
if conf.vul_keyword:
    pocs = self.seebug.search_poc(conf.vul_keyword)
    info_msg = "Found {0} available PoC(s) from Seebug website".format(len(pocs))
    logger.info(info_msg)
    for poc_item in pocs:
        ssvid = str(poc_item['id'])
        # NOTE(review): the fetched PoC is remotely supplied script content
        # handed to add_poc; what add_poc does with it is not visible here -
        # confirm how the source is trusted before it is loaded.
        poc = self.seebug.fetch_poc(ssvid)
        if poc and self.add_poc(poc):
            info_msg = "[PLUGIN] load PoC script '{0}' from seebug success".format(poc_item['name'])
        else:
            info_msg = "[PLUGIN] load PoC script '{0}' from seebug failed".format(poc_item['name'])
        logger.info(info_msg)
# Direct mode: fetch the single PoC identified by conf.ssvid.
if conf.ssvid:
    ssvid = conf.ssvid
    poc = self.seebug.fetch_poc(ssvid)
    if poc and self.add_poc(poc):
        info_msg = "[PLUGIN] load PoC script 'ssvid-{0}' from seebug success".format(ssvid)
    else:
        info_msg = "[PLUGIN] load PoC script 'ssvid-{0}' from seebug failed".format(ssvid)
    logger.info(info_msg)
if self.test_EL(resp): # 验证响应中json的相应字段是否已经执行了EL表达式
result['VerifyInfo'] = {}
result['VerifyInfo']['URL'] = target_url
return self.save_output(result)
return self.save_output(result)
except json.decoder.JSONDecodeError as e:
if resp.status_code == 401:
pass
#print("认证失败")
else:
logger.info("json解析失败")
# 失败了可能只是密码错误,继续下一个密码尝试
continue
except Exception as e:
logger.error(e)
raise e
# Build a timestamped report file name and place it under
# paths.POCSUITE_OUTPUT_PATH.
filename = "pocsuite_{0}.html".format(time.strftime("%Y%m%d_%H%M%S"))
filename = os.path.join(paths.POCSUITE_OUTPUT_PATH, filename)
# The title names the target source: explicit URLs, the dork, or a fixed
# label when neither is set.
if conf.url:
    title = "Report of {0}".format(repr(conf.url))
elif conf.dork:
    title = "Report of [{0}]".format(conf.dork)
else:
    title = "Report of [{0}]".format('Plugin imported targets')
# NOTE(review): the title and the results carry user- and target-supplied text
# into an HTML file; whether HtmlExport escapes them is not visible here -
# confirm.
html_export = HtmlExport(filename=filename, title=title)
results = get_results()
if results:
    # Order by the status attribute, descending.
    results = sorted(results, key=lambda r: r.status, reverse=True)
html_export.write_html(results)
info_msg = '[PLUGIN] generate html report at {0}'.format(filename)
logger.info(info_msg)