Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@app.route('/systems/view/')
@cortex.lib.user.login_required
def system(id):
# Check user permissions. User must have either systems.all or specific
# access to the system
if not does_user_have_system_permission(id,"view","systems.all.view"):
abort(403)
# Get the system
system = cortex.lib.systems.get_system_by_id(id)
# Ensure that the system actually exists, and return a 404 if it doesn't
if system is None:
abort(404)
system_class = cortex.lib.classes.get(system['class'])
system['review_status_text'] = cortex.lib.systems.REVIEW_STATUS_BY_ID[system['review_status']]
if system['puppet_certname']:
system['puppet_node_status'] = cortex.lib.puppet.puppetdb_get_node_status(system['puppet_certname'])
if system['allocation_who_realname'] is not None:
vmware_uuid = None
else:
if not re.match('^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$', vmware_uuid.lower()):
raise ValueError()
else:
vmware_uuid = system['vmware_uuid']
if does_user_have_system_permission(id,"edit.rubrik","systems.all.edit.rubrik"):
enable_backup = request.form.get('enable_backup', 2)
enable_backup_scripts = request.form.get('enable_backup_scripts', 2)
else:
enable_backup = system['enable_backup']
enable_backup_scripts = system['enable_backup_scripts']
# Process the expiry date
if does_user_have_system_permission(id,"edit.expiry","systems.all.edit.expiry"):
if 'expiry_date' in request.form and request.form['expiry_date'] is not None and len(request.form['expiry_date'].strip()) > 0:
expiry_date = request.form['expiry_date']
try:
expiry_date = datetime.datetime.strptime(expiry_date, '%Y-%m-%d')
except Exception:
abort(400)
else:
expiry_date = None
else:
expiry_date = system['expiry_date']
# Extract Review Status from form
if does_user_have_system_permission(id,"edit.review","systems.all.edit.review"):
review_status = int(request.form.get('review_status', 0))
if not review_status in cortex.lib.systems.REVIEW_STATUS_BY_ID:
raise ValueError()
# Ensure that the system actually exists, and return a 404 if it doesn't
if system is None:
abort(404)
# Get the list of actions we can perform
actions = []
for action in app.wf_system_functions:
if action['menu']:
## Add to menu ONLY if:
### they have workflows.all
### they have the per-system permission set in the workflow action
### they have the global permission set in the workflow action
if does_user_have_permission("workflows.all"):
actions.append(action)
elif does_user_have_system_permission(id,action['system_permission']):
app.logger.debug("User " + session['username'] + " does not have workflows.all")
actions.append(action)
elif action['permission'] is not None:
app.logger.debug("User " + session['username'] + " does not have " + action['system_permission'])
if does_user_have_permission("workflows." + action['permission']):
actions.append(action)
else:
app.logger.debug("User " + session['username'] + " does not have " + action['permission'])
return render_template('systems/actions.html', system=system, active='systems', actions=actions, title=system['name'])
curd = g.db.cursor(mysql.cursors.DictCursor)
# TODO: Query with an order so 'production' take precedence
curd.execute("SELECT `puppet_modules`.`module_name` AS `module_name`, `puppet_classes`.`class_name` AS `class_name`, `puppet_documentation`.`name` AS `param`, `puppet_documentation`.`text` AS `param_desc` FROM `puppet_modules` LEFT JOIN `puppet_classes` ON `puppet_modules`.`id`=`puppet_classes`.`module_id` LEFT JOIN `puppet_documentation` ON `puppet_classes`.`id`=`puppet_documentation`.`class_id` WHERE `puppet_documentation`.`tag`=%s;", ("param", ))
hints = {}
for row in curd.fetchall():
if row["module_name"] not in hints:
hints[row["module_name"]] = {}
if row["class_name"] not in hints[row["module_name"]]:
hints[row["module_name"]][row["class_name"]] = {}
if row["param"] not in hints[row["module_name"]][row["class_name"]]:
hints[row["module_name"]][row["class_name"]][row["param"]] = row["param_desc"]
# On any GET request, just display the information
if request.method == 'GET':
# If the user has view or edit permission send them the template - otherwise abort with 403.
if does_user_have_system_permission(system['id'], "view.puppet.classify", "systems.all.view.puppet.classify") or \
does_user_have_system_permission(system['id'], "edit.puppet"," systems.all.edit.puppet"):
return render_template('puppet/enc.html', system=system, active='puppet', environments=environments, title=system['name'], nodename=node, pactive="edit", yaml=cortex.lib.puppet.generate_node_config(system['puppet_certname']), hints=hints, environment_names=environment_names)
else:
abort(403)
# If the method is POST and the user has edit permission.
# Validate the input and then save.
elif request.method == 'POST' and does_user_have_system_permission(system['id'],"edit.puppet","systems.all.edit.puppet"):
# Extract data from form
environment = request.form.get('environment', '')
classes = request.form.get('classes', '')
variables = request.form.get('variables', '')
if 'include_default' in request.form:
include_default = True
else:
def puppet_catalog(node):
"""Show the Puppet catalog for a given node."""
# Get the system
system = cortex.lib.systems.get_system_by_puppet_certname(node)
if system == None:
abort(404)
## Check if the user is allowed to view the Puppet catalog
if not does_user_have_system_permission(system['id'],"view.puppet.catalog","systems.all.view.puppet.catalog"):
abort(403)
dbnode = None
catalog = None
try:
# Connect to PuppetDB, get the node information and then its catalog.
db = cortex.lib.puppet.puppetdb_connect()
dbnode = db.node(node)
catalog = db.catalog(node)
except HTTPError as he:
# If we get a 404 from the PuppetDB API
if he.response.status_code == 404:
catalog = None
else:
raise(he)
except Exception as e:
# Extract CMDB ID from form
if does_user_have_system_permission(id,"edit.cmdb","systems.all.edit.cmdb"):
cmdb_id = request.form.get('cmdb_id',None)
if cmdb_id is not None:
cmdb_id = cmdb_id.strip()
if len(cmdb_id) == 0:
cmdb_id = None
else:
if not re.match('^[0-9a-f]+$', cmdb_id.lower()):
raise ValueError()
else:
cmdb_id = system['cmdb_id']
# Extract VMware UUID from form
if does_user_have_system_permission(id,"edit.vmware","systems.all.edit.vmware"):
vmware_uuid = request.form.get('vmware_uuid',None)
if vmware_uuid is not None:
vmware_uuid = vmware_uuid.strip()
if len(vmware_uuid) == 0:
vmware_uuid = None
else:
if not re.match('^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$', vmware_uuid.lower()):
raise ValueError()
else:
vmware_uuid = system['vmware_uuid']
if does_user_have_system_permission(id,"edit.rubrik","systems.all.edit.rubrik"):
enable_backup = request.form.get('enable_backup', 2)
enable_backup_scripts = request.form.get('enable_backup_scripts', 2)
else:
enable_backup = system['enable_backup']
def system_power(id):
# Check user permissions. User must have either systems.all or specific
# access to the system
if not does_user_have_system_permission(id,"control.vmware.power", "control.all.vmware.power"):
abort(403)
# Get the system
system = cortex.lib.systems.get_system_by_id(id)
# Ensure that the system actually exists, and return a 404 if it doesn't
if system is None:
abort(404)
try:
if request.form.get('power_action', None) == "on":
cortex.lib.systems.power_on(id)
elif request.form.get('power_action', None) == "shutdown":
cortex.lib.systems.shutdown(id)
elif request.form.get('power_action', None) == "off":
cortex.lib.systems.power_off(id)
def system(id):
# Check user permissions. User must have either systems.all or specific
# access to the system
if not does_user_have_system_permission(id,"view.detail","systems.all.view"):
abort(403)
# Get the system
system = cortex.lib.systems.get_system_by_id(id)
# Ensure that the system actually exists, and return a 404 if it doesn't
if system is None:
abort(404)
system_class = cortex.lib.classes.get(system['class'])
system['review_status_text'] = cortex.lib.systems.REVIEW_STATUS_BY_ID[system['review_status']]
if system['puppet_certname']:
try:
system['puppet_node_status'] = cortex.lib.puppet.puppetdb_get_node_status(system['puppet_certname'])
except Exception:
@app.route('/systems/actions/', methods=['GET', 'POST'])
@cortex.lib.user.login_required
def system_actions(id):
if not does_user_have_system_permission(id,"view","systems.all.view"):
abort(403)
# Get the system
system = cortex.lib.systems.get_system_by_id(id)
# Ensure that the system actually exists, and return a 404 if it doesn't
if system is None:
abort(404)
# Get the list of actions we can perform
actions = []
for action in app.wf_system_functions:
if action['menu']:
## Add to menu ONLY if:
### they have workflows.all
### they have the per-system permission set in the workflow action
def get(self, system_name):
    """
    Look up one system by its name (per the original note, the data
    comes from systems_info_view) and return it.

    Raises NoResultsFoundException when the lookup yields nothing, and
    InvalidPermissionException when the lookup succeeds but the
    permission check for "view.detail" / "systems.all.view" fails.
    """
    found = cortex.lib.systems.get_system_by_name(system_name)

    # The existence check comes first; permissions are only evaluated
    # for a system that was actually found.
    if found:
        if does_user_have_system_permission(found['id'], "view.detail", "systems.all.view"):
            return found
        raise InvalidPermissionException

    raise NoResultsFoundException