Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
i.setDaemon(daemonic=True)
i.start()
try:
global running_port, running_host
running_host = host if host != '0.0.0.0' else '127.0.0.1'
running_port = port
app.run(debug=debug, host=host, port=int(port), threaded=True, processes=1)
except socket.error as v:
if v.errno == errno.EACCES:
logger.critical('[{err}] must root permission for start API Server!'.format(err=v.strerror))
exit()
else:
logger.critical('{msg}'.format(msg=v.strerror))
logger.info('API Server start success')
'match2': None,
'match2-block': None,
'repair': None,
'repair-block': None,
'level': None,
'solution': None,
'test': {
'true': [],
'false': []
},
'status': False,
'author': None
}
xml_rule = self._read_xml(v_path)
if xml_rule is None:
logger.critical('rule read failed!!! ({file})'.format(file=v_path))
continue
cvi = v_path.lower().split('cvi-')[1][:6]
rule_info['id'] = cvi
for x in xml_rule:
if x.tag == 'name':
rule_info['name'] = x.get('value')
if x.tag == 'language':
rule_info['language'] = x.get('value').lower()
if x.tag == 'status':
rule_info['status'] = to_bool(x.get('value'))
if x.tag == 'author':
name = x.get('name').encode('utf-8')
email = x.get('email')
rule_info['author'] = '{name}<{email}>'.format(name=name, email=email)
if x.tag in ['match', 'match2', 'repair']:
if x.text is not None:
if os.path.isfile(absolute_path):
filename, directory = os.path.split(absolute_path)
self.file_info(directory, filename)
else:
for filename in os.listdir(absolute_path):
if self.is_pickup_whitelist(filename):
continue
else:
try:
directory = os.path.join(absolute_path, filename)
except UnicodeDecodeError as e:
logger.debug('Exception unicode {e}'.format(e=e))
continue
# Directory Structure
logger.debug('[PICKUP] [FILES] ' + '| ' * (level - 1) + '|--' + filename)
if os.path.isdir(directory):
self.files(directory, level + 1)
if os.path.isfile(directory):
self.file_info(directory, filename)
except OSError as e:
logger.critical('[PICKUP] {msg}'.format(msg=e))
exit()
def dependencies(self):
    """
    Run the dependency finder that matches the flag returned by find_file().

    find_file() yields a ``(file_path, flag)`` pair. Flag 0 is treated as
    "nothing to analyse"; flags 1, 2 and 3 select the pip, mvn and npm
    finder respectively, each of which is called with ``file_path``.
    :return: True when a finder was run, False otherwise
    """
    file_path, flag = self.find_file()
    if flag == 0:
        logger.debug('Dependency analysis cannot be done without finding dependency files')
        return False
    if flag == 1:
        self.find_python_pip(file_path)
        return True
    if flag == 2:
        self.find_java_mvn(file_path)
        return True
    if flag == 3:
        self.find_nodejs_npm(file_path)
        return True
    # Unrecognised flag: no finder ran. Return an explicit False so the
    # method always yields a bool instead of falling off the end with None.
    return False
