Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Tail of the Git push routine; the enclosing `try:` and function header are
# outside this excerpt. Logs the outcome based on the truthiness of `res`.
if res:
logger.info("[GIT-PRO] Git push success")
logger.info("[GIT-PRO] All projects have been pushed")
else:
logger.warning("[GIT-PRO] Git push fail")
# `fi` is created outside this excerpt; it is closed here on the non-exception path.
fi.close()
# Each `requests` failure below is reported as a warning hinting at the likely
# configuration mistake instead of propagating.
except requests.exceptions.MissingSchema:
logger.warning('[GIT-PRO] Please write gitlab_url and private_token in config file')
except requests.exceptions.ConnectionError:
logger.warning('[GIT-PRO] Please check the cobra_ip or gitlab_url is right')
except requests.exceptions.InvalidSchema:
logger.warning('[GIT-PRO] Please add http:// before the cobra_ip or gitlab_url')
except Exception as e:
# NOTE(review): `e.message` is a Python 2 attribute; built-in exceptions on
# Python 3 do not define it, so this handler itself may raise AttributeError
# there — confirm the target interpreter (str(e) is the portable form).
logger.warning('[GIT-PRO] {}'.format(e.message))
# Failure branch of a success check whose `if` is outside this excerpt.
else:
logger.warning("[GIT-PRO] Git push fail")
# `fi` is created outside this excerpt.
fi.close()
# Handlers for the enclosing `try:` (not visible here): each logs a warning
# and swallows the exception.
except requests.exceptions.MissingSchema:
logger.warning('[GIT-PRO] Please write gitlab_url and private_token in config file')
except requests.exceptions.ConnectionError:
logger.warning('[GIT-PRO] Please check the cobra_ip or gitlab_url is right')
except requests.exceptions.InvalidSchema:
logger.warning('[GIT-PRO] Please add http:// before the cobra_ip or gitlab_url')
except Exception as e:
# NOTE(review): `.message` is not defined on built-in Python 3 exceptions —
# confirm the target interpreter.
logger.warning('[GIT-PRO] {}'.format(e.message))
# Fallback branch of a condition outside this excerpt: mark the push as failed.
else:
res = False
# Log the outcome based on the truthiness of `res`.
if res:
logger.info("[GIT-PRO] Git push success")
logger.info("[GIT-PRO] All projects have been pushed")
else:
logger.warning("[GIT-PRO] Git push fail")
# `fi` is created outside this excerpt.
fi.close()
# Handlers for the enclosing `try:` (not visible here): each logs a warning
# and swallows the exception.
except requests.exceptions.MissingSchema:
logger.warning('[GIT-PRO] Please write gitlab_url and private_token in config file')
except requests.exceptions.ConnectionError:
logger.warning('[GIT-PRO] Please check the cobra_ip or gitlab_url is right')
except requests.exceptions.InvalidSchema:
logger.warning('[GIT-PRO] Please add http:// before the cobra_ip or gitlab_url')
except Exception as e:
# NOTE(review): `.message` is not defined on built-in Python 3 exceptions —
# confirm the target interpreter.
logger.warning('[GIT-PRO] {}'.format(e.message))
# Start the worker and keep a reference so it can be joined below.
# (`thread` and `threads` are created outside this excerpt.)
thread.start()
threads.append(thread)
# Block until every collected thread has finished.
for thread in threads:
thread.join()
# The return value of push_to_api() decides which outcome is logged below.
res = push_to_api(git_urls, cobra_ip, key, fi, format, output, rules, dels)
# Fallback branch of a condition outside this excerpt: mark the push as failed.
else:
res = False
if res:
logger.info("[GIT-PRO] Git push success")
logger.info("[GIT-PRO] All projects have been pushed")
else:
logger.warning("[GIT-PRO] Git push fail")
# `fi` is created outside this excerpt; it is also passed to push_to_api() above.
fi.close()
# Handlers for the enclosing `try:` (not visible here): each logs a warning
# and swallows the exception.
except requests.exceptions.MissingSchema:
logger.warning('[GIT-PRO] Please write gitlab_url and private_token in config file')
except requests.exceptions.ConnectionError:
logger.warning('[GIT-PRO] Please check the cobra_ip or gitlab_url is right')
except requests.exceptions.InvalidSchema:
logger.warning('[GIT-PRO] Please add http:// before the cobra_ip or gitlab_url')
except Exception as e:
# NOTE(review): `.message` is not defined on built-in Python 3 exceptions —
# confirm the target interpreter.
logger.warning('[GIT-PRO] {}'.format(e.message))
self.password,
self.filename
)
# Run the previously built `cmd` through the shell, capturing stdout and stderr.
# NOTE(review): `cmd` appears to be assembled from self.password / self.filename
# just above; with shell=True those values are interpreted by the shell —
# verify they are trusted or escaped.
p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
# communicate() waits for the process to exit and returns both streams as bytes.
(diff_out, diff_err) = p.communicate()
diff_out = diff_out.decode('utf-8')
diff_err = diff_err.decode('utf-8')
# Empty stderr is treated as success; two known error texts terminate the
# process with exit status 1. No branch for other stderr content is visible
# in this excerpt.
if len(diff_err) == 0:
logger.debug("[PICKUP] svn diff success")
elif 'authorization failed' in diff_err:
logger.warning("svn diff auth failed")
sys.exit(1)
elif 'Not a valid URL' in diff_err:
logger.warning("[PICKUP] svn diff url not a valid")
sys.exit(1)
# Report the result of the GitHub issue creation attempt.
# `issue_url` is used like a regex match object (`.group(0)`); it is produced
# outside this excerpt.
if issue_url:
info_msg = "created Github issue can been found at the address '{u}'".format(u=issue_url.group(0))
logger.info(info_msg)
# Best-effort: append the issue key to the history file; failures are ignored.
try:
# NOTE(review): the file is opened in binary mode ("a+b") but a str is
# written; on Python 3 that raises TypeError, which the bare `except` below
# would silently swallow, so the key would never be recorded — confirm.
with open(issue_history_path, "a+b") as f:
f.write("{k}\n".format(k=key))
except:
pass
else:
warn_msg = "something went wrong while creating a Github issue"
# `ex` is set outside this excerpt; when truthy its text is appended.
# NOTE(review): if `ex` was bound by `except ... as ex`, Python 3 deletes that
# name when the handler ends — confirm it is still defined here.
if ex:
warn_msg += " ('{m}')".format(m=get_safe_ex_string(ex))
# An "Unauthorized" error text gets an extra hint to update.
if "Unauthorized" in warn_msg:
warn_msg += ". Please update to the latest revision"
logger.warning(warn_msg)
# Compare the remote CVE rule checksum with the locally stored one and persist
# the new value when they differ. (Function header and `url` / `config_path`
# are outside this excerpt.)
# Silence urllib3 warnings; the request below is made with verify=False.
requests.packages.urllib3.disable_warnings()
# NOTE(review): verify=False is passed to requests.get, disabling its
# certificate verification for this download.
r = requests.get(url, verify=False)
# Take everything after the 'sha256:' marker (7 == len('sha256:')), stripped.
# NOTE(review): str.find returns -1 when the marker is absent, in which case
# the slice silently starts at index 6 instead of failing.
index = r.text.find('sha256:')
sha256_now = r.text[index + 7:].strip()
sha256_local = Config(level1='cve', level2='modified').value
if sha256_local != sha256_now:
logger.info("The CVE Rule already update, start update local rule")
# Rewrite the [cve] modified option in the config file with the new checksum.
config = ConfigParser()
config.read(config_path)
config.set('cve', 'modified', sha256_now)
try:
fi = open(config_path, 'w')
config.write(fi)
fi.close()
except IOError as e:
# A write failure is only logged; execution continues past the handler.
logger.warning(e)
logger.info("The sha256 been update")
# NOTE(review): indentation is lost in this excerpt; presumably True is
# returned when the checksum changed and False otherwise — confirm.
return True
return False
# Select CVE rule files and scan the target with each one via a process pool.
# (`cve_files`, `cve_vuls`, `store` and the function header are outside this excerpt.)
rule_path = os.path.join(project_directory, 'rules')
files = os.listdir(rule_path)
# With `specific_rule` set, only that file is used (if it exists in the rules
# directory); otherwise every file whose name starts with 'CVI-999' is used.
if specific_rule:
if specific_rule in files:
cve_files.append(specific_rule)
else:
for cvi_file in files:
if cvi_file.startswith('CVI-999'):
cve_files.append(cvi_file)
# Nothing selected: log and return without a value.
if len(cve_files) == 0:
logger.info("Can't find the rules, please update rules")
return
try:
pool = multiprocessing.Pool()
except IOError:
# NOTE(review): this handler only logs; if Pool() failed, `pool` is not
# assigned here and the later pool.apply_async would fail — confirm.
logger.warning('[SCAN] [CVE] IOError Broken pipe')
logger.info('[PUSH] {rc} CVE Rules'.format(rc=len(cve_files)))
# Queue one asynchronous scan_single() call per rule file; `store` is the
# completion callback.
for cve_file in cve_files:
cve_path = os.path.join(rule_path, cve_file)
pool.apply_async(scan_single, args=(target_directory, cve_path), callback=store)
# Stop accepting new tasks, then wait for all queued scans to finish.
pool.close()
pool.join()
return cve_vuls
}
f.write(dict_to_json(json_data))
else:
# Merge this scan's data into the JSON file at `filename` (opened read/write).
with open(filename, 'r+', encoding='utf-8') as f:
try:
json_data = json.load(f)
# Add or replace the entry keyed by this scan id.
json_data.update({sid: scan_data})
# r+ mode does not overwrite existing content, so move the file pointer back to the start
f.seek(0)
# Drop the old content before writing the merged document.
f.truncate()
f.write(dict_to_json(json_data))
except ValueError:
# json.load could not parse the existing file content.
logger.warning('[EXPORT] The json file have invaild token or None: {}'.format(os.path.join(export_path, filename)))
return False
# Handler for an enclosing `try:` that is outside this excerpt.
except IOError:
logger.warning('[EXPORT] Please input a file path after the -o parameter')
return False
# XML export branch (accepts 'xml' or 'XML'); the excerpt ends before the
# append path is complete.
elif output_format == 'xml' or output_format == 'XML':
xml_data = {
sid: scan_data,
}
# New file: write the converted scan data surrounded by newline-only writes.
if not os.path.exists(filename):
with open(filename, 'w', encoding='utf-8') as f:
# NOTE(review): these literals contain only newlines; they may have lost
# markup content (e.g. an XML header/root tag) during extraction — confirm
# against the original file.
f.write("""\n""")
f.write("""\n""")
f.write(dict_to_xml(xml_data))
f.write("""\n\n""")
else:
# Insert at the second-to-last line
with open(filename, 'r+', encoding='utf-8') as f:
results = f.readlines()
# Search GitHub for an existing auto-reported issue with this key, and create
# one if none is found. (`key`, `err_msg`, `exc_msg`, `access_token` and the
# function header are outside this excerpt.)
ex = None
try:
# NOTE(review): `urllib.quote` is the Python 2 location; on Python 3 the
# function lives at urllib.parse.quote — confirm the target interpreter.
url = "https://api.github.com/search/issues?q={q}".format(q=urllib.quote("repo:WhaleShark-Team/cobra [AUTO] Unhandled exception (#{k})".format(k=key)))
logger.debug(url)
resp = requests.get(url=url)
content = resp.json()
# `_` is just a short alias for the parsed response.
_ = content
duplicate = _["total_count"] > 0
# Only the first search hit's state is inspected.
closed = duplicate and _["items"][0]["state"] == "closed"
if duplicate:
warn_msg = "issue seems to be already reported"
if closed:
warn_msg += " and resolved. Please update to the latest version from official GitHub repository at '{u}'".format(u=__url__)
logger.warning(warn_msg)
# Already reported: do not create another issue.
return
except:
# Any failure of the search (network, JSON, missing keys) is logged and
# ignored; execution continues to the creation attempt.
logger.warning('search github issue failed')
pass
try:
url = "https://api.github.com/repos/WhaleShark-Team/cobra/issues"
data = {
"title": "[AUTO] Unhandled exception (#{k})".format(k=key),
"body": "## Environment\n```\n{err}\n```\n## Traceback\n```\n{exc}\n```\n".format(err=err_msg, exc=exc_msg)
}
# NOTE(review): base64.b64decode returns bytes on Python 3, so formatting it
# into the header would embed "b'...'" rather than the raw token — confirm.
headers = {"Authorization": "token {t}".format(t=base64.b64decode(access_token))}
resp = requests.post(url=url, data=json.dumps(data), headers=headers)
content = resp.text
# NOTE(review): on Python 3 the name bound by `except ... as ex` is deleted when
# the handler ends, which would also remove the `ex = None` binding above for
# any later use — confirm.
except Exception as ex:
content = None