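def decode_auth_token(auth_token):
    """
    Verify an auth token and return its ``sub`` claim.

    The signature is checked against the app's ``SECRET_KEY`` with a pinned
    algorithm list, so the returned subject can be trusted as having been
    issued by this application.

    :param auth_token: encoded JWT as presented by the client
    :return: the token's ``sub`` claim (integer|string, whatever was encoded)
        on success; otherwise a human-readable error string for an expired,
        malformed or mis-signed token, or one that carries no ``sub`` claim.
        Callers must distinguish the two by type/value.
    """
    try:
        payload = jwt.decode(
            auth_token,
            current_app.config.get('SECRET_KEY'),
            # Pin the accepted algorithms: without this, older PyJWT accepts
            # whatever ``alg`` the (attacker-controlled) token header names,
            # and PyJWT >= 2 refuses to decode at all. The list must match
            # the algorithm used by the matching encode call; HS256 is
            # PyJWT's encode default -- confirm against the issuer.
            algorithms=['HS256'],
        )
    except jwt.ExpiredSignatureError:
        return 'Signature expired. Please log in again.'
    except jwt.InvalidTokenError:
        return 'Invalid token. Please log in again.'
    # A verified token without a subject is unusable; report it like any
    # other bad token rather than letting KeyError escape to the caller.
    if 'sub' not in payload:
        return 'Invalid token. Please log in again.'
    return payload['sub']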
验证Token
:param auth_token:
:return: integer|string
"""
try:
payload = jwt.decode(
auth_token,
current_app.secret_key,
leeway=get_config(
"rest_auth_token",
"LOGIN_LIFETIME"))
# payload = jwt.decode(auth_token, get_config("key", "SECRET_KEY"), options={'verify_exp': True})
if 'data' in payload and 'id' in payload['data']:
return payload
else:
raise jwt.InvalidTokenError
except jwt.ExpiredSignatureError:
return gettext('The provided OSR-BearerToken has expired')
except jwt.InvalidTokenError:
return gettext('Invalid OSR-BearerToken')
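def decode_auth_token(auth_token):
    """
    Verify an auth token and return the username it carries.

    The signature is verified (with a pinned algorithm list) before the
    blacklist is consulted, so only tokens this service actually issued
    reach the blacklist store.

    :param auth_token: encoded JWT as presented by the client
    :return: ``{'username': <sub claim>}`` on success; otherwise a
        human-readable error string (expired, invalid, blacklisted, missing
        ``sub``, or an unexpected failure). Callers must distinguish the
        dict from the string.
    """
    import logging

    try:
        payload = jwt.decode(
            auth_token,
            key,
            # Pin the accepted algorithms: without this, older PyJWT accepts
            # whatever ``alg`` the (attacker-controlled) token header names,
            # and PyJWT >= 2 refuses to decode at all. The list must match
            # the algorithm used by the matching encode call; HS256 is
            # PyJWT's encode default -- confirm against the issuer.
            algorithms=['HS256'],
        )
        # Only a token with a valid signature reaches the blacklist lookup.
        if InvalidToken.check_blacklist(auth_token):
            return 'Token blacklisted. Please log in again.'
        # A verified token without a subject is unusable; report it like any
        # other bad token instead of falling into the catch-all below.
        if 'sub' not in payload:
            return 'Invalid token. Please log in again.'
        return {'username': payload['sub']}
    except jwt.ExpiredSignatureError:
        return 'Signature expired. Please log in again.'
    except jwt.InvalidTokenError:
        return 'Invalid token. Please log in again.'
    except Exception:
        # Deliberate catch-all so a storage or configuration failure surfaces
        # to the client as a generic error rather than a stack trace; log it
        # so the failure is not silently swallowed.
        logging.getLogger(__name__).exception('decode_auth_token failed')
        return 'Unknown exception.'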
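def decode_auth_token(auth_token):
    """
    Verify an auth token and return its ``sub`` claim.

    The signature is verified (with a pinned algorithm list) before the
    blacklist is consulted, so only tokens this application actually issued
    reach the blacklist store.

    :param auth_token: encoded JWT as presented by the client
    :return: the token's ``sub`` claim (integer|string, whatever was encoded)
        on success; otherwise a human-readable error string for an expired,
        invalid, blacklisted or subject-less token. Callers must distinguish
        the two by type/value.
    """
    try:
        payload = jwt.decode(
            auth_token,
            app.config.get('SECRET_KEY'),
            # Pin the accepted algorithms: without this, older PyJWT accepts
            # whatever ``alg`` the (attacker-controlled) token header names,
            # and PyJWT >= 2 refuses to decode at all. The list must match
            # the algorithm used by the matching encode call; HS256 is
            # PyJWT's encode default -- confirm against the issuer.
            algorithms=['HS256'],
        )
        # Only a token with a valid signature reaches the blacklist lookup.
        if BlacklistToken.check_blacklist(auth_token):
            return 'Token blacklisted. Please log in again.'
    except jwt.ExpiredSignatureError:
        return 'Signature expired. Please log in again.'
    except jwt.InvalidTokenError:
        return 'Invalid token. Please log in again.'
    # A verified token without a subject is unusable; report it like any
    # other bad token rather than letting KeyError escape to the caller.
    if 'sub' not in payload:
        return 'Invalid token. Please log in again.'
    return payload['sub']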
try:
# Log all exceptions
log.info('JWTKeyAuthentication failed; '
'it raised %s (%s)', exc.__class__.__name__, exc)
# Re-raise to deal with them properly.
raise exc
except TypeError:
msg = ugettext('Wrong type for one or more keys in payload')
raise exceptions.AuthenticationFailed(msg)
except jwt.ExpiredSignature:
msg = ugettext('Signature has expired.')
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = ugettext('Error decoding signature.')
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
msg = ugettext('Invalid JWT Token.')
raise exceptions.AuthenticationFailed(msg)
# Note: AuthenticationFailed can also be raised directly from our
# jwt_decode_handler.
user = self.authenticate_credentials(payload)
# Send user_logged_in signal when JWT is used to authenticate an user.
# Otherwise, we'd never update the last_login information for users
# who never visit the site but do use the API to upload new add-ons.
user_logged_in.send(sender=self.__class__, request=request, user=user)
return (user, jwt_value)
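def should_update_token(self, token, token_fresh_interval):
    """
    Decide whether ``token`` is old enough to be refreshed.

    The token is decoded WITHOUT verifying its signature: only the ``iat``
    claim is read, and the result only drives whether a refresh is
    attempted, so a forged ``iat`` buys an attacker nothing beyond an
    earlier or later refresh. Never reuse this unverified payload for an
    authentication or authorization decision.

    :param token: encoded JWT (auth token) or an opaque user API token
    :param token_fresh_interval: age threshold in milliseconds
    :return: True when the token's age (now minus ``iat``, in milliseconds)
        is at least ``token_fresh_interval``; False for anything that is not
        a JWT carrying a numeric ``iat`` (e.g. a user API token).
    """
    try:
        # ``verify=False`` was deprecated in PyJWT 1.x and is no longer
        # honoured in 2.x (there decode fails for lack of ``algorithms`` and
        # every token would look non-fresh). The explicit options are
        # accepted by both lines; the claim switches matter on 1.x, where
        # exp/aud checks still run even with signature verification off.
        token_data = jwt.decode(
            token,
            options={
                'verify_signature': False,
                'verify_exp': False,
                'verify_nbf': False,
                'verify_iat': False,
                'verify_aud': False,
                'verify_iss': False,
            },
        )
        # Milliseconds since the epoch, to match Date.now() in the js/sc SDK.
        now_ms = (datetime.utcnow() - datetime.utcfromtimestamp(0)).total_seconds() * 1000
        age_ms = now_ms - (int(token_data['iat']) * 1000)
        return int(age_ms) >= int(token_fresh_interval)
    except (jwt.InvalidTokenError, KeyError, TypeError, ValueError):
        # Not a JWT (user API token), or a JWT without a usable ``iat``:
        # nothing to measure, so do not trigger a refresh.
        return False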
jwt_value = self.get_jwt_value(request)
if jwt_value is None:
return None
try:
payload = api_settings.JWT_DECODE_HANDLER(jwt_value)
except User.DoesNotExist:
msg = _('User not found.')
raise exceptions.AuthenticationFailed(msg)
except jwt.ExpiredSignature:
msg = _('Token has expired.')
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _('Invalid token.')
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise exceptions.AuthenticationFailed()
user = self.authenticate_credentials(payload)
return (user, jwt_value)
reason='Missing authorization token',
)
if token is not None:
if not isinstance(token, bytes):
token = token.encode()
try:
decoded = jwt.decode(
token,
secret_or_pub_key,
algorithms=algorithms,
audience=audience,
issuer=issuer
)
except jwt.InvalidTokenError as exc:
logger.exception(exc, exc_info=exc)
msg = 'Invalid authorization token, ' + str(exc)
raise web.HTTPUnauthorized(reason=msg)
if callable(is_revoked):
if await invoke(partial(
is_revoked,
request,
decoded,
)):
raise web.HTTPForbidden(reason='Token is revoked')
request[request_property] = decoded
if store_token and isinstance(store_token, str):
request[store_token] = token
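def get_payload(token, context=None):
    """
    Decode ``token`` through the configured ``JWT_DECODE_HANDLER`` and map
    PyJWT failures onto the project's JSONWebToken exceptions.

    :param token: encoded JWT string
    :param context: passed through to the decode handler unchanged
    :return: the decoded payload as returned by the handler
    :raises exceptions.JSONWebTokenExpired: the token's signature has expired
    :raises exceptions.JSONWebTokenError: the signature cannot be decoded or
        the token is otherwise invalid
    """
    try:
        payload = jwt_settings.JWT_DECODE_HANDLER(token, context)
    # ``jwt.ExpiredSignature`` is the pre-1.0 alias that PyJWT 2.0 removed.
    # An except clause's expression is only evaluated once an exception is
    # in flight, so with that name an expired token would surface as
    # AttributeError instead of JSONWebTokenExpired. ``ExpiredSignatureError``
    # exists on every PyJWT release since 1.0.
    except jwt.ExpiredSignatureError as exc:
        raise exceptions.JSONWebTokenExpired() from exc
    except jwt.DecodeError as exc:
        raise exceptions.JSONWebTokenError(_('Error decoding signature')) from exc
    except jwt.InvalidTokenError as exc:
        raise exceptions.JSONWebTokenError(_('Invalid token')) from exc
    return payload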