Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@cortex.lib.user.login_required
def perms_system(system_id):
"""View function to let administrators view and manage a role"""
# NOTE(review): the docstring above says "role", but the visible code looks
# up a system by `system_id` -- confirm the wording against the full function.
# Only the opening of this function is visible in this excerpt.
# Check user permissions: anyone without "admin.permissions" gets a 403
if not does_user_have_permission("admin.permissions"):
abort(403)
# Get the system record for the requested ID
system = cortex.lib.systems.get_system_by_id(system_id)
# Ensure that the system actually exists, and return a 404 if it doesn't
if system is None:
abort(404)
# Cursor for the DB, used by the (not visible) remainder of the function
curd = g.db.cursor(mysql.cursors.DictCursor)
@cortex.lib.user.login_required
def puppet_reports(node):
"""Handles the Puppet reports page for a node"""
# `node` is used both as the Puppet certname for the system lookup and as
# the PuppetDB node name below.
# Get the system (we need to know the ID for permissions checking)
system = cortex.lib.systems.get_system_by_puppet_certname(node)
# No system is registered for this certname: respond with a 404
if system is None:
abort(404)
# Check if the user is allowed to view the reports of this node; the call
# names a per-system permission ("view.puppet") and a global one
# ("systems.all.view.puppet"), and a failed check yields a 403
if not does_user_have_system_permission(system['id'],"view.puppet","systems.all.view.puppet"):
abort(403)
# NOTE: the handler(s) for this try block are outside the visible excerpt.
try:
# Connect to PuppetDB and get the reports
db = cortex.lib.puppet.puppetdb_connect()
reports = db.node(node).reports()
def root():
    """Send the visitor either to their dashboard or to a login handler.

    Logged-in users are redirected to the 'dashboard' endpoint. Everyone
    else is handed to cas() when DEFAULT_USER_AUTH is 'cas', and to login()
    for any other configured value.
    """
    # Already authenticated: just bounce to the dashboard
    if cortex.lib.user.is_logged_in():
        return redirect(url_for('dashboard'))

    # Pick the login handler that matches the configured default auth method
    use_cas = app.config['DEFAULT_USER_AUTH'] == 'cas'
    return cas() if use_cas else login()
@cortex.lib.user.login_required
def vmware_download_csv():
    """Return the cached VMware VM data as a downloadable CSV response.

    Aborts with a 403 when the user lacks the "vmware.view" permission.
    """
    # Permission gate
    if not does_user_have_permission("vmware.view"):
        abort(403)

    # Select every row of the VM cache table, ordered by name
    cursor = g.db.cursor(mysql.cursors.DictCursor)
    cursor.execute('SELECT * FROM `vmware_cache_vm` ORDER BY `name`')

    # Log the download event
    cortex.lib.core.log(__name__, "vmware.csv.download", "CSV of vmware data downloaded")

    # The executed cursor is passed to vmware_csv_stream, whose output forms
    # the body of the response; the header marks it as a file attachment
    disposition = {'Content-Disposition': 'attachment; filename="vmware.csv"'}
    return Response(vmware_csv_stream(cursor), mimetype="text/csv", headers=disposition)
