Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): fragment of a method body; the enclosing definition and the
# origins of `split_target` and `reg` are outside this view, and the original
# indentation has been lost.
# Three parts: the first two, rejoined with ':', form the repository address
# and the last part is the branch.
if len(split_target) == 3:
target, branch = '{p}:{u}'.format(p=split_target[0], u=split_target[1]), split_target[-1]
# The address must match the `reg` pattern, otherwise the process exits.
if re.match(reg, target) is None:
logger.critical('Please enter a valid URL')
exit()
# Shell-quote the branch name before it is passed to Git below.
branch = pipes.quote(branch)
# Two parts: the whole of self.target is the address and the branch
# defaults to 'master'.
elif len(split_target) == 2:
target, branch = self.target, 'master'
if re.match(reg, target) is None:
logger.critical('Please enter a valid URL')
exit()
branch = pipes.quote(branch)
else:
# NOTE(review): this branch only logs; `target` and `branch` are not assigned
# here although both are used right after -- confirm the intended handling.
logger.critical('Target url exception: {u}'.format(u=self.target))
# Git credentials are read from config only when the address contains 'gitlab'.
if 'gitlab' in target:
username = Config('git', 'username').value
password = Config('git', 'password').value
else:
username = None
password = None
gg = Git(repo_address=target, branch=branch, username=username, password=password)
# Clone the repository; clone failures are re-raised as pickup exceptions.
try:
clone_ret, clone_err = gg.clone()
# A False first return value is treated as a failed clone.
if clone_ret is False:
raise PickupException('Clone Failed ({0})'.format(clone_err), gg)
except NotExistError:
raise NotExistException(4001, 'Repository or Branch Does not exist!', gg)
except AuthError:
raise AuthFailedException('Git Authentication Failed')
# The directory reported by the Git helper is taken as the target directory.
target_directory = gg.repo_directory
# Build an e-mail about `target` addressed to `receiver`, using the file
# `filename`.
# NOTE(review): the definition is cut off right after the file is opened, and
# the original indentation has been lost.
def send_mail(target, filename, receiver):
# SMTP host, port, credentials and sender come from the 'email' config section.
host = Config('email', 'host').value
port = Config('email', 'port').value
username = Config('email', 'username').value
password = Config('email', 'password').value
sender = Config('email', 'sender').value
is_ssl = to_bool(Config('email', 'ssl').value)
# SMTP_SSL is used when the 'ssl' option converts to true, plain SMTP otherwise.
if is_ssl:
server = smtplib.SMTP_SSL(host=host, port=port)
else:
server = smtplib.SMTP(host=host, port=port)
# The part of the file name before the first '.' is used as the id in the subject.
s_sid = filename.split('.')[0]
msg = MIMEMultipart()
msg['From'] = sender
msg['To'] = receiver
# Subject text (Chinese): "No. {sid} project Cobra scan report".
msg['Subject'] = '编号 {sid} 项目 Cobra 扫描报告'.format(sid=s_sid)
# Body text (Chinese): "Scanned project: {t}" / "See the attachment for the report".
msg.attach(MIMEText('扫描项目:{t}\n报告见附件'.format(t=target), 'plain', 'utf-8'))
try:
with open(filename, 'rb') as f:
# Build an e-mail about a scan, addressed to `receiver`, using the file
# `filename`.
# NOTE(review): the definition is cut off after the subject header is set, and
# the original indentation has been lost.
def send_mail(target, filename, receiver):
# SMTP host, port, credentials and sender come from the 'email' config section.
host = Config('email', 'host').value
port = Config('email', 'port').value
username = Config('email', 'username').value
password = Config('email', 'password').value
sender = Config('email', 'sender').value
is_ssl = to_bool(Config('email', 'ssl').value)
# SMTP_SSL is used when the 'ssl' option converts to true, plain SMTP otherwise.
if is_ssl:
server = smtplib.SMTP_SSL(host=host, port=port)
else:
server = smtplib.SMTP(host=host, port=port)
# The part of the file name before the first '.' is used as the id in the subject.
s_sid = filename.split('.')[0]
msg = MIMEMultipart()
msg['From'] = sender
msg['To'] = receiver
# Subject text (Chinese): "No. {sid} project Cobra scan report".
msg['Subject'] = '编号 {sid} 项目 Cobra 扫描报告'.format(sid=s_sid)
# NOTE(review): fragment of a method body; the `if` that opens this first
# branch, the enclosing definition and the origins of `split_target` and `reg`
# are not visible, and the original indentation has been lost.
# The first two parts, rejoined with ':', form the repository address and the
# last part is the branch.
target, branch = '{p}:{u}'.format(p=split_target[0], u=split_target[1]), split_target[-1]
# The address must match the `reg` pattern, otherwise the process exits.
if re.match(reg, target) is None:
logger.critical('Please enter a valid URL')
exit()
# Shell-quote the branch name before it is passed to Git below.
branch = pipes.quote(branch)
# Two parts: the whole of self.target is the address and the branch
# defaults to 'master'.
elif len(split_target) == 2:
target, branch = self.target, 'master'
if re.match(reg, target) is None:
logger.critical('Please enter a valid URL')
exit()
branch = pipes.quote(branch)
else:
# NOTE(review): this branch only logs; `target` and `branch` are not assigned
# here although both are used right after -- confirm the intended handling.
logger.critical('Target url exception: {u}'.format(u=self.target))
# Git credentials are read from config only when the address contains 'gitlab'.
if 'gitlab' in target:
username = Config('git', 'username').value
password = Config('git', 'password').value
else:
username = None
password = None
gg = Git(repo_address=target, branch=branch, username=username, password=password)
# Clone the repository; clone failures are re-raised as pickup exceptions.
try:
clone_ret, clone_err = gg.clone()
# A False first return value is treated as a failed clone.
if clone_ret is False:
raise PickupException('Clone Failed ({0})'.format(clone_err), gg)
except NotExistError:
raise NotExistException(4001, 'Repository or Branch Does not exist!', gg)
except AuthError:
raise AuthFailedException('Git Authentication Failed')
# The directory reported by the Git helper is taken as the target directory.
target_directory = gg.repo_directory
# Next target mode; its body is cut off in this view.
elif target_mode == TARGET_MODE_COMPRESS:
# NOTE(review): the definition is cut off at the `elif` below, and the
# original indentation has been lost. `format` and `all` shadow builtins.
def start(target, format, output, rules, dels, all):
"""
Push target(s) to the API.

Reads the GitLab URL, private token, Cobra address and secret key from the
configuration and opens ``result_sid`` under ``code_path`` in append mode.

:param target: a single target or a list of targets; each one is written
    on its own line to the result file before being pushed
:param format: passed through to push_to_api
:param output: passed through to push_to_api
:param rules: passed through to push_to_api
:param dels: passed through to push_to_api
:param all: when False (and target is not the empty string) only
    ``target`` is pushed; the True branch is not visible in this fragment
:return: not visible in this fragment
"""
url = Config('git', 'gitlab_url').value
private_token = Config('git', 'private_token').value
cobra_ip = Config('git', 'cobra_ip').value
key = Config('cobra', 'secret_key').value
threads = []
result_path = code_path + '/result_sid'
fi = open(result_path, 'a+')
try:
# NOTE(review): `is not ''` / `is ''` compare identity with a string literal;
# `!=` / `==` is probably what is meant -- confirm.
if all is False and target is not '':
# Record every requested target in the result file, one per line.
if isinstance(target, list):
for tar in target:
fi.write(tar + '\n')
else:
fi.write(target + '\n')
res = push_to_api(target, cobra_ip, key, fi, format, output, rules, dels)
elif all is True and target is '':
def is_update():
    """
    Check whether the NVD "modified" CVE feed changed since the last run.

    Downloads the feed's ``.meta`` file, takes the text that follows the
    ``sha256:`` marker and compares it with the ``cve``/``modified`` value of
    the local configuration. When the two differ, the new hash is written back
    to the configuration file at ``config_path``.

    :return: True when the remote hash differs from the stored one; False when
             they match or when the remote hash cannot be located
    """
    marker = 'sha256:'
    url = "https://static.nvd.nist.gov/feeds/xml/cve/2.0/nvdcve-2.0-modified.meta"
    # NOTE(review): certificate verification is switched off for this request
    # and the matching urllib3 warning is silenced -- confirm this is still
    # required.
    requests.packages.urllib3.disable_warnings()
    r = requests.get(url, verify=False)

    index = r.text.find(marker)
    if index == -1:
        # Without the marker, slicing from ``index + len(marker)`` would pick
        # up unrelated text and store it as the hash; report "no update".
        logger.warning("sha256 marker not found in the CVE meta file")
        return False

    sha256_now = r.text[index + len(marker):].strip()
    sha256_local = Config(level1='cve', level2='modified').value
    if sha256_local == sha256_now:
        return False

    logger.info("The CVE Rule already update, start update local rule")
    config = ConfigParser()
    config.read(config_path)
    config.set('cve', 'modified', sha256_now)
    try:
        # ``with`` closes the file even when ``config.write`` raises.
        with open(config_path, 'w') as fi:
            config.write(fi)
    except IOError as e:
        # A failed write is only logged; the caller is still told that the
        # remote feed changed.
        logger.warning(e)
    logger.info("The sha256 been update")
    return True
