Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def analyse(self, test=False):
if self.directory is None:
logger.critical('Please set directory')
sys.exit()
files = directory.Directory(self.directory).collect_files()
logger.info('**Scan Files**\r\n > Files count: `{files}`\r\n > Time consume: `{consume}s`\r\n'.format(files=files['file_nums'], consume=files['collect_time']))
ext_language = {
# Image
'.jpg': 'image',
'.png': 'image',
'.bmp': 'image',
'.gif': 'image',
'.ico': 'image',
'.cur': 'image',
# Font
'.eot': 'font',
'.otf': 'font',
'.svg': 'font',
'.ttf': 'font',
'.woff': 'font',
# CSS
def clone(self):
"""Clone a repo from repo_address
:return: True - clone success, False - clone error.
"""
logger.info('clone repository...')
if self.__check_exist():
logger.info('repository already exist.')
return self.pull()
# call(['rm', '-rf', self.repo_directory])
# if no username or password provide, it may be a public repo.
if self.repo_username is None or self.repo_password is None:
# public repo
clone_address = self.repo_address
else:
# private repo
clone_address = self.repo_address.split('://')[0] + '://' + quote(self.repo_username) + ':' + \
self.repo_password + '@' + self.repo_address.split('://')[1]
# clone repo with username and password
# "http[s]://username:password@gitlab.com/username/reponame"
# !!! if add password in the url, .git/config will log your url with password
def pull(self):
"""Pull a repo from repo_address and repo_directory"""
logger.info('pull repository...')
if not self.__check_exist():
return False, 'No local repo exist. Please clone first.'
# change work directory to the repo
repo_dir = self.repo_directory
logger.debug('cd directory: {0}'.format(repo_dir))
os.chdir(repo_dir)
cmd = 'git pull origin master'
p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
(pull_out, pull_err) = p.communicate()
logger.info(pull_out)
logger.info(pull_err)
self.parse_err(pull_err)
pull_err = pull_err.replace('{0}:{1}'.format(self.repo_username, self.repo_password), '')
# change work directory back.
os.chdir(repo_dir)
if 'Updating' in pull_out or 'up-to-date' in pull_out:
logger.info('pull done.')
return True, None
else:
return False, pull_err
def checkout(self, branch):
"""
Checkout to special branch.
:param branch: branch name
:return: True-checkout success or already on special branch
False-checkout failed. Maybe no branch name.
"""
if not self.__check_exist():
logger.info('No repo directory.')
return False
current_dir = os.getcwd()
os.chdir(self.repo_directory)
cmd = "git checkout " + branch
p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
(checkout_out, checkout_err) = p.communicate()
logger.info(checkout_err)
# Already on
# did not match
# Switched to a new branch
if 'did not match' in checkout_err:
os.chdir(current_dir)
return False
def scan(target_directory):
pool = multiprocessing.Pool()
if len(rules) == 0:
logger.critical('no rules!')
return False
for idx, rule in enumerate(rules):
logger.info("""Push Rule
> index: {idx}
> name: {name}
> status: {status}
> language: {language}
> vid: {vid}""".format(
idx=idx,
name=rule['name']['en'],
status=rule['status'],
language=rule['language'],
vid=rule['vid'],
match=rule['match']
))
if rule['status'] is False:
logger.info('rule disabled, continue...')
continue
if rule['language'] in languages:
@staticmethod
def copy(source, destination):
if os.path.isfile(destination) is not True:
logger.info('Not set configuration, setting....')
with open(source) as f:
content = f.readlines()
with open(destination, 'w+') as f:
f.writelines(content)
logger.info('Config file set success(~/.cobra/{source})'.format(source=source))
else:
return
'.yml': 'config',
'.spf': 'config',
'.iml': 'config',
'.manifest': 'config',
# Source
'.psd': 'source',
'.as': 'source',
# Log
'.log': 'log',
# Template
'.template': 'template',
'.tpl': 'template',
}
for ext in files:
if ext in ext_language:
logger.info('{0} - {1}'.format(ext, files[ext]), False)
continue
else:
logger.info(ext, False)
explode_dirs = ['.svn', '.cvs', '.hg', '.git', '.bzr']
logger.info('**Rule Scan**\r\n > Global explode directory: `{dirs}`\r\n'.format(dirs=', '.join(explode_dirs)))
languages = CobraLanguages.query.all()
filter_group = (CobraRules.status == 1,)
if self.rule_id is not None:
filter_group = (CobraRules.id == self.rule_id,)
rules = CobraRules.query.filter(*filter_group).all()
extensions = None
find = tool.find
grep = tool.grep
"""
Vulnerability Types
Checkout to special branch.
:param branch: branch name
:return: True-checkout success or already on special branch
False-checkout failed. Maybe no branch name.
"""
if not self.__check_exist():
logger.info('No repo directory.')
return False
current_dir = os.getcwd()
os.chdir(self.repo_directory)
cmd = "git checkout " + branch
p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
(checkout_out, checkout_err) = p.communicate()
logger.info(checkout_err)
# Already on
# did not match
# Switched to a new branch
if 'did not match' in checkout_err:
os.chdir(current_dir)
return False
else:
os.chdir(current_dir)
return True
def __init__(self):
rules = Config().rule()
for vn, vi in rules['vulnerabilities'].items():
# single vulnerability
logger.info('{vn} ({vn_description})'.format(vn=vn, vn_description=vi['name']))
for rule in vi['rules']:
# single vulnerability rule
logger.info(" > {vn}".format(vn=rule['name']))
logger.debug("""
Language: {language}
Match: {match}
Repair: {repair}""".format(
language=rule['language'],
match=rule['match'],
repair=rule['repair']
))
def pull(self):
"""Pull a repo from repo_address and repo_directory"""
logger.info('pull repository...')
if not self.__check_exist():
return False, 'No local repo exist. Please clone first.'
# change work directory to the repo
repo_dir = self.repo_directory
logger.debug('cd directory: {0}'.format(repo_dir))
os.chdir(repo_dir)
cmd = 'git pull origin master'
p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
(pull_out, pull_err) = p.communicate()
logger.info(pull_out)
logger.info(pull_err)
self.parse_err(pull_err)