def login():
# Handles the username/password login form.
# NOTE: only the start of this function is visible in this excerpt; the
# successful-logon step and the non-POST path are not shown.
if request.method == 'POST':
# Only attempt authentication when both credential fields were submitted
if all(field in request.form for field in ['username', 'password']):
result = cortex.lib.user.authenticate(request.form['username'], request.form['password'])
# Authentication failed: flash an error and send the user back to the
# login page
if not result:
flash('Incorrect username and/or password', 'alert-danger')
# Do we want this? Could fill up the database volume (DoS)
#cortex.lib.core.log(__name__, 'Login failure: incorrect username/password', request.form['username'].lower())
return redirect(url_for('login'))
# Permanent sessions: read the optional 'sec' form field (defaults to "")
permanent = request.form.get('sec', default="")
# Set session as permanent or not; only the exact value 'sec' enables it
if permanent == 'sec':
session.permanent = True
else:
session.permanent = False
# Logon is OK to proceed
@cortex.lib.user.login_required
def system_power(id):
# Handles a power action request for the system with the given ID.
# NOTE: only the start of this function is visible in this excerpt.
# NOTE(review): the parameter name `id` shadows the Python builtin; it is
# part of the function's interface, so it is left unchanged here.
# Check user permissions. User must have either systems.all or specific
# access to the system
if not does_user_have_system_permission(id,"control.vmware.power", "control.all.vmware.power"):
abort(403)
# Get the system
system = cortex.lib.systems.get_system_by_id(id)
# Ensure that the system actually exists, and return a 404 if it doesn't
if system is None:
abort(404)
# NOTE: the handler(s) for this try block, and any other power actions,
# are outside the visible excerpt.
try:
# The 'power_action' form field selects the action; "on" powers the system on
if request.form.get('power_action', None) == "on":
cortex.lib.systems.power_on(id)
@cortex.lib.user.login_required
def dashboard():
"""This renders the front page after the user logged in."""
# NOTE: this excerpt shows only the first queries of the function.
# Get a cursor to the database (rows are read by column name below)
curd = g.db.cursor(mysql.cursors.DictCursor)
# Get number of VMs, excluding rows flagged as templates (`template` = 0)
curd.execute('SELECT COUNT(*) AS `count` FROM `vmware_cache_vm` WHERE `template` = 0');
row = curd.fetchone()
vm_count = row['count']
# Get number of CIs (all rows of `sncache_cmdb_ci`)
curd.execute('SELECT COUNT(*) AS `count` FROM `sncache_cmdb_ci`');
row = curd.fetchone()
ci_count = row['count']
# NOTE: this is a fragment of an API authentication wrapper; its enclosing
# definition(s) are not visible. `allow_api_token`, `require_permission`,
# `func`, `args` and `kwargs` come from that enclosing scope.
# token_auth records whether this request authenticated with the shared API
# token (X-Auth-Token header) rather than with user credentials.
token_auth = False
# Only authenticate when there is no logged-in session already
if not cortex.lib.user.is_logged_in():
# Credentials from the HTTP Authorization header, if any
auth = request.authorization
if not auth:
# No credentials supplied: fall back to token authentication, which must
# be allowed for this endpoint and present in the request headers
if not allow_api_token:
return send_auth_required_response(allow_api_token)
if 'X-Auth-Token' not in request.headers:
return send_auth_required_response(allow_api_token)
# NOTE(review): `!=` is not a constant-time comparison; consider
# hmac.compare_digest for comparing the secret token.
if cortex.app.config['CORTEX_API_AUTH_TOKEN'] != request.headers['X-Auth-Token']:
raise UnauthorizedException
token_auth = True
else:
# Credentials supplied: validate the username and password
if not cortex.lib.user.authenticate(auth.username, auth.password):
raise UnauthorizedException
if not token_auth:
# Mark as logged on (the username is stored lower-cased)
session['username'] = auth.username.lower()
session['logged_in'] = True
# Log a successful login
cortex.lib.core.log(__name__, 'cortex.api.login', '' + session['username'] + ' logged in (on API) using ' + request.user_agent.string)
else:
# Token-authenticated requests get a session flag instead of a username
session['api_token_valid'] = True
# The per-user 'api.<permission>' check is skipped for token-authenticated
# requests and when no permission is required
if not token_auth and require_permission is not None:
if not cortex.lib.user.does_user_have_permission('api.{0}'.format(require_permission)):
raise InvalidPermissionException
# All checks passed: call the wrapped function
return func(*args, **kwargs)
# NOTE: this fragment continues the dashboard queries; `curd` is a database
# cursor created earlier, outside this fragment.
# Count tasks with status 2 that ended in the last 8 hours (stored as the
# failed-task count)
curd.execute('SELECT COUNT(*) AS `count` FROM `tasks` WHERE `status` = %s AND `end` > DATE_SUB(NOW(), INTERVAL 8 HOUR)', (2,))
row = curd.fetchone()
task_failed_count = row['count']
# Get number of warning tasks in the last 8 hours (status 3)
curd.execute('SELECT COUNT(*) AS `count` FROM `tasks` WHERE `status` = %s AND `end` > DATE_SUB(NOW(), INTERVAL 8 HOUR)', (3,))
row = curd.fetchone()
task_warning_count = row['count']
# Get tasks for user: the five most recently started tasks owned by the
# session's username
curd.execute('SELECT `id`, `module`, `start`, `end`, `status`, `description` FROM `tasks` WHERE `username` = %s ORDER BY `start` DESC LIMIT 5', (session['username'],))
tasks = curd.fetchall()
# We don't need the data, but we need to make sure the LDAP cache is up
# to date for the systems query to work
cortex.lib.user.get_users_groups()
# Get the list of systems the user is specifically allowed to view.
# A system matches when the user holds a 'view', 'view.overview' or
# 'view.detail' permission on it -- directly (`type` = '0', `who` is the
# username) or through a group listed for them in `ldap_group_cache`
# (`type` = '1') -- or when `allocation_who` is the user. It must also be
# 'In Service' in the CMDB or have a VMware UUID. At most 100 rows are
# returned, newest allocation first.
curd.execute("SELECT * FROM `systems_info_view` WHERE (`id` IN (SELECT `system_id` FROM `system_perms_view` WHERE (`type` = '0' AND `who` = %s AND (`perm` = 'view' OR `perm` = 'view.overview' OR `perm` = 'view.detail')) OR (`type` = '1' AND (`perm` = 'view' OR `perm` = 'view.overview' OR `perm` = 'view.detail') AND `who` IN (SELECT `group` FROM `ldap_group_cache` WHERE `username` = %s))) OR `allocation_who`=%s) AND ((`cmdb_id` IS NOT NULL AND `cmdb_operational_status` = 'In Service') OR `vmware_uuid` IS NOT NULL) ORDER BY `allocation_date` DESC LIMIT 100;",(session['username'],session['username'], session['username']))
systems = curd.fetchall()
# Recent systems: the five most recently allocated rows of
# `systems_info_view`; this query applies no per-user filter
curd.execute("SELECT * FROM `systems_info_view` ORDER BY `allocation_date` DESC LIMIT 0,5")
recent_systems = curd.fetchall()
# OS VM stats
types = cortex.lib.vmware.get_os_stats()
# select SUM(`ram`) from vmware_cache_clusters;
# select SUM(`ram_usage`) from vmware_cache_clusters;
# select SUM(`memoryMB`) FROM `vmware_cache_vm`;
# NOTE: this is a fragment of the CAS login view; its `def` line and the
# end of the function are not visible in this excerpt.
# CAS client init, configured from CAS_SERVER_URL and CAS_SERVICE_URL with
# certificate verification enabled
cas_client = CASClient(app.config['CAS_SERVER_URL'], app.config['CAS_SERVICE_URL'], verify_certificates=True)
# Single logout (SLO): a POST carrying a 'logoutRequest' field, received
# while the session holds a CAS ticket
if request.method == 'POST' and session.get('cas_ticket') is not None and 'logoutRequest' in request.form:
# Verify that the ticket in the logout request matches the one stored in
# the session, to prevent cross-origin attacks
message = cas_client.parse_logout_request(request.form.get('logoutRequest'))
if message.get('session_index', None) == session.get('cas_ticket'):
cortex.lib.user.clear_session()
return ('', 200)
else:
# Ticket mismatch: reject the logout request
abort(400)
# If the user is already logged in, just redirect them to their dashboard
if cortex.lib.user.is_logged_in():
return redirect(url_for('dashboard'))
# A 'ticket' query parameter is validated against the CAS server
ticket = request.args.get('ticket', None)
if ticket is not None:
try:
cas_response = cas_client.perform_service_validate(ticket=ticket)
# NOTE(review): this bare except swallows every exception (including
# SystemExit/KeyboardInterrupt) and falls back to root(); consider
# narrowing it.
except:
return root()
if cas_response and cas_response.success:
try:
# keep the ticket for SLO
session['cas_ticket'] = ticket
# Log the user on with the 'uid' attribute from the CAS response.
# `.get('uid')` itself does not raise KeyError, so the handler below
# presumably covers errors raised inside logon_ok -- TODO confirm
return cortex.lib.user.logon_ok(cas_response.attributes.get('uid'))
except KeyError:
# required user attributes not returned
flash("CAS SSO authentication successful but missing required information consider using LDAP authentication", 'alert-warning')