# NOTE(review): the definition is cut off after the `get_pages` call, and the
# original indentation has been lost. `format` and `all` shadow builtins.
def start(target, format, output, rules, dels, all):
"""
Push target(s) to the API.

Reads the GitLab URL, private token, Cobra address and secret key from the
configuration and opens ``result_sid`` under ``code_path`` in append mode.

:param target: a single target or a list of targets; each one is written
    on its own line to the result file before being pushed
:param format: passed through to push_to_api
:param output: passed through to push_to_api
:param rules: passed through to push_to_api
:param dels: passed through to push_to_api
:param all: when False (and target is not the empty string) only
    ``target`` is pushed; when True (and target is the empty string) the
    pages are fetched from GitLab first -- the rest is not visible here
:return: not visible in this fragment
"""
url = Config('git', 'gitlab_url').value
private_token = Config('git', 'private_token').value
cobra_ip = Config('git', 'cobra_ip').value
key = Config('cobra', 'secret_key').value
threads = []
result_path = code_path + '/result_sid'
fi = open(result_path, 'a+')
try:
# NOTE(review): `is not ''` / `is ''` compare identity with a string literal;
# `!=` / `==` is probably what is meant -- confirm.
if all is False and target is not '':
# Record every requested target in the result file, one per line.
if isinstance(target, list):
for tar in target:
fi.write(tar + '\n')
else:
fi.write(target + '\n')
res = push_to_api(target, cobra_ip, key, fi, format, output, rules, dels)
elif all is True and target is '':
# Uses the GitLab URL and private token read from config above.
pages = get_pages(url, private_token)
def key_verify(data):
    """
    Validate the API key carried in a request payload.

    :param data: mapping holding the submitted key under ``"key"``
    :return: True when the key equals the configured ``cobra``/``secret_key``;
             otherwise a dict with an error ``code`` and ``msg``
    """
    key = Config(level1="cobra", level2="secret_key").value
    _key = data.get("key")
    # Reject a missing or empty key before comparing: otherwise an unset
    # secret (None or "") would compare equal to an absent key and pass.
    if not _key:
        return {"code": 1002, "msg": "Key cannot be empty."}
    if _key == key:
        return True
    return {"code": 4002, "msg": "Key verify failed."}
