Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Fragment of a GSSAPI-based request authenticator; the enclosing function
# header (which supplies `req`, `server_creds` and `optional`) is not part of
# this excerpt.
# Create a GSSAPI security context from the server credentials.
context = gssapi.sec_contexts.SecurityContext(creds=server_creds)
# Drop the first whitespace-separated word of the auth header (presumably the
# scheme name - confirm) and join whatever remains into a single token string.
token = ''.join(req.auth.split()[1:])
try:
# Feed the base64-decoded client token into the security context.
context.step(b64decode(token))
except binascii.Error: # base64 errors
raise falcon.HTTPBadRequest("Bad request", "Malformed token")
except gssapi.raw.exceptions.BadMechanismError:
raise falcon.HTTPBadRequest("Bad request", "Unsupported authentication mechanism (NTLM?) was offered. Please make sure you've logged into the computer with domain user account. The web interface should not prompt for username or password.")
try:
# The initiator name is expected to look like "username@REALM".
# NOTE(review): if the string does not contain exactly one "@", the tuple
# unpacking raises ValueError, which the handler below does not catch.
username, realm = str(context.initiator_name).split("@")
except AttributeError: # TODO: Better exception
raise falcon.HTTPForbidden("Failed to determine username, are you trying to log in with correct domain account?")
# Only principals from the configured realm are accepted.
if realm != config.KERBEROS_REALM:
raise falcon.HTTPForbidden("Forbidden",
"Cross-realm trust not supported")
# A trailing "$" is treated as a machine account; this branch is only taken
# when `optional` (defined outside this excerpt) is truthy.
if username.endswith("$") and optional:
# Extract machine hostname
# TODO: Assert LDAP group membership
req.context["machine"] = username[:-1].lower()
req.context["user"] = None
else:
# Attempt to look up real user
req.context["user"] = User.objects.get(username)
# Record who was authenticated, for which path and from which address.
logger.debug("Succesfully authenticated user %s for %s from %s",
req.context["user"], req.env["PATH_INFO"], req.context["remote_addr"])
def verify_auth(req, schedule_id, connection, cursor):
    """Check that the requester may act on the team owning a schedule.

    Looks up the name of the team that owns ``schedule_id`` and passes it,
    together with ``req``, to ``check_team_auth``.

    On the two failure paths handled here the cursor and the connection are
    closed before the exception propagates. On success both are left open
    for the caller to keep using.

    :param req: request object forwarded to ``check_team_auth``
    :param schedule_id: id of the schedule being accessed
    :param connection: open database connection
    :param cursor: cursor belonging to ``connection``
    :raises HTTPNotFound: when the lookup query reports zero rows
    :raises HTTPForbidden: re-raised from ``check_team_auth``
    """
    def _release_db():
        # Same order as the inline cleanup: cursor first, then connection.
        cursor.close()
        connection.close()

    cursor.execute(
        'SELECT `team`.`name` FROM `schedule` JOIN `team` '
        'ON `schedule`.`team_id` = `team`.`id` WHERE `schedule`.`id` = %s',
        schedule_id,
    )
    if cursor.rowcount == 0:
        _release_db()
        raise HTTPNotFound()

    try:
        owning_team = cursor.fetchone()[0]
        check_team_auth(owning_team, req)
    except HTTPForbidden:
        _release_db()
        raise
def project_acl_verify(username, project_obj):
    """Return True when ``username`` is listed in the project's ACL.

    :param username: name to look for
    :param project_obj: object exposing ``get_project_acl()``
    :returns: ``True`` if ``username`` is contained in the ACL
    :raises HTTPForbidden: if ``username`` is not contained in the ACL
    """
    allowed_users = project_obj.get_project_acl()
    if username not in allowed_users:
        raise HTTPForbidden(
            'Unauthorized',
            'Provided Token credentials does not have sufficient permissions to access resource',
        )
    return True
