Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def perms_roles():
"""View function to let administrators view and manage the list of roles"""
# Check user permissions
if not does_user_have_permission("admin.permissions"):
abort(403)
# Cursor for the DB
curd = g.db.cursor(mysql.cursors.DictCursor)
## View list
if request.method == 'GET':
# Get the list of roles from the database
roles = cortex.lib.perms.get_roles()
# Render the page
return render_template('perms/roles.html', active='perms', title="Roles", roles=roles, manage_role_route='perms_role')
## Create new role
elif request.method == 'POST':
# Validate class name/prefix
name = request.form['name']
if len(name) < 3 or len(name) > 64:
flash('The name you chose is invalid. It must be between 3 and 64 characters long.', 'alert-danger')
return redirect(url_for('perms_roles'))
# Validate the description
desc = request.form['description']
if len(desc) < 3 or len(desc) > 512:
@app.route('/systems/withperms')
@cortex.lib.user.login_required
def systems_withperms():
"""Shows the list of systems which have permissions"""
# Check user permissions
if not does_user_have_permission("systems.all.view"):
abort(403)
# Get the list of active classes (used to populate the tab bar)
classes = cortex.lib.classes.list()
# Render
return render_template('systems/list.html', classes=classes, active='perms', title="Systems with permissions", perms_only=True)
curd = g.db.cursor(mysql.cursors.DictCursor)
## Get from the cache (if it hasn't expired)
curd.execute('SELECT 1 FROM `ldap_group_cache_expire` WHERE `username` = %s AND `expiry_date` > NOW()', (username,))
if curd.fetchone() is not None:
## The cache has not expired, return the list
curd.execute('SELECT `group` FROM `ldap_group_cache` WHERE `username` = %s ORDER BY `group`', (username,))
groupdict = curd.fetchall()
groups = []
for group in groupdict:
groups.append(group['group'])
return groups
## The cache has expired, return them from LDAP directly (but also cache)
return cortex.lib.ldapc.get_users_groups_from_ldap(username)
@cortex.lib.user.login_required
def task_status_log(id):
"""Much like task_status, but only returns the event log. This is used by
an AJAX routine on the page to refresh the log every 10 seconds."""
## Get the task details
task = cortex.lib.core.task_get(id)
# Return a 404 if we've not found the task
if not task:
abort(404)
# Check the user has the permission to view this task
if not task['username'] == session['username']:
if not does_user_have_permission("tasks.view"):
abort(403)
def system_view(system_id):
"""Tenable.io Asset Information - from node/system tab"""
# Check user permissions
if not (cortex.lib.user.does_user_have_system_permission(system_id, "view.detail", "systems.all.view") and cortex.lib.user.does_user_have_permission("tenable.view")):
abort(403)
# Get the system
system = cortex.lib.systems.get_system_by_id(system_id)
# Ensure that the system actually exists, and return a 404 if it doesn't
if system is None:
abort(404)
return render_template("tenable/system_view.html", title=system["name"], system=system)
@cortex.lib.user.login_required
def get(self, id):
# Check user permissions. User must have either systems.all or specific
# access to the system
if not does_user_have_system_permission(id,"view","systems.all.view"):
abort(403)
# Get the name of the vm
system = cortex.lib.systems.get_system_by_id(id)
if not system:
abort(404)
try:
vm = self.rubrik.get_vm(system['name'])
except:
raise
abort(500)
sla_domains = self.rubrik.get_sla_domains()
vm['effectiveSlaDomain'] = next((sla_domain for sla_domain in
sla_domains['data'] if sla_domain['id'] ==
vm['effectiveSlaDomainId']))
vm['snapshots'] = self.rubrik.get_vm_snapshots(vm['id'])
@cortex.lib.user.login_required
def dsc_classify_machine(id):
#ADD in test to see if the dsc machine is responding
system = cortex.lib.systems.get_system_by_id(id)
if system == None:
abort(404)
curd = g.db.cursor(mysql.cursors.DictCursor)
# get a proxy to connect to dsc
def get(self):
"""
Returns a paginated list of rows from the tasks lists.
"""
page, per_page, limit_start, limit_length = process_pagination_arguments(request)
if not does_user_have_permission("tasks.view"):
raise InvalidPermissionException
tasks_args = tasks_arguments.parse_args(request)
username = tasks_args.get('username', None)
module = tasks_args.get('module', None)
status = tasks_args.get('status', None)
total = cortex.lib.core.tasks_count(username=username, module=module, status=status)
results = cortex.lib.core.tasks_get(username=username, module=module, status=status, order='id', limit_start=limit_start, limit_length=limit_length)
return pagination_response(results, page, per_page, total)
if not (MIN_DISK_SIZE <= values["adddisk_size"] <= MAX_DISK_SIZE):
flash("Invalid disk size! Please choose a size between {} and {} GiB".format(MIN_DISK_SIZE, MAX_DISK_SIZE))
else:
# Check permissions before starting task
if not does_user_have_system_permission(values["adddisk_system_id"], "adddisk") and not does_user_have_workflow_permission("systems.all.adddisk"):
abort(403)
# Task Options
options = {}
options["wfconfig"] = workflow.config
options["values"] = values
# Everything should be good - start a task.
neocortex = cortex.lib.core.neocortex_connect()
task_id = neocortex.create_task(__name__, session["username"], options, description="Add VMware disk")
# Log the Task ID
cortex.lib.core.log(__name__, "workflow.adddisk.add", "Add disk task {} started by {} with ServiceNow task {}".format(task_id, session["username"], values["adddisk_task"]))
# Redirect to the status page for the task
return redirect(url_for("task_status", id=task_id))
return workflow.render_template("add.html", title="Add VMware Disk", selected_system = selected_system, systems = systems)
@cortex.lib.user.login_required
def puppet_help():
"""Displays the Puppet ENC help page."""
return render_template('puppet/help.html', active='puppet', title="Puppet Help")