def __init__(self):
    """
    Prepare the mail settings and the report-script parameters.

    Mail account, host, port and password come from the ``email`` config
    section and the recipient from ``report``/``to``. ``self.param`` holds the
    ``node`` value, the path of ``reports/report.js`` under
    ``project_directory``, the project directory itself and the start and end
    dates of the reporting window.
    """
    # One snapshot of "now", so the week number and both dates stay consistent
    # even when construction happens around midnight.
    today = datetime.datetime.today()

    # mail
    wd = int(today.strftime("%U"))
    self.wd = wd
    # Subject text (Chinese): "Code security weekly report (W<week number>)".
    self.subject = '[Cobra] 代码安全周报(W{0})'.format(wd)
    self.user = Config('email', 'username').value
    self.to = Config('report', 'to').value
    self.host = Config('email', 'host').value
    self.port = Config('email', 'port').value
    self.password = Config('email', 'password').value

    # Reporting window: from seven days ago until today, as YYYY-MM-DD strings.
    start = (today + datetime.timedelta(days=-7)).strftime("%Y-%m-%d")
    end = today.strftime("%Y-%m-%d")
    self.param = [node, os.path.join(project_directory, 'reports', 'report.js'), project_directory, start, end]
# Request handler: shows the index page, or looks up the status of scan `sid`.
# NOTE(review): the definition is cut off inside the `except ValueError`
# clause, and the original indentation has been lost.
def summary():
# Scan id taken from the 'sid' request argument.
a_sid = request.args.get(key='sid')
key = Config(level1="cobra", level2="secret_key").value
# Without a sid, render the index page.
# NOTE(review): the secret key is handed to the template -- confirm it is not
# exposed to clients.
if a_sid is None:
return render_template(template_name_or_list='index.html',
key=key)
# Otherwise POST the key and sid as JSON to the status API on
# running_host:running_port.
status_url = 'http://{host}:{port}/api/status'.format(host=running_host, port=running_port)
post_data = {
'key': key,
'sid': a_sid,
}
headers = {
"Content-Type": "application/json",
}
r = requests.post(url=status_url, headers=headers, data=json.dumps(post_data))
try:
# Decode the response body as JSON.
scan_status = json.loads(r.text)
except ValueError as e: