Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Validate the description
desc = request.form['description']
if len(desc) < 3 or len(desc) > 512:
flash('The description you chose was invalid. It must be between 3 and 512 characters long.', 'alert-danger')
return redirect(url_for('perms_roles'))
# Check if the class already exists
curd.execute("SELECT 1 FROM `p_roles` WHERE `name` = %s", (name,))
if curd.fetchone() is not None:
flash('A role already exists with that name', 'alert-danger')
return redirect(url_for('perms_roles'))
# SQL insert
curd.execute("INSERT INTO `p_roles` (`name`, `description`) VALUES (%s, %s)", (name, desc))
g.db.commit()
cortex.lib.core.log(__name__, "permissions.role.create", "Permission role '" + name + "' created")
flash("Role created", "alert-success")
return redirect(url_for('perms_roles'))
changes = 0
# Loop over the system permissions and reconcile with the DB
for perm in app.permissions.system_permissions:
# Check if the role already has this permission or not
curd.execute("SELECT `p_role_system_perms`.`perm_id` FROM `p_role_system_perms` JOIN `p_system_perms` ON `p_role_system_perms`.`perm_id`=`p_system_perms`.`id` WHERE `p_role_system_perms`.`role_id`=%s AND `p_role_system_perms`.`system_id`=%s AND `p_system_perms`.`perm`=%s", (role_id, system_id, perm["name"]))
row = curd.fetchone()
perm_id = row["perm_id"] if row is not None else None
should_exist = bool(perm["name"] in request.form and request.form[perm["name"]] == "yes")
if not should_exist and perm_id is not None:
changes += 1
curd.execute("DELETE FROM `p_role_system_perms` WHERE `role_id`=%s AND `system_id`=%s AND `perm_id`=%s", (role_id, system_id, perm_id))
g.db.commit()
cortex.lib.core.log(__name__, "permissions.role.system.revoke", "System permission {perm} revoked for role {role_id} on system {system_id}".format(perm=perm["name"], role_id=role_id, system_id=system_id))
elif should_exist and perm_id is None:
changes += 1
curd.execute("INSERT INTO `p_role_system_perms` (`role_id`, `perm_id`, `system_id`) VALUES (%s, (SELECT `id` FROM `p_system_perms` WHERE `perm`=%s), %s)", (role_id, perm["name"], system_id))
g.db.commit()
cortex.lib.core.log(__name__, "permissions.role.system.grant", "System permission {perm} granted for role {role_id} on system {system_id}.".format(perm=perm["name"], role_id=role_id, system_id=system_id))
if changes == 0:
flash("Permissions were not updated - no changes requested", "alert-warning")
else:
flash("Permissions for the system were successfully updated", "alert-success")
return redirect(url_for("perms_role", role_id=role_id, t="systems"))
## Add a environment to the role
elif action == "add_environment":
environment_id = request.form["environment_id"]
# Get the certificate
curd = g.db.cursor(mysql.cursors.DictCursor)
curd.execute('SELECT `subjectDN` FROM `certificate` WHERE `digest` = %s', (digest,))
certificate = curd.fetchone()
# If the certificate was not found then notify the user
if certificate is None:
raise Exception('Certificate does not exist')
# Delete the certificate
curd = g.db.cursor(mysql.cursors.DictCursor)
curd.execute('DELETE FROM `certificate` WHERE `digest` = %s', (digest,))
g.db.commit()
# Log which certificate was deleted
cortex.lib.core.log(__name__, "certificate.delete", "Certificate " + str(digest) + " (" + str(certificate['subjectDN']) + ") deleted")
# Notify user
flash('Certificate deleted', category='alert-success')
except Exception as e:
flash('Failed to delete certificate: ' + str(e), category='alert-danger')
return redirect(url_for('certificates'))
# Toggle notifications action
elif request.form['action'] == 'toggle_notify':
try:
# Get the certificate
curd = g.db.cursor(mysql.cursors.DictCursor)
curd.execute('SELECT `subjectDN` FROM `certificate` WHERE `digest` = %s', (digest,))
certificate = curd.fetchone()
# If the certificate was not found then notify the user
flash("The system was added to the role successfully", "alert-success")
return redirect(url_for("perms_role", role_id=role_id, t="systems"))
## Delete a system from the role
elif action == "remove_system":
system_id = request.form["system_id"]
if not re.match(r'^[0-9]+$',system_id):
flash("The system you sent was invalid", "alert-danger")
return redirect(url_for("perms_role", role_id=role_id, t="systems"))
else:
system_id = int(system_id)
curd.execute("DELETE FROM `p_role_system_perms` WHERE `role_id`=%s AND `system_id`=%s", (role_id, system_id))
g.db.commit()
cortex.lib.core.log(__name__, "permissions.role.system.purge", "System permissions purged for role {role_id} on system {system_id}.".format(role_id=role_id, system_id=system_id))
flash("The system has been removed from the role successfully", "alert-success")
return redirect(url_for("perms_role", role_id=role_id, t="systems"))
## Edit a systems permissions
elif action == "edit_system":
system_id = request.form["system_id"]
if not re.match(r'^[0-9]+$',system_id):
flash("The system you sent was invalid", "alert-danger")
return redirect(url_for("perms_role", role_id=role_id, t="systems"))
else:
system_id = int(system_id)
changes = 0
# Loop over the system permissions and reconcile with the DB
flash('That user/group is already added to the system, please select it from the list below and change permissions as required', 'alert-warning')
return redirect(url_for('perms_system', system_id=system_id))
changes = 0
## Now loop over the per-system permissions available to us
for perm in app.permissions.system_permissions:
## If the form has the checkbox for this perm checked...
if perm['name'] in request.form:
if request.form[perm['name']] == 'yes':
## Insert the permission for this name/type/perm combo
changes = changes + 1
curd.execute("INSERT INTO `p_system_perms_who` (`system_id`, `who`, `type`, `perm_id`) VALUES (%s, %s, %s, (SELECT `id` FROM `p_system_perms` WHERE `perm`=%s))", (system_id, name, wtype, perm['name']))
g.db.commit()
if wtype == 0:
cortex.lib.core.log(__name__, "permissions.system.grant.user", "System permission '" + perm['name'] + "' granted for user '" + name + "' on system " + str(system_id))
else:
cortex.lib.core.log(__name__, "permissions.system.grant.group", "System permission '" + perm['name'] + "' granted for group '" + name + "' on system " + str(system_id))
if changes == 0:
flash("The " + hstr + " " + name + " was not added because no permissions were selected", "alert-danger")
else:
flash("The " + hstr + " " + name + " was added to the system", "alert-success")
return redirect(url_for('perms_system', system_id=system_id))
elif action == 'remove':
name = request.form['name']
if not re.match(r'^[a-zA-Z0-9\-\_&]{3,255}$', name):
flash("The user or group name you sent was invalid", "alert-danger")
return redirect(url_for('perms_system', system_id=system_id))
wtype = request.form['type']
# If we're linking to ServiceNow
if 'link_servicenow' in request.form:
# Search for a CI with the correct name
curd.execute("SELECT `sys_id` FROM `sncache_cmdb_ci` WHERE `name` = %s", (hostname,))
ci_results = curd.fetchall()
if len(ci_results) == 0:
flash("System not linked to ServiceNow: Couldn't find a CI to link the system to", "alert-warning")
elif len(ci_results) > 1:
flash("System not linked to ServiceNow: Found more than one CI matching the name", "alert-warning")
else:
curd.execute("UPDATE `systems` SET `cmdb_id` = %s WHERE `id` = %s", (ci_results[0]['sys_id'], system_id))
g.db.commit()
cortex.lib.core.log(__name__, "systems.add.existing", "System manually added, id " + str(system_id),related_id=system_id)
# Redirect to the system page for the system we just added
flash("System added", "alert-success")
return redirect(url_for('system', id=system_id))
def systems_download_csv():
"""Downloads the list of allocated server names as a CSV file."""
# Check user permissions
if not does_user_have_permission("systems.all.view"):
abort(403)
# Get the list of systems
curd = cortex.lib.systems.get_systems(return_cursor=True, hide_inactive=False)
cortex.lib.core.log(__name__, "systems.csv.download", "CSV of systems downloaded")
# Return the response
return Response(cortex.lib.systems.csv_stream(curd), mimetype="text/csv", headers={'Content-Disposition': 'attachment; filename="systems.csv"'})
app.logger.info('Created Puppet ENC entry for certname "' + fqdn + '"')
# Get the satellite registration key (if any)
if satellite_required:
if ident in app.config['SATELLITE_KEYS']:
data = app.config['SATELLITE_KEYS'][ident]
app.logger.warn("TESTING: USING ENV {} with data {}".format(cdata['environment'], data))
if cdata['environment'] in data:
cdata['satellite_activation_key'] = data[cdata['environment']]
else:
app.logger.warn('No Satellite activation key configured for OS ident, ' + str(ident) + ' with environment ' + cdata['environment'] + ' - a Satellite activation key will not be returned')
else:
app.logger.warn('No Satellite activation keys configured for OS ident (' + str(ident) + ') - a Satellite activation key will not be returned')
if interactive:
cortex.lib.core.log(__name__, "api.register.system", "New system '" + fqdn + "' registered via the API by " + request.form['username'], username=request.form['username'])
else:
cortex.lib.core.log(__name__, "api.register.system", "New system '" + fqdn + "' registered via the API by VM-UUID authentication")
return jsonify(cdata)
# Check permissions before starting task
if not does_user_have_system_permission(values["adddisk_system_id"], "adddisk") and not does_user_have_workflow_permission("systems.all.adddisk"):
abort(403)
# Task Options
options = {}
options["wfconfig"] = workflow.config
options["values"] = values
# Everything should be good - start a task.
neocortex = cortex.lib.core.neocortex_connect()
task_id = neocortex.create_task(__name__, session["username"], options, description="Add VMware disk")
# Log the Task ID
cortex.lib.core.log(__name__, "workflow.adddisk.add", "Add disk task {} started by {} with ServiceNow task {}".format(task_id, session["username"], values["adddisk_task"]))
# Redirect to the status page for the task
return redirect(url_for("task_status", id=task_id))
return workflow.render_template("add.html", title="Add VMware Disk", selected_system = selected_system, systems = systems)
def certificates_download_csv():
"""Downloads the list of certificates as a CSV file."""
# Check user permissions
if not does_user_have_permission("certificates.view"):
abort(403)
# Get the list of systems
curd = g.db.cursor(mysql.cursors.DictCursor)
curd.execute('SELECT `certificate`.`digest` AS `digest`, `certificate`.`subjectCN` AS `subjectCN`, `certificate`.`subjectDN` AS `subjectDN`, `certificate`.`issuerCN` AS `issuerCN`, `certificate`.`issuerDN` AS `issuerDN`, `certificate`.`notBefore` AS `notBefore`, `certificate`.`notAfter` AS `notAfter`, MAX(`scan_result`.`when`) AS `lastSeen`, COUNT(DISTINCT `scan_result`.`host`) AS `numHosts`, (SELECT GROUP_CONCAT(`san`) FROM `certificate_sans` WHERE `cert_digest` = `certificate`.`digest`) AS `sans`, `certificate`.`notes` AS `notes`, `certificate`.`keySize` AS `keySize` FROM `certificate` LEFT JOIN `scan_result` ON `certificate`.`digest` = `scan_result`.`cert_digest` GROUP BY `certificate`.`digest`;')
cortex.lib.core.log(__name__, "certificates.csv.download", "CSV of certificates downloaded")
# Return the response
return Response(certificates_download_csv_stream(curd), mimetype="text/csv", headers={'Content-Disposition': 'attachment; filename="certificates.csv"'})