Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _assert_account_auth(
self, api_url, account_auth_token, account_id, capability, bucket_id=None, file_name=None
):
key_sim = self.auth_token_to_key.get(account_auth_token)
assert key_sim is not None
assert api_url == self.API_URL
assert account_id == key_sim.account_id
if account_auth_token in self.expired_auth_tokens:
raise InvalidAuthToken('auth token expired', 'auth_token_expired')
if capability not in key_sim.capabilities:
raise Unauthorized('', 'unauthorized')
if key_sim.bucket_id_or_none is not None and key_sim.bucket_id_or_none != bucket_id:
raise Unauthorized('', 'unauthorized')
if key_sim.name_prefix_or_none is not None:
if file_name is not None and not file_name.startswith(key_sim.name_prefix_or_none):
raise Unauthorized('', 'unauthorized')
if set(capabilities) != set(ALL_CAPABILITIES):
key_messages.append("with capabilities '" + ','.join(capabilities) + "'")
if bucket_name is not None:
key_messages.append("restricted to bucket '" + bucket_name + "'")
if name_prefix is not None:
key_messages.append("restricted to files that start with '" + name_prefix + "'")
if not key_messages:
key_messages.append('with no restrictions')
# Make a new message
new_message = unauthorized.message
if new_message == '':
new_message = 'unauthorized'
new_message += ' for application key ' + ', '.join(key_messages)
return Unauthorized(new_message, unauthorized.code)
def _assert_account_auth(
self, api_url, account_auth_token, account_id, capability, bucket_id=None, file_name=None
):
key_sim = self.auth_token_to_key.get(account_auth_token)
assert key_sim is not None
assert api_url == self.API_URL
assert account_id == key_sim.account_id
if account_auth_token in self.expired_auth_tokens:
raise InvalidAuthToken('auth token expired', 'auth_token_expired')
if capability not in key_sim.capabilities:
raise Unauthorized('', 'unauthorized')
if key_sim.bucket_id_or_none is not None and key_sim.bucket_id_or_none != bucket_id:
raise Unauthorized('', 'unauthorized')
if key_sim.name_prefix_or_none is not None:
if file_name is not None and not file_name.startswith(key_sim.name_prefix_or_none):
raise Unauthorized('', 'unauthorized')
elif status == 400 and code == "missing_part":
return MissingPart(post_params.get('fileId'))
elif status == 400 and code == "part_sha1_mismatch":
return PartSha1Mismatch(post_params.get('fileId'))
elif status == 400 and code == "bad_request":
matcher = UPLOAD_TOKEN_USED_CONCURRENTLY_ERROR_MESSAGE_RE.match(message)
if matcher is not None:
token = matcher.group('token')
return UploadTokenUsedConcurrently(token)
return BadRequest(message, code)
elif status == 400:
return BadRequest(message, code)
elif status == 401 and code in ("bad_auth_token", "expired_auth_token"):
return InvalidAuthToken(message, code)
elif status == 401:
return Unauthorized(message, code)
elif status == 403 and code == "storage_cap_exceeded":
return StorageCapExceeded()
elif status == 409:
return Conflict()
elif status == 416 and code == "range_not_satisfiable":
return UnsatisfiableRange()
elif status == 429:
return TooManyRequests(retry_after_seconds=response_headers.get('retry-after'))
elif 500 <= status < 600:
return ServiceError('%d %s %s' % (status, code, message))
return UnknownError('%d %s %s' % (status, code, message))
def __init__(self, message, code):
super(Unauthorized, self).__init__()
self.message = message
self.code = code