def __del__(self, *args, **kwargs):
    """Drop this upload from ``cherrypy.file_uploads`` and queue its temp file for deletion.

    Invoked by the garbage collector, so every failure it anticipates
    (missing session key, broken session object, filesystem error) is
    swallowed rather than raised.  The ``except OSError`` handler's body
    is cut off in this excerpt.
    """
    try:
        uploadKey = None
        # The registry key is the logged-in user's id, else "owner_id:id" of the
        # pending upload request; with neither in the session it stays None.
        if cherrypy.session.has_key("user"):
            uploadKey = cherrypy.session.get('user').id
        elif cherrypy.session.has_key("uploadRequest"):
            uploadKey = cherrypy.session.get("uploadRequest").owner_id+":"+cherrypy.session.get("uploadRequest").id
        if cherrypy.file_uploads.has_key(uploadKey):
            # NOTE(review): the list is mutated while being iterated, so the
            # element following a match is skipped; only the first transfer
            # with this file name is reliably removed.
            for transfer in cherrypy.file_uploads[uploadKey]:
                if transfer.file_object.name == self.file_location:
                    cherrypy.file_uploads[uploadKey].remove(transfer)
            # Last transfer gone: drop the key so the registry does not grow.
            if len(cherrypy.file_uploads[uploadKey]) == 0:
                del cherrypy.file_uploads[uploadKey]
        if os.path.isfile(self.file_location):
            # Only the basename is handed to FileService — presumably it
            # resolves the directory itself; confirm against its implementation.
            tempFileName = self.file_location.split(os.path.sep)[-1]
            FileService.queue_for_deletion(tempFileName)
    except KeyError:
        pass
    except AttributeError, ae:
        pass
    except OSError, oe:
def make_file(self, binary=None):
    """Create the storage target for one uploaded multipart field.

    A field that carries a filename is written through a ``ProgressFile``,
    whose path is remembered in ``self.file_location`` and which is
    registered in ``cherrypy.file_uploads`` so its progress can be queried.
    The registry key comes from the session: ``ownerId:ticketId`` of an
    ``uploadTicket`` if present, otherwise the ``userId`` of ``user``
    (``None`` when neither exists).  A field without a filename gets an
    empty in-memory ``StringIO`` instead.

    Both ``self.filename`` and the ``uploadindex`` request header are
    client-supplied and are passed to ``ProgressFile`` unmodified; any
    sanitising of the target name has to happen there.
    """
    if self.filename is None:
        return StringIO.StringIO("")

    headers = cherrypy.request.headers
    upload_index = headers['uploadindex'] if 'uploadindex' in headers else None
    progress_file = ProgressFile(self.bufsize, self.filename, uploadIndex=upload_index)
    self.file_location = progress_file.file_object.name

    # Ticket-based uploads take precedence over the logged-in user.
    upload_key = None
    if 'uploadTicket' in cherrypy.session:
        ticket = cherrypy.session.get("uploadTicket")
        upload_key = ticket.ownerId + ":" + ticket.ticketId
    elif 'user' in cherrypy.session:
        upload_key = cherrypy.session.get('user').userId

    cherrypy.file_uploads.setdefault(upload_key, []).append(progress_file)
    return progress_file
# Replace CherryPy's CGI FieldStorage with the project's FileFieldStorage so
# that multipart uploads are parsed by it (presumably the class that defines
# make_file above — confirm against the class definition).
cherrypy._cpcgifs.FieldStorage = FileFieldStorage
def get_session(self):
    """Return this handler's dict inside the CherryPy session, creating it on first use.

    A missing, falsy or non-dict entry under ``self.session_name`` is replaced
    by a fresh dict.  Whenever the dict has no ``'sid'`` yet, a 16-hex-char id
    is generated, the dict is stored back into the session and its
    ``'status'`` is reset to ``UNKNOWN``.  A stored dict that already has a
    ``'sid'`` is returned untouched.
    """
    stored = cherrypy.session.get(self.session_name, None)
    if not isinstance(stored, dict) or not stored:
        stored = {}
    if 'sid' in stored:
        return cherrypy.session[self.session_name]
    # NOTE(review): 'sid' is 16 hex digits (64 bits); how unpredictable it is
    # depends on randomString's entropy source — confirm it uses a CSPRNG.
    stored['sid'] = randomString(16, '0123456789abcdef')
    cherrypy.session[self.session_name] = stored
    cherrypy.session[self.session_name]['status'] = UNKNOWN
    return cherrypy.session[self.session_name]
def session(self):
    """Return the per-handler dict kept in the CherryPy session, initialising it once.

    CherryPy's own session machinery is used instead of a home-grown one
    because it ties the session id to the user agent (so a stolen or guessed
    cookie is worth less), protects against session fixation
    (http://en.wikipedia.org/wiki/Session_fixation) and lets the storage
    backend be chosen.  Binding to the SSL/TLS session id would be stronger
    still, but requires the frontend to forward SSL_SESSION_ID to the backend.

    When no (truthy) entry exists yet under ``self.sessname`` a new dict is
    stored with ``status`` set to ``UNKNOWN`` and ``user_url``, ``debug_info``,
    ``fullname``, ``dn`` and ``permissions`` set to ``None``.
    """
    if not cherrypy.session.get(self.sessname, None):
        fresh = {
            'status': UNKNOWN,   # auth state of this session
            # The user related to this session:
            # self.oidserver + 'id/' + the real username seen by the oid
            # server (will come from Hypernews/SiteDB)
            'user_url': None,
            'debug_info': None,
            'fullname': None,
            'dn': None,
            'permissions': None,  # user roles
        }
        cherrypy.session[self.sessname] = fresh
    return cherrypy.session[self.sessname]
@staticmethod
def check_passwd(realm, user, passwd):
    """
    This function is called before ALL XML-RPC calls,
    to check the username and password.
    A user CANNOT use Twister if he doesn't authenticate.

    Accepts, in order: a session whose stored ``user_passwd`` already equals
    the hex-encoded ``user:passwd`` pair; a ``user`` listed in
    ``usrs_and_pwds`` whose stored value equals ``passwd``; or the literal
    password ``'EP'`` for any user name.  Otherwise it opens an SSH transport
    to localhost:22 (presumably to try the credentials there — the rest of
    the function is cut off in this excerpt).  Returns False immediately for
    an empty user or password, True on any accepted path.
    """
    # NOTE(review): hexlify is an encoding, not a hash — the session keeps a
    # trivially reversible copy of the password.  It is also computed before
    # the emptiness check below, so a None user or passwd raises TypeError here.
    user_passwd = binascii.hexlify(user + ':' + passwd)
    if (not user) or (not passwd):
        return False
    with usr_pwds_lock:
        sess_user = cherrypy.session.get('username')
        if cherrypy.session.get('user_passwd') == user_passwd:
            return True
        # Direct string comparison: the value kept in usrs_and_pwds must be
        # the plaintext password for this to ever match.
        elif user in usrs_and_pwds and usrs_and_pwds.get(user) == passwd:
            if not sess_user or sess_user != user:
                cherrypy.session['username'] = user
                cherrypy.session['user_passwd'] = user_passwd
            return True
        # NOTE(review): the fixed password 'EP' is accepted for ANY user name
        # without a lookup — confirm this is an intentional internal-client
        # shortcut and that the endpoint is not reachable from untrusted networks.
        elif passwd == 'EP':
            if not sess_user or sess_user != user:
                cherrypy.session['username'] = user
            return True
        t = paramiko.Transport(('localhost', 22))
        t.logger.setLevel(40) # Less spam, please
        t.start_client()
        # This operation is pretty heavy!!!
@cherrypy.expose
def logView(self):
    """Render the ``logview.html`` template.

    The template is looked up through ``self.lookup``; ``username`` is taken
    from the CherryPy session key ``_cp_username`` and ``webroot`` from the
    request's ``script_name``.
    """
    template = self.lookup.get_template("logview.html")
    current_user = cherrypy.session.get('_cp_username')
    return template.render(username=current_user, webroot=cherrypy.request.script_name)
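@cherrypy.expose
@cherrypy.tools.requires_login(permission="admin")
def get_role_permissions(self, roleId, format="json", **kwargs):
    """List every permission, flagging the ones granted by role ``roleId``.

    Args:
        roleId: id of the role to inspect.  HTML tags are stripped first and
            the value is bound through the ORM filter, not string-built SQL.
        format: response format handed through to ``fl_response``.
        **kwargs: ignored; accepted so stray query parameters do not error.

    Returns:
        ``fl_response(sMessages, fMessages, format, data=permissionData)``,
        where each ``permissionData`` entry is a dict with ``permissionId``,
        ``permissionName`` and ``inheritedFrom`` (``"role"`` when the role
        holds the permission, otherwise ``""``).  An unknown role or a
        lookup failure is reported through ``fMessages`` with empty data.
    """
    user = cherrypy.session.get("user")
    sMessages, fMessages, permissionData = [], [], []
    try:
        roleId = strip_tags(roleId)
        role = session.query(Role).filter(Role.id == roleId).one()
        role_permissions = role.permissions
        for permission in session.query(Permission).all():
            permissionData.append({
                'permissionId': permission.id,
                'permissionName': permission.name,
                'inheritedFrom': "role" if permission in role_permissions else "",
            })
    except sqlalchemy.orm.exc.NoResultFound:
        fMessages.append("The role ID: %s does not exist" % str(roleId))
    except Exception as e:
        # Keep the full error in the server log only; echoing str(e) to the
        # client can expose schema or connection details.  getattr guards the
        # log line itself against a missing session user.
        cherrypy.log.error("[%s] [get_role_permissions] [Couldn't get permissions for role %s: %s]"
                           % (getattr(user, "id", None), roleId, str(e)))
        fMessages.append("Could not get permissions for role %s" % str(roleId))
    return fl_response(sMessages, fMessages, format, data=permissionData)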
@cherrypy.expose
def service_mngr_command(self, command, name='', *args, **kwargs):
    """Forward a command to the Service Manager on behalf of the session user.

    Valid commands are: list, start, stop, status, get config, save config,
    get log.  The caller is identified from the CherryPy session's
    ``username`` and authenticated through ``self.project``; a user that
    cannot be authenticated, or that lacks the ``CHANGE_SERVICES`` role,
    gets ``False`` back and nothing is forwarded.
    """
    logFull('CeXmlRpc:service_mngr_command')
    # Identity comes from the session, never from an RPC parameter.
    user_roles = self.project.authenticate(cherrypy.session.get('username'))
    if not user_roles:
        return False
    if 'CHANGE_SERVICES' in user_roles['roles']:
        return self.project.manager.send_command(command, name, args, kwargs)
    logDebug('Privileges ERROR! Username `{user}` cannot use Service Manager!'.format(**user_roles))
    return False
def hasCapability(capabilities, user=None, session_key=None):
    """
    Determine if the user has the given capabilities.

    Args:
        capabilities: one capability name or a list of them; a scalar is
            wrapped in a single-element list.
        user: defaults to the ``name`` of the session's ``user`` entry.
        session_key: defaults to the session's ``sessionKey``.

    The rest of the body is cut off in this excerpt, just after the
    free-license handling starts.
    """
    # Assign defaults if the user or session key is None
    if user is None:
        # Indexing, not .get(): a session without 'user' raises here.
        user = cherrypy.session['user']['name']
    if session_key is None:
        session_key = cherrypy.session.get('sessionKey')
    # Convert the capability to a list if it was a scalar
    if not isinstance(capabilities, list) or isinstance(capabilities, basestring):
        capabilities = [capabilities]
    # Get the capabilities that the user has
    try:
        users_capabilities = WebInputController.getCapabilities4User(user, session_key)
    except splunk.LicenseRestriction:
        # This can happen when the Splunk install is using the free license
        # Check to see if the Splunk install is using the free license and allow access if so
        # We are only going to check for this if it is the admin user since that is the user
        # that the non-authenticated user is logged in as when the free license is used.
        # NOTE(review): 'admin' is matched against the caller-supplied or
        # session-derived name, not against anything the license check
        # returned; the branch body is not shown here.
        if user == 'admin':
def isAuthorized(self):
    """Return whether the current CherryPy session belongs to a known user.

    The session's ``username`` must equal the name the user database holds
    for the session's ``userid``; on a mismatch the session is logged out via
    ``self.api_logout`` and ``False`` is returned.  A session without a
    username defers to ``self.autoLoginIfPossible()``.  If the session data
    cannot be decoded (python2/python3 pickle mismatch) all sessions are
    dropped first and the auto-login path is taken.
    """
    try:
        session_username = cherrypy.session.get('username', None)
        session_user_id = cherrypy.session.get('userid', -1)
        db_username = self.userdb.getNameById(session_user_id)
    except (UnicodeDecodeError, ValueError):
        # workaround for python2/python3 jump, filed bug in cherrypy
        # https://bitbucket.org/cherrypy/cherrypy/issue/1216/sessions-python2-3-compability-unsupported
        log.w('''
Dropping all sessions! Try not to change between python 2 and 3,
everybody has to relogin now.''')
        cherrypy.session.delete()
        session_username = None

    if not session_username:
        return self.autoLoginIfPossible()
    if session_username == db_username:
        return True
    # Session name and database name disagree: treat the session as stale.
    self.api_logout(value=None)
    return False