Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@app.route('/systems/edit/', methods=['GET', 'POST'])
@cortex.lib.user.login_required
def systems_edit(id):
if request.method == 'GET' or request.method == 'HEAD':
# Get the system out of the database
system = cortex.lib.systems.get_system_by_id(id)
system_class = cortex.lib.classes.get(system['class'])
return render_template('systems-edit.html', system=system, system_class=system_class, active='systems', title=system['name'])
elif request.method == 'POST':
try:
# Get a cursor to the database
cur = g.db.cursor(mysql.cursors.DictCursor)
# Update the system
cur.execute('UPDATE `systems` SET `allocation_comment` = %s, `cmdb_id` = %s, `vmware_uuid` = %s WHERE `id` = %s', (request.form['allocation_comment'], request.form['cmdb_id'], request.form['vmware_uuid'], id))
g.db.commit();
self.svgfile = svgfile
self.callback = callback
self.kdt = cKDTree(tcoords)
self.layer = layer
# Display parameters
if layer in config.sections():
dlayer = layer
else:
# Unknown display layer; default to values for ROIs
import warnings
warnings.warn('No defaults set for display layer %s; Using defaults for ROIs in options.cfg file'%layer)
dlayer = 'rois'
self.linewidth = float(config.get(dlayer, "line_width")) if linewidth is None else linewidth
self.linecolor = tuple(map(float, config.get(dlayer, "line_color").split(','))) if linecolor is None else linecolor
self.roifill = tuple(map(float, config.get(dlayer, "fill_color").split(','))) if roifill is None else roifill
self.shadow = float(config.get(dlayer, "shadow")) if shadow is None else shadow
# For dashed lines, default to WYSIWYG from rois.svg
self.dashtype = dashtype
self.dashoffset = dashoffset
self.reload(size=labelsize, color=labelcolor)
import os
import six
import shlex
import xdrlib
import tempfile
import subprocess as sp
import numpy as np
from .. import options
from .. import freesurfer
from .. import dataset
from .. import utils
# Default Blender location, read from the 'blender' entry of the
# 'dependency_paths' section of the package configuration.
default_blender = options.config.get('dependency_paths', 'blender')