att = MIMEText(open(capture_path, 'rb').read(), 'base64', 'utf-8')
att['Content-Type'] = 'application/octet-stream'
att["Content-Disposition"] = 'attachment; filename="W({0}).png"'.format(self.wd)
message.attach(att)
try:
smtp = smtplib.SMTP_SSL(host=self.host, port=self.port)
smtp.login(self.user, self.password)
smtp.sendmail(self.user, self.to, message.as_string())
logger.info('[EMAIL] Email delivered successfully.')
return True
except smtplib.SMTPRecipientsRefused:
logger.critical('[EMAIL] Email delivery rejected.')
return False
except smtplib.SMTPAuthenticationError:
logger.critical('[EMAIL] SMTP authentication error.')
return False
except smtplib.SMTPSenderRefused:
logger.critical('[EMAIL] SMTP sender refused.')
return False
except smtplib.SMTPException as error:
logger.critical(error)
logger.critical('[EMAIL] Please config SMTP Server, port, username, password and sender in config file')
return False
if root.tag != 'cobra':
frame_name = root.attrib['name']
language_name = root.attrib['language']
frame_data.setdefault(frame_name, [])
for child_of_root in root:
frame_data, language_data = self.parse_xml(child_of_root, frame_data, language_data, frame_name)
language_data.setdefault(language_name, {})
if frame_name is not None:
language_data[language_name].setdefault(frame_name, frame_data[frame_name])
return frame_data, language_data
else:
try:
frame_data[frame_name].append(root.attrib['value'])
return frame_data, language_data
except KeyError as e:
logger.warning(e.message)
# commit = u'{time}, @{author}'.format(author=x.commit_author, time=x.commit_time)
level = score2level(x.level)
cvi = x.id[0:3]
if cvi in vulnerabilities:
cvn = vulnerabilities[cvi]
else:
cvn = 'Unknown'
try:
code_content = x.code_content[:50].strip()
except AttributeError as e:
code_content = x.code_content.decode('utf-8')[:100].strip()
row = [idx + 1, x.id, x.rule_name, level, trigger, code_content]
data.append(row)
table.add_row(row)
if x.id not in trigger_rules:
logger.debug(' > trigger rule (CVI-{cvi})'.format(cvi=x.id))
trigger_rules.append(x.id)
diff_rules = list(set(push_rules) - set(trigger_rules))
vn = len(find_vulnerabilities)
if vn == 0:
logger.info('[SCAN] Not found vulnerability!')
else:
logger.info("[SCAN] Trigger Rules/Not Trigger Rules/Off Rules: {tr}/{ntr}/{fr} Vulnerabilities ({vn})\r\n{table}".format(tr=len(trigger_rules), ntr=len(diff_rules), fr=off_rules, vn=len(find_vulnerabilities), table=table))
if len(diff_rules) > 0:
logger.info('[SCAN] Not Trigger Rules ({l}): {r}'.format(l=len(diff_rules), r=','.join(diff_rules)))
if os.path.isfile(target_directory):
target_directory = os.path.dirname(target_directory)
# completed running data
if s_sid is not None:
Running(s_sid).data({
'code': 1001,
def push(self):
    """
    Push data to API.

    Posts ``self.post_data`` (JSON-encoded, in the ``info`` form field) to
    ``self.api``. The push counts as successful when the JSON reply is an
    object whose ``vul_pdf`` field is present and not an empty string.
    :return: push success or not
    """
    try:
        # Named `response`, not `re`, so the stdlib `re` module is not shadowed.
        response = requests.post(url=self.api, data={"info": json.dumps(self.post_data, ensure_ascii=False)})
        result = response.json()
    except ValueError as error:
        # Checked before RequestException: an invalid-JSON body (and a
        # malformed URL) raise ValueError subclasses and keep this message.
        logger.critical('[PUSH API] Response error: {0}'.format(str(error)))
        return False
    except requests.RequestException as error:
        # Base class of ConnectionError/HTTPError; also covers timeouts and
        # redirect loops, which previously escaped as unhandled exceptions.
        logger.critical('[PUSH API] Network error: {0}'.format(str(error)))
        return False

    # A valid JSON reply need not be an object; guard before calling .get().
    if isinstance(result, dict) and result.get("vul_pdf", "") != "":
        logger.info('[PUSH API] Push success!')
        return True
    logger.warning('[PUSH API] Push result error: {0}'.format(response.text))
    return False
def un_gz(gz_files):
    """
    Decompress each existing gzip archive in *gz_files*, then delete it.

    The output is written next to the archive under the same name without
    its ``.gz`` suffix. Paths that do not exist are ignored. A failed
    decompression is logged as a warning and the archive is still removed.
    :param gz_files: iterable of paths to gzip archives
    :return: always True
    """
    import shutil  # function-scope import: only needed for the streamed copy

    start_time = datetime.datetime.now()
    logger.info("Start decompress rule files, Please wait a moment....")
    for gz_file in gz_files:
        if not os.path.exists(gz_file):
            continue

        # Strip only the trailing suffix; a blanket replace() would also
        # rewrite ".gz" inside directory names or the middle of the name.
        root, ext = os.path.splitext(gz_file)
        f_name = root if ext == ".gz" else gz_file.replace(".gz", "")
        if f_name == gz_file:
            # Output would overwrite (truncate) the input; leave it alone.
            logger.warning('[CVE] The {} is not a .gz file, skipped'.format(gz_file))
            continue

        try:
            # Context managers close both handles even on failure, and the
            # streamed copy avoids holding the whole payload in memory.
            with gzip.GzipFile(gz_file, "rb") as g_file, open(f_name, "wb") as out_file:
                shutil.copyfileobj(g_file, out_file)
        except (IOError, EOFError):
            # EOFError is what a truncated (partially downloaded) archive raises.
            logger.warning('[CVE] The {} download fail'.format(gz_file))

        os.remove(gz_file)
    end_time = datetime.datetime.now()
    # total_seconds() includes whole days, which .seconds silently drops.
    logger.info("Decompress success, use time:%ds" % int((end_time - start_time).total_seconds()))
    return True
"""
message = MIMEMultipart()
message['From'] = self.user
message['To'] = self.to
message['Subject'] = self.subject
att = MIMEText(open(capture_path, 'rb').read(), 'base64', 'utf-8')
att['Content-Type'] = 'application/octet-stream'
att["Content-Disposition"] = 'attachment; filename="W({0}).png"'.format(self.wd)
message.attach(att)
try:
smtp = smtplib.SMTP_SSL(host=self.host, port=self.port)
smtp.login(self.user, self.password)
smtp.sendmail(self.user, self.to, message.as_string())
logger.info('[EMAIL] Email delivered successfully.')
return True
except smtplib.SMTPRecipientsRefused:
logger.critical('[EMAIL] Email delivery rejected.')
return False
except smtplib.SMTPAuthenticationError:
logger.critical('[EMAIL] SMTP authentication error.')
return False
except smtplib.SMTPSenderRefused:
logger.critical('[EMAIL] SMTP sender refused.')
return False
except smtplib.SMTPException as error:
logger.critical(error)
logger.critical('[EMAIL] Please config SMTP Server, port, username, password and sender in config file')
return False