# Fragment: tail of the error handling around a message delete. The opening
# `try:` and the header of the first `except` clause precede this excerpt.
# Each storage error below is logged at debug level and translated into an
# HTTP error carrying a localized description.
LOG.debug(ex)
description = _(u'A claim was specified, but the message '
u'is not currently claimed.')
raise falcon.HTTPBadRequest(error_title, description)
except storage_errors.ClaimDoesNotExist as ex:
# The claim is unknown or expired -> reported as a bad request.
LOG.debug(ex)
description = _(u'The specified claim does not exist or '
u'has expired.')
raise falcon.HTTPBadRequest(error_title, description)
except storage_errors.NotPermitted as ex:
# The message is claimed and the request lacks a valid claim -> forbidden.
LOG.debug(ex)
description = _(u'This message is claimed; it cannot be '
u'deleted without a valid claim ID.')
raise falcon.HTTPForbidden(error_title, description)
except Exception:
# Anything else is logged with its traceback and surfaced through
# wsgi_errors.HTTPServiceUnavailable.
description = _(u'Message could not be deleted.')
LOG.exception(description)
raise wsgi_errors.HTTPServiceUnavailable(description)
# All good
resp.status = falcon.HTTP_204
# Fragment of an incident-creation handler; the enclosing function header and
# the remainder of the final SQL statement are outside this excerpt.
# A plan name is mandatory.
if 'plan' not in incident_params:
session.close()
raise HTTPBadRequest('missing plan name attribute', '')
# Resolve the plan name to the id of its active version.
plan_id = session.execute('SELECT `plan_id` FROM `plan_active` WHERE `name` = :plan',
{'plan': incident_params['plan']}).scalar()
if not plan_id:
logger.warn('Plan "%s" not found.', incident_params['plan'])
session.close()
raise HTTPNotFound()
# Default to the calling application; an explicit 'application' parameter may
# override it, but only if the caller is allowed to act for other applications.
# NOTE(review): unlike the two checks above, the raises below do not call
# session.close() first - confirm whether the session is released elsewhere.
app = req.context['app']
if 'application' in incident_params:
if not req.context['app']['allow_other_app_incidents']:
raise HTTPForbidden('This application %s does not allow creating incidents as other applications' % req.context['app']['name'], '')
app = cache.applications.get(incident_params['application'])
if not app:
raise HTTPBadRequest('Invalid application', '')
try:
context = incident_params['context']
# Serialize only the variables the application declares; variables missing
# from the supplied context are stored as null (context.get returns None).
context_json_str = ujson.dumps({variable: context.get(variable)
for variable in app['variables']})
# Reject serialized contexts longer than 65535 characters (presumably a
# storage column limit - confirm).
if len(context_json_str) > 65535:
raise HTTPBadRequest('Context too long', '')
app_template_count = session.execute('''
SELECT EXISTS (
SELECT 1 FROM
def default_exception_handler(ex, req, resp, params):
    """Catch-all error handler: log the error, then raise an HTTP error.

    ``falcon.HTTPUnauthorized`` and ``falcon.HTTPForbidden`` are re-raised
    unchanged after being logged. Every other exception has the current
    traceback logged and is replaced by ``falcon.HTTPInternalServerError``
    carrying the formatted message.

    :param ex: the exception being handled
    :param req: request object (only used in the log message)
    :param resp: response object (unused here)
    :param params: request parameters (only used in the log message)
    :raises falcon.HTTPUnauthorized: when ``ex`` is of that type
    :raises falcon.HTTPForbidden: when ``ex`` is of that type
    :raises falcon.HTTPInternalServerError: for everything else
    """
    if hasattr(ex, 'title'):
        if "Failed data validation" in ex.title:
            # NOTE(review): the result of this call is discarded; whether it
            # raises on its own depends on JsonSchemaException's definition,
            # which is not visible here - confirm this is intentional.
            JsonSchemaException(ex)

    message = "Unexpected error occurred: {}".format(ex)
    request_details = "\nRequest: {} Params: {}".format(req, params)
    logger.error(message + request_details)

    # Authentication/authorization failures pass through untouched.
    if isinstance(ex, (falcon.HTTPUnauthorized, falcon.HTTPForbidden)):
        raise ex

    logger.error(traceback.format_exc())
    raise falcon.HTTPInternalServerError(message)
def wrapped(instance, req, resp, **kwargs):
    """Verify that the authenticated user administers the requested domain.

    Reads ``authenticated_user`` and ``domain`` from ``kwargs`` and looks up
    the domain managed by that user in ``ADMINS`` (keyed by username). The
    check passes when the requested domain equals the managed domain or is
    a subdomain of it; in that case execution simply continues past the
    checks.

    :raises falcon.HTTPForbidden: when the user or domain is missing, the
        user manages no domain, or the requested domain is outside the
        managed one
    """
    user = kwargs.get("authenticated_user")
    domain = kwargs.get("domain")

    if not user:
        raise falcon.HTTPForbidden("Error", "User not authenticated")
    if not domain:
        raise falcon.HTTPForbidden("Error", "No requested domain specified")

    admin_of = ADMINS.get(user.username)
    if not admin_of:
        raise falcon.HTTPForbidden("Error", "Not admin at all")

    # Allowed: admin of exactly this domain, or of one of its superdomains.
    if not (domain == admin_of or domain.endswith("." + admin_of)):
        raise falcon.HTTPForbidden("Error", "Not domain admin")
:param req: Request instance that will be passed through.
:type req: falcon.Request
:param resp: Response instance that will be passed through.
:type resp: falcon.Response
:raises: falcon.HTTPForbidden
"""
# Body of a client-certificate check; the method header and the start of its
# docstring are outside this excerpt.
# Fetch the certificate data stored in the WSGI environ under the
# SSL_CLIENT_VERIFY key; an empty dict stands in when the key is absent.
cert = req.env.get(SSL_CLIENT_VERIFY, {})
if cert:
# 'subject' is iterated as a sequence of groups of (key, value) pairs.
for obj in cert.get('subject', ()):
for key, value in obj:
# Accept as soon as a commonName entry is found that matches self.cn;
# when self.cn is falsy, any commonName is accepted.
if key == 'commonName' and \
(not self.cn or value == self.cn):
return
# Forbid by default
raise falcon.HTTPForbidden('Forbidden', 'Forbidden')
def on_put(self, req, resp, app_name):
# Handle PUT for an application's email-incident settings. Only the first
# part of the handler is visible in this excerpt.
# Anonymous requests are rejected before any database work happens.
if not req.context['username']:
raise HTTPUnauthorized('You must be a logged in user to change this application\'s email incident settings', '')
session = db.Session()
# Non-admins must be registered as an owner of the application.
if not req.context['is_admin']:
if not session.execute('''SELECT 1
FROM `application_owner`
JOIN `target` on `target`.`id` = `application_owner`.`user_id`
JOIN `application` on `application`.`id` = `application_owner`.`application_id`
WHERE `target`.`name` = :username
AND `application`.`name` = :app_name''', {'app_name': app_name, 'username': req.context['username']}).scalar():
session.close()
raise HTTPForbidden('You don\'t have permissions to change this application\'s email incident settings.', '')
# The request body is parsed as JSON; its keys are used as email addresses below.
# NOTE(review): the ValueError path raises without calling session.close(),
# unlike the permission check above - the session opened earlier stays open.
try:
email_to_plans = ujson.loads(req.context['body'])
except ValueError:
raise HTTPBadRequest('Invalid json in post body')
if email_to_plans:
email_addresses = tuple(email_to_plans.keys())
# Block the request if any of the supplied addresses is already registered
# as a contact destination in `target_contact`.
check_users_emails = session.execute('''SELECT `target_contact`.`destination`
FROM `target_contact`
WHERE `target_contact`.`destination` IN :email_addresses
''', {'email_addresses': email_addresses}).fetchall()
if check_users_emails:
session.close()
def form_response(self, message):
    """Raise the Falcon error matching the stored Kubernetes API status.

    The status is read from ``self.k8s_api_exception.status``:

    * 403 -> ``falcon.HTTPForbidden``
    * any other 4xx -> ``falcon.HTTPBadRequest``
    * everything else -> ``falcon.HTTPInternalServerError``

    :param message: passed as the sole argument to the raised error
    :raises falcon.HTTPError: always; this method never returns
    """
    status = self.k8s_api_exception.status
    if status == 403:
        error_cls = falcon.HTTPForbidden
    elif 400 <= status < 500:
        error_cls = falcon.HTTPBadRequest
    else:
        error_cls = falcon.HTTPInternalServerError
    raise error_cls(message)