Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
for i in range(10):
thread = threading.Thread(target=get_git_urls, args=(url, private_token, q_pages, fi))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
res = push_to_api(git_urls, cobra_ip, key, fi, format, output, rules, dels)
else:
res = False
if res:
logger.info("[GIT-PRO] Git push success")
logger.info("[GIT-PRO] All projects have been pushed")
else:
logger.warning("[GIT-PRO] Git push fail")
fi.close()
except requests.exceptions.MissingSchema:
logger.warning('[GIT-PRO] Please write gitlab_url and private_token in config file')
except requests.exceptions.ConnectionError:
logger.warning('[GIT-PRO] Please check the cobra_ip or gitlab_url is right')
except requests.exceptions.InvalidSchema:
logger.warning('[GIT-PRO] Please add http:// before the cobra_ip or gitlab_url')
except Exception as e:
logger.warning('[GIT-PRO] {}'.format(e.message))
def functions(self):
"""
get all functions in this file
:return:
"""
grep = Tool().grep
if self.language not in self.regex:
logger.info("[AST] Undefined language's functions regex {0}".format(self.language))
return False
regex_functions = self.regex[self.language]['functions']
param = [grep, "-s", "-n", "-r", "-P"] + [regex_functions, self.file_path]
p = subprocess.Popen(param, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
result, error = p.communicate()
try:
result = result.decode('utf-8')
error = error.decode('utf-8')
except AttributeError as e:
pass
if len(error) is not 0:
logger.critical('[AST] {err}'.format(err=error.strip()))
if len(result):
functions = {}
lines = result.strip().split("\n")
prev_function_name = ''
result, error = p.communicate()
try:
result = result.decode('utf-8')
error = error.decode('utf-8')
except AttributeError as e:
pass
if len(error) is not 0:
logger.critical('[AST] {err}'.format(err=error.strip()))
if len(result):
functions = {}
lines = result.strip().split("\n")
prev_function_name = ''
for index, line in enumerate(lines):
line = line.strip()
if line == '':
logger.info('[AST] Empty')
continue
line_arr = line.split(':')
if len(line_arr) < 2:
logger.info("[AST] Not found(:)")
regex_annotation = self.regex[self.language]['annotation']
string = re.findall(regex_annotation, line_arr[1].strip())
if len(string) >= 1 and string[0] != '':
logger.info("[AST] This function is annotation")
function_name = re.findall(regex_functions, line_arr[1].strip())
if len(function_name) >= 1:
if len(function_name) == 2:
if function_name[0] != '':
function_name = function_name[0]
elif function_name[1] != '':
logger.info('[CLI] [STATISTIC] Files: {fc}, Extensions:{ec}, Consume: {tc}'.format(fc=file_count,
ec=len(files),
tc=time_consume))
if pa.special_rules is not None:
logger.info('[CLI] [SPECIAL-RULE] only scan used by {r}'.format(r=','.join(pa.special_rules)))
# scan
scan(target_directory=target_directory, a_sid=a_sid, s_sid=s_sid, special_rules=pa.special_rules,
language=main_language, framework=main_framework, file_count=file_count, extension_count=len(files))
if target_mode == 'git' and '/tmp/cobra/git/' in target_directory and is_del is True:
res = clean_dir(target_directory)
if res is True:
logger.info('[CLI] Target directory remove success')
else:
logger.info('[CLI] Target directory remove fail')
except PickupException:
result = {
'code': 1002,
'msg': 'Repository not exist!'
}
Running(s_sid).data(result)
logger.critical('Repository or branch not exist!')
exit()
except Exception:
result = {
'code': 1002,
'msg': 'Exception'
}
Running(s_sid).data(result)
raise
def get_real_directory(self):
    """
    Resolve the directory that holds the unpacked content.

    The base directory is ``self.package_path`` joined with ``self.dir_name``.
    When that directory contains exactly one entry and the entry is itself a
    directory, the nested directory is returned; in every other case the base
    directory is returned.

    :return: path of the single nested directory, or of the base directory
    """
    directory = os.path.join(self.package_path, self.dir_name)
    entries = os.listdir(directory)
    file_count = len(entries)
    # Only the last listed entry is kept, matching a loop that overwrites the
    # path on each iteration; it stays None for an empty directory.
    directory_path = os.path.join(directory, entries[-1]) if entries else None
    logger.info("Decompress path count: {0}, directory path: {1}".format(file_count, directory_path))
    if file_count != 1 or not os.path.isdir(directory_path):
        return directory
    return directory_path
start_time = datetime.datetime.now()
logger.info("Start decompress rule files, Please wait a moment....")
for gz_file in gz_files:
if os.path.exists(gz_file):
f_name = gz_file.replace(".gz", "")
try:
g_file = gzip.GzipFile(gz_file, "rb")
open(f_name, "wb+").write(g_file.read())
g_file.close()
except IOError:
logger.warning('[CVE] The {} download fail'.format(gz_file))
os.remove(gz_file)
end_time = datetime.datetime.now()
logger.info("Decompress success, use time:%ds" % (end_time - start_time).seconds)
return True
def rule_parse():
    """
    Rebuild the CVE rules when ``is_update()`` reports that an update is due.

    On update: the result of ``download_rule_gz()`` is passed to ``un_gz()``,
    then ``rule_single`` is scheduled on a process pool once per year from
    2002 through the current year with that year's ``/rules/<year>.xml`` path,
    and finally those XML files are removed.

    :return: True once an update has been processed; None when no update ran
    """
    if not is_update():
        logger.info("The CVE Rule not update, start scan cve vuls")
        return None

    un_gz(download_rule_gz())

    # Fan the per-year rule files out to worker processes and wait for all of
    # them before touching the files again.
    pool = multiprocessing.Pool()
    for year in range(2002, datetime.datetime.now().year + 1):
        pool.apply_async(rule_single, args=(project_directory + "/rules/%d.xml" % year, year))
    pool.close()
    pool.join()

    # The XML files are no longer needed once every worker has finished.
    year = 2002
    last_year = datetime.datetime.now().year
    while year <= last_year:
        os.remove(project_directory + "/rules/%d.xml" % year)
        year += 1

    logger.info("The rule update success, start scan cve vuls")
    return True
if not self.checkout(self.repo_branch):
os.chdir(repo_dir)
return False, "Checkout failed."
cmd = 'git pull origin ' + self.repo_branch
p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
pull_out, pull_err = p.communicate()
try:
pull_out = pull_out.decode('utf-8')
pull_err = pull_err.decode('utf-8')
except AttributeError as e:
pass
logger.info('[PICKUP] [PULL] {o}'.format(o=pull_out.strip()))
logger.info('[PICKUP] [PULL] {e}'.format(e=pull_err.strip().replace(u'\n', u' >')))
self.parse_err(pull_err)
pull_err = pull_err.replace('{0}:{1}'.format(self.repo_username, self.repo_password), '')
# change work directory back.
os.chdir(repo_dir)
pull_out = pull_out.lower()
if 'up' in pull_out and 'to' in pull_out and 'date' in pull_out:
logger.info('[PICKUP] [PULL] pull done.')
return True, None
else:
return False, pull_err
"""
if not self.__check_exist():
logger.info('[PICKUP] No repo directory.')
return False
current_dir = os.getcwd()
os.chdir(self.repo_directory)
cmd = "git fetch origin && git reset --hard origin/{branch} && git checkout {branch}".format(branch=branch)
p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
(checkout_out, checkout_err) = p.communicate()
checkout_out = checkout_out.decode('utf-8')
checkout_err = checkout_err.decode('utf-8')
logger.info('[PICKUP] [CHECKOUT] ' + checkout_err.strip())
# Already on
# did not match
# Switched to a new branch
if 'did not match' in checkout_err:
os.chdir(current_dir)
return False
else:
os.chdir(current_dir)
return True