# Python source text holding a common set of imports; presumably prepended to
# the code that is handed to Blender (usage is outside this view -- confirm).
# The directory containing this module is placed first on sys.path in that
# source, presumably so that `import blendlib` resolves there.
#
# The directory is substituted with `!r` so that it arrives as a correctly
# escaped string literal. Pasting the raw path between quotes produced invalid
# or wrong source whenever the path contained backslashes (Windows) or a quote.
_base_imports = """import sys
sys.path.insert(0, {path!r})
import xdrlib
import blendlib
import bpy.ops
from bpy import context as C
from bpy import data as D
""".format(path=os.path.split(os.path.abspath(__file__))[0])
def _call_blender(filename, code, blender_path=default_blender):
"""Call blender, while running the given code. If the filename doesn't exist, save a new file in that location.
New files will be initially cleared by deleting all objects.
"""
with tempfile.NamedTemporaryFile() as tf:
print("In new named temp file: %s"%tf.name)
# Recursive call for multiple layers
if self.layer == 'multi_layer':
label_layers = []
for L in self.layer_names:
label_layers.append(self.layers[L].setup_labels())
self.svg.getroot().insert(0, label_layers[-1])
return label_layers
if self.layer in config.sections():
dlayer = self.layer
else:
# Unknown display layer; default to values for ROIs
import warnings
warnings.warn('No defaults set for display layer %s; Using defaults for ROIs in options.cfg file'%self.layer)
dlayer = 'rois'
if size is None:
size = config.get(dlayer, "labelsize")
if color is None:
color = tuple(map(float, config.get(dlayer, "labelcolor").split(",")))
if shadow is None:
shadow = self.shadow
alpha = color[3]
color = "rgb(%d, %d, %d)"%(color[0]*255, color[1]*255, color[2]*255)
try:
layer = _find_layer(self.svg, "%s_labels"%self.layer)
except ValueError: # Changed in _find_layer below... AssertionError: # Why assertion error?
layer = _make_layer(self.svg.getroot(), "%s_labels"%self.layer)
labelpos, candidates = [], []
for roi in list(self.rois.values()):
for i, pos in enumerate(roi.get_labelpos()):
# Validate the description
desc = request.form['description']
if len(desc) < 3 or len(desc) > 512:
flash('The description you chose was invalid. It must be between 3 and 512 characters long.', 'alert-danger')
return redirect(url_for('perms_roles'))
# Check if a role with this name already exists
curd.execute("SELECT 1 FROM `p_roles` WHERE `name` = %s", (name,))
if curd.fetchone() is not None:
flash('A role already exists with that name', 'alert-danger')
return redirect(url_for('perms_roles'))
# SQL insert
curd.execute("INSERT INTO `p_roles` (`name`, `description`) VALUES (%s, %s)", (name, desc))
g.db.commit()
cortex.lib.core.log(__name__, "permissions.role.create", "Permission role '" + name + "' created")
flash("Role created", "alert-success")
return redirect(url_for('perms_roles'))
changes = 0
# Loop over the system permissions and reconcile with the DB
for perm in app.permissions.system_permissions:
# Check if the role already has this permission or not
curd.execute("SELECT `p_role_system_perms`.`perm_id` FROM `p_role_system_perms` JOIN `p_system_perms` ON `p_role_system_perms`.`perm_id`=`p_system_perms`.`id` WHERE `p_role_system_perms`.`role_id`=%s AND `p_role_system_perms`.`system_id`=%s AND `p_system_perms`.`perm`=%s", (role_id, system_id, perm["name"]))
row = curd.fetchone()
perm_id = row["perm_id"] if row is not None else None
should_exist = bool(perm["name"] in request.form and request.form[perm["name"]] == "yes")
if not should_exist and perm_id is not None:
changes += 1
curd.execute("DELETE FROM `p_role_system_perms` WHERE `role_id`=%s AND `system_id`=%s AND `perm_id`=%s", (role_id, system_id, perm_id))
g.db.commit()
cortex.lib.core.log(__name__, "permissions.role.system.revoke", "System permission {perm} revoked for role {role_id} on system {system_id}".format(perm=perm["name"], role_id=role_id, system_id=system_id))
elif should_exist and perm_id is None:
changes += 1
curd.execute("INSERT INTO `p_role_system_perms` (`role_id`, `perm_id`, `system_id`) VALUES (%s, (SELECT `id` FROM `p_system_perms` WHERE `perm`=%s), %s)", (role_id, perm["name"], system_id))
g.db.commit()
cortex.lib.core.log(__name__, "permissions.role.system.grant", "System permission {perm} granted for role {role_id} on system {system_id}.".format(perm=perm["name"], role_id=role_id, system_id=system_id))
if changes == 0:
flash("Permissions were not updated - no changes requested", "alert-warning")
else:
flash("Permissions for the system were successfully updated", "alert-success")
return redirect(url_for("perms_role", role_id=role_id, t="systems"))
## Add an environment to the role
elif action == "add_environment":
environment_id = request.form["environment_id"]
# Get the certificate
curd = g.db.cursor(mysql.cursors.DictCursor)
curd.execute('SELECT `subjectDN` FROM `certificate` WHERE `digest` = %s', (digest,))
certificate = curd.fetchone()
# If the certificate was not found then notify the user
if certificate is None:
raise Exception('Certificate does not exist')
# Delete the certificate
curd = g.db.cursor(mysql.cursors.DictCursor)
curd.execute('DELETE FROM `certificate` WHERE `digest` = %s', (digest,))
g.db.commit()
# Log which certificate was deleted
cortex.lib.core.log(__name__, "certificate.delete", "Certificate " + str(digest) + " (" + str(certificate['subjectDN']) + ") deleted")
# Notify user
flash('Certificate deleted', category='alert-success')
except Exception as e:
flash('Failed to delete certificate: ' + str(e), category='alert-danger')
return redirect(url_for('certificates'))
# Toggle notifications action
elif request.form['action'] == 'toggle_notify':
try:
# Get the certificate
curd = g.db.cursor(mysql.cursors.DictCursor)
curd.execute('SELECT `subjectDN` FROM `certificate` WHERE `digest` = %s', (digest,))
certificate = curd.fetchone()
# If the certificate was not found then notify the user
flash("The system was added to the role successfully", "alert-success")
return redirect(url_for("perms_role", role_id=role_id, t="systems"))
## Delete a system from the role
elif action == "remove_system":
system_id = request.form["system_id"]
if not re.match(r'^[0-9]+$',system_id):
flash("The system you sent was invalid", "alert-danger")
return redirect(url_for("perms_role", role_id=role_id, t="systems"))
else:
system_id = int(system_id)
curd.execute("DELETE FROM `p_role_system_perms` WHERE `role_id`=%s AND `system_id`=%s", (role_id, system_id))
g.db.commit()
cortex.lib.core.log(__name__, "permissions.role.system.purge", "System permissions purged for role {role_id} on system {system_id}.".format(role_id=role_id, system_id=system_id))
flash("The system has been removed from the role successfully", "alert-success")
return redirect(url_for("perms_role", role_id=role_id, t="systems"))
## Edit a system's permissions
elif action == "edit_system":
system_id = request.form["system_id"]
if not re.match(r'^[0-9]+$',system_id):
flash("The system you sent was invalid", "alert-danger")
return redirect(url_for("perms_role", role_id=role_id, t="systems"))
else:
system_id = int(system_id)
changes = 0
# Loop over the system permissions and reconcile with the DB
flash('That user/group is already added to the system, please select it from the list below and change permissions as required', 'alert-warning')
return redirect(url_for('perms_system', system_id=system_id))
changes = 0
## Now loop over the per-system permissions available to us
for perm in app.permissions.system_permissions:
## If the form has the checkbox for this perm checked...
if perm['name'] in request.form:
if request.form[perm['name']] == 'yes':
## Insert the permission for this name/type/perm combo
changes = changes + 1
curd.execute("INSERT INTO `p_system_perms_who` (`system_id`, `who`, `type`, `perm_id`) VALUES (%s, %s, %s, (SELECT `id` FROM `p_system_perms` WHERE `perm`=%s))", (system_id, name, wtype, perm['name']))
g.db.commit()
if wtype == 0:
cortex.lib.core.log(__name__, "permissions.system.grant.user", "System permission '" + perm['name'] + "' granted for user '" + name + "' on system " + str(system_id))
else:
cortex.lib.core.log(__name__, "permissions.system.grant.group", "System permission '" + perm['name'] + "' granted for group '" + name + "' on system " + str(system_id))
if changes == 0:
flash("The " + hstr + " " + name + " was not added because no permissions were selected", "alert-danger")
else:
flash("The " + hstr + " " + name + " was added to the system", "alert-success")
return redirect(url_for('perms_system', system_id=system_id))
elif action == 'remove':
name = request.form['name']
if not re.match(r'^[a-zA-Z0-9\-\_&]{3,255}$', name):
flash("The user or group name you sent was invalid", "alert-danger")
return redirect(url_for('perms_system', system_id=system_id))
wtype = request.form['type']
# If we're linking to ServiceNow
if 'link_servicenow' in request.form:
# Search for a CI with the correct name
curd.execute("SELECT `sys_id` FROM `sncache_cmdb_ci` WHERE `name` = %s", (hostname,))
ci_results = curd.fetchall()
if len(ci_results) == 0:
flash("System not linked to ServiceNow: Couldn't find a CI to link the system to", "alert-warning")
elif len(ci_results) > 1:
flash("System not linked to ServiceNow: Found more than one CI matching the name", "alert-warning")
else:
curd.execute("UPDATE `systems` SET `cmdb_id` = %s WHERE `id` = %s", (ci_results[0]['sys_id'], system_id))
g.db.commit()
cortex.lib.core.log(__name__, "systems.add.existing", "System manually added, id " + str(system_id),related_id=system_id)
# Redirect to the system page for the system we just added
flash("System added", "alert-success")
return redirect(url_for('system', id=system_id))