Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def searchField(self, value):
    """Validate and store the single field name to search on.

    Raises ``exc.HTTPBadRequest`` when ``value`` is ``None`` or when it
    contains a comma, i.e. names more than one field.
    """
    if value is None:
        raise exc.HTTPBadRequest('Please provide a searchField')
    # A non-empty separator means at least one comma is present.
    _first, separator, _rest = value.partition(',')
    if separator:
        raise exc.HTTPBadRequest('You can provide only one searchField at a time')
    self._searchField = value
def ma_offset(self, value):
    """Validate and store the ``offset`` request parameter.

    ``None`` selects the default offset of 3. Any other value must be a
    string made only of decimal digits and is stored as an ``int``.

    Raises ``HTTPBadRequest`` when the value is not a decimal number.
    """
    if value is None:
        self._ma_offset = 3
        return
    # str.isdigit() also accepts characters such as superscript digits
    # ('\u00b2') that int() cannot parse, which would surface as an
    # unhandled ValueError. isdecimal() accepts only what int() can convert.
    if not value.isdecimal():
        raise HTTPBadRequest("Please provide a numerical value for the parameter 'offset'")
    self._ma_offset = int(value)
# Replace the global roles held by one user with the role names listed in the
# JSON request body.
# NOTE(review): this snippet is truncated -- the dict literal opened by the
# final `return` is never closed here, so the rest of the response is unknown.
def put_global_roles_for_user(request):
user_id = request.matchdict['user_id']
user = User.get_instance(user_id)
session = User.default_db
if not user:
# NOTE(review): '%d' only formats numbers; if matchdict yields a string this
# formatting raises TypeError before HTTPNotFound is built -- confirm.
raise HTTPNotFound("User id %d does not exist" % (user_id,))
# The body must be valid JSON ...
try:
data = json.loads(request.body)
except Exception as e:
raise HTTPBadRequest("Malformed Json")
# ... holding a list ...
if not isinstance(data, list):
raise HTTPBadRequest("Not a list")
# ... whose elements (if any) are all exactly of type str.
if data and frozenset((type(x) for x in data)) != frozenset((str,)):
raise HTTPBadRequest("not strings")
# Role rows whose name was requested.
roles = set(session.query(Role).filter(Role.name.in_(data)).all())
data = set(data)
# A size mismatch means some requested name matched no Role row; report the
# unmatched names.
if len(roles) != len(data):
raise HTTPBadRequest("Not valid roles: %s" % (repr(
data - set((p.name for p in roles))),))
# NOTE(review): filter() is given a keyword argument here; if this is a
# SQLAlchemy query, keyword criteria belong to filter_by() -- confirm.
known_gu_roles = session.query(UserRole).join(Role).filter(
user=user).all()
# Index the fetched UserRole rows by the name of their role.
gur_by_role = {gur.role.name: gur for gur in known_gu_roles}
known_roles = set(gur_by_role.keys())
# NOTE(review): known_roles holds role *names* while roles holds Role
# *objects*, so both set differences below compare different kinds of
# values -- confirm the intended operands.
for role in known_roles - roles:
# NOTE(review): this reaches `delete` through the `query` attribute rather
# than calling session.delete() -- confirm.
session.query.delete(gur_by_role[role])
for role in roles - known_roles:
session.add(UserRole(user=user, role=role))
return {"added": list(roles - known_roles),
# Fragment of a request handler body: verifies a Portier id_token, then caches
# the verified email, encrypted, under a key derived from a fresh random token.
# `request` and `broker_uri` are bound outside this fragment.
token = request.validated['body']['id_token']
# Get the data from the config because the request might only
# have local network information and not the public facing ones.
audience = '{scheme}://{host}'.format(scheme=request.registry.settings['http_scheme'],
host=request.registry.settings['http_host'])
# A ValueError from the verification step is reported to the client as a
# 400 response carrying the INVALID_AUTH_TOKEN errno.
try:
email, stored_redirect = get_verified_email(
broker_url=broker_uri,
token=token,
audience=audience,
issuer=broker_uri,
cache=request.registry.cache)
except ValueError as exc:
error_details = 'Portier token validation failed: %s' % exc
return http_error(httpexceptions.HTTPBadRequest(),
errno=ERRORS.INVALID_AUTH_TOKEN, error='Invalid Auth Token',
message=error_details)
# Generate a random token: 32 bytes from os.urandom, hex-encoded as text.
user_token = codecs.encode(os.urandom(32), 'hex').decode('utf-8')
# Encrypt the email with the token
encrypted_email = encrypt(email, user_token)
# Derive the user ID as an HMAC of the token, keyed by the configured secret.
hmac_secret = request.registry.settings['userid_hmac_secret']
userID = utils.hmac_digest(hmac_secret, user_token)
# Cache the encrypted email under 'portier:<userID>'. The third argument is
# presumably the entry lifetime in seconds -- confirm against the cache API.
session_ttl = portier_conf(request, 'session_ttl_seconds')
request.registry.cache.set('portier:{}'.format(userID), encrypted_email, session_ttl)
def _get_feature_service(request):
    """Return the feature addressed by the request's layer and feature ids.

    Raises ``exc.HTTPBadRequest`` when no models are registered for the
    requested layer.
    """
    feature_params = FeatureParams(request)
    vector_models = models_from_name(feature_params.layerId)
    if vector_models is None:
        raise exc.HTTPBadRequest('No Vector Table was found for %s' % feature_params.layerId)

    # _get_feature returns a 3-tuple; only the feature itself is needed here.
    feature, _template, _has_extended_info = _get_feature(
        feature_params,
        vector_models,
        feature_params.layerId,
        feature_params.featureId,
    )
    return feature
def lat(self, value):
    """Validate and store the ``northing``/``lat`` request parameter.

    The value is converted with ``float`` and stored on ``self._lat``.

    Raises ``exc.HTTPBadRequest`` when the parameter is missing or cannot
    be converted to a float.
    """
    if value is None:
        # The original message misspelled the parameter name as 'norhting'.
        raise exc.HTTPBadRequest("Missing parameter 'northing'/'lat'")
    try:
        self._lat = float(value)
    except ValueError:
        raise exc.HTTPBadRequest("Please provide numerical values for the parameter 'northing'/'lat'")
# Fragment of a function body: builds a 400-style JSON error response from one
# validation error. `error`, `errors` and `request` are bound outside this
# fragment.
name = error["name"]
description = error["description"]
# Descriptions may arrive as bytes; decode them for the text checks below.
if isinstance(description, bytes):
description = error["description"].decode("utf-8")
# NOTE(review): only the local `description` is decoded; the format_map(error)
# calls below still read error["description"] as stored -- confirm that is
# intended when the stored value is bytes.
if name is not None:
# Avoid repeating the field name when the description already mentions it.
if str(name) in description:
message = description
else:
message = "{name} in {location}: {description}".format_map(error)
else:
message = "{location}: {description}".format_map(error)
response = http_error(
httpexceptions.HTTPBadRequest(),
code=errors.status,
errno=ERRORS.INVALID_PARAMETERS.value,
error="Invalid parameters",
message=message,
details=errors,
)
# The response status is overridden with the status carried by `errors`.
response.status = errors.status
response = reapply_cors(request, response)
return response
def _get_file_id_from_admin_id(admin_id):
    """Look up the file id stored for ``admin_id``.

    Reads the item keyed by ``str(admin_id)`` from the
    'geoadmin-file-storage' table and returns its 'fileId' value, or
    ``False`` when the item carries none.

    Raises ``exc.HTTPBadRequest`` when the lookup itself fails.
    """
    storage_table = get_dynamodb_table(table_name='geoadmin-file-storage')
    try:
        record = storage_table.get_item(adminId=str(admin_id))
        file_id = record.get('fileId')
    except Exception as e:
        raise exc.HTTPBadRequest('The id %s doesn\'t exist. Error is: %s' % (admin_id, e))
    return False if file_id is None else file_id
def _get_scheme(self, name):
"""
Create a scheme instance due to its name
:rtype : schemes.BaseScheme
"""
if name:
name = name.lower()
try:
return self._scheme_instances[name]
except KeyError:
try:
cls = self._scheme_classes[name]
except KeyError:
logging.warning('HTTP authentication scheme "%s" is '
'not supported', name)
raise HTTPBadRequest('HTTP authentication scheme "%s" is '
'not supported' % name)
scheme = cls(self, **self._kwargs)
self._scheme_instances[name] = scheme
return scheme
def put_roles(request):
    """Synchronise the stored roles with the role names in the request body.

    The body must be a JSON list of strings. Names not yet stored are
    added as new roles; stored roles missing from the list are deleted
    unless their name is in ``SYSTEM_ROLES``.

    Returns a dict with the "added" and "removed" role names.
    Raises ``HTTPBadRequest`` for malformed JSON, a non-list body, or
    non-string entries.
    """
    session = Role.default_db
    try:
        payload = json.loads(request.body)
    except Exception as e:
        raise HTTPBadRequest("Malformed Json")
    if not isinstance(payload, list):
        raise HTTPBadRequest("Not a list")
    if any(type(entry) is not str for entry in payload):
        raise HTTPBadRequest("not strings")

    requested = set(payload)
    existing_by_name = {role.name: role for role in session.query(Role).all()}
    existing = set(existing_by_name.keys())

    # new roles
    to_add = requested - existing
    for role_name in to_add:
        session.add(Role(name=role_name))

    # delete non-system roles.
    to_remove = existing - requested - SYSTEM_ROLES
    for role_name in to_remove:
        session.delete(existing_by_name[role_name])

    return {"added": list(to_add),
            "removed": list(to_remove)}