Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_building_id_site_redirect_uri(self):
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse
ret = self.app.build_id_site_redirect_url('http://localhost/')
try:
jwt_response = urlparse(ret).query.split('=')[1]
except:
self.fail("Failed to parse ID site redirect uri")
try:
decoded_data = jwt.decode(
jwt_response, verify=False, algorithms=['HS256'])
except jwt.DecodeError:
self.fail("Invaid JWT generated.")
self.assertIsNotNone(decoded_data.get('iat'))
self.assertIsNotNone(decoded_data.get('jti'))
self.assertIsNotNone(decoded_data.get('iss'))
self.assertEqual(decoded_data.get('iss'), self.app._client.auth.id)
self.assertIsNotNone(decoded_data.get('sub'))
self.assertIsNotNone(decoded_data.get('cb_uri'))
self.assertEqual(decoded_data.get('cb_uri'), 'http://localhost/')
self.assertIsNone(decoded_data.get('path'))
self.assertIsNone(decoded_data.get('state'))
ret = self.app.build_id_site_redirect_url(
'http://testserver/',
path='/#/register',
def test_unauthenticated_userid_return_none_if_not_jwt_token(
        self, inst, request_, mocker):
    """A token header that fails JWT decoding yields no user id.

    ``jwt.decode`` is patched to raise ``DecodeError``, a token value is
    placed in the ``UserTokenHeader`` request header, and the call to
    ``inst.unauthenticated_userid`` is expected to return ``None``.
    """
    from jwt import DecodeError
    from . import UserTokenHeader

    # Make every decode attempt fail as if the token were not a JWT.
    mocker.patch('jwt.decode', side_effect=DecodeError)
    request_.headers[UserTokenHeader] = 'tokenhash'

    userid = inst.unauthenticated_userid(request_)
    assert userid is None
def fetch_token_header(token):
    """
    Fetch the header out of the JWT token.

    The token is split on ``.`` into its segments; only the header segment
    is base64url-decoded and parsed as JSON. The signature is NOT checked.

    :param token: the compact-serialised JWT, as text or bytes
    :return: the value produced by ``json.loads`` on the decoded header
    :raise jwt.DecodeError: if the token has too few segments, the header
        segment is not valid base64url, or the decoded header is not valid
        UTF-8 encoded JSON
    """
    # Accept a token that is already bytes instead of failing on .encode().
    if not isinstance(token, bytes):
        token = token.encode("utf-8")

    try:
        # The unpacking itself is the segment-count check: fewer than three
        # dot-separated segments makes one of these raise ValueError.
        signing_input, _ = token.rsplit(b".", 1)
        header_segment, _ = signing_input.split(b".", 1)
    except ValueError:
        raise jwt.DecodeError("Not enough segments")

    try:
        header_data = jwt.utils.base64url_decode(header_segment)
    except (TypeError, ValueError) as e:
        # On Python 3 bad padding surfaces as binascii.Error, which is a
        # ValueError subclass; catching only TypeError let it escape.
        current_app.logger.exception(e)
        raise jwt.DecodeError("Invalid header padding")

    try:
        return json.loads(header_data.decode("utf-8"))
    except ValueError as e:
        # Covers both UnicodeDecodeError and JSON syntax errors.
        current_app.logger.exception(e)
        raise jwt.DecodeError("Invalid header string: %s" % e)
Fetch the header and payload out of the JWT token.
:param token:
:return: :raise jwt.DecodeError:
"""
token = token.encode('utf-8')
try:
signing_input, crypto_segment = token.rsplit(b'.', 1)
header_segment, payload_segment = signing_input.split(b'.', 1)
except ValueError:
raise jwt.DecodeError('Not enough segments')
try:
header = json.loads(jwt.utils.base64url_decode(header_segment).decode('utf-8'))
except TypeError as e:
current_app.logger.exception(e)
raise jwt.DecodeError('Invalid header padding')
try:
payload = json.loads(jwt.utils.base64url_decode(payload_segment).decode('utf-8'))
except TypeError as e:
current_app.logger.exception(e)
raise jwt.DecodeError('Invalid payload padding')
return (header, payload)
def jwt_decode(token):
    """Decode and validate ``token`` with the application's secret key.

    :param token: the encoded JWT to validate
    :return: the decoded payload returned by ``jwt.decode``
    :raise HTTPError: with status ``HTTP_401`` and description
        ``'Token expired'`` when the signature has expired, or
        ``'Invalid token'`` for any other token validation failure
    """
    # NOTE(review): no ``algorithms`` argument is passed; PyJWT 2.x requires
    # one when verifying. Confirm the signing algorithm and pin it here.
    try:
        return jwt.decode(token, hug.current_app_config.SECRET_KEY)
    except jwt.ExpiredSignatureError:
        # Must precede InvalidTokenError, which is its base class.
        raise HTTPError(status=HTTP_401, title='Authentication Error',
                        description='Token expired')
    except jwt.InvalidTokenError:
        # DecodeError plus every other validation failure (immature
        # signature, unexpected audience, disallowed algorithm, ...) is an
        # authentication problem, not an unhandled server error.
        raise HTTPError(status=HTTP_401, title='Authentication Error',
                        description='Invalid token')
if payload is None:
return None
payload = jwt.decode(json.loads(payload), config.settings.main.secret)
user = None
for u in config.settings.auth.users:
if u.username == payload["username"]:
session["username"] = u["username"]
user = u
break
if user is not None:
return user
else:
logger.warn("Token is invalid, user is unknown")
return None
except (ValueError, DecodeError) as e:
# logger.warn('Token is invalid') #TODO Rewrite handling for cookies instead of tokens
return None
except ExpiredSignature:
logger.warn("Token has expired")
return None
raise DecodeError('Invalid header string: must be a json object')
try:
payload_data = base64url_decode(payload_segment)
except (TypeError, binascii.Error):
raise DecodeError('Invalid payload padding')
try:
payload = json.loads(payload_data.decode('utf-8'))
except ValueError as e:
raise DecodeError('Invalid payload string: %s' % e)
try:
signature = base64url_decode(crypto_segment)
except (TypeError, binascii.Error):
raise DecodeError('Invalid crypto padding')
return (header, payload, signature, signing_input)
def verify_jwt(token: str) -> typing.Optional[typing.Mapping]:
try:
unverified_token = jwt.decode(token, verify=False)
except jwt.DecodeError:
logger.info(f"Failed to decode JWT: {token}", exc_info=True)
raise DSSException(401, 'Unauthorized', 'Failed to decode token.')
assert_authorized_issuer(unverified_token)
issuer = unverified_token['iss']
public_keys = get_public_keys(issuer)
try:
token_header = jwt.get_unverified_header(token)
verified_tok = jwt.decode(token,
key=public_keys[token_header["kid"]],
issuer=issuer,
audience=Config.get_audience(),
algorithms=allowed_algorithms,
)
logger.info("""{"valid": true, "token": %s}""", json.dumps(verified_tok))
try:
payload = jwt_auth.jwt_decode_handler(jwt_value)
except Exception as exc:
try:
# Log all exceptions
log.info('JWTKeyAuthentication failed; '
'it raised %s (%s)', exc.__class__.__name__, exc)
# Re-raise to deal with them properly.
raise exc
except TypeError:
msg = ugettext('Wrong type for one or more keys in payload')
raise exceptions.AuthenticationFailed(msg)
except jwt.ExpiredSignature:
msg = ugettext('Signature has expired.')
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = ugettext('Error decoding signature.')
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
msg = ugettext('Invalid JWT Token.')
raise exceptions.AuthenticationFailed(msg)
# Note: AuthenticationFailed can also be raised directly from our
# jwt_decode_handler.
user = self.authenticate_credentials(payload)
# Send user_logged_in signal when JWT is used to authenticate an user.
# Otherwise, we'd never update the last_login information for users
# who never visit the site but do use the API to upload new add-ons.
user_logged_in.send(sender=self.__class__, request=request, user=user)
return (user, jwt_value)
def decorated_function(*args, **kwargs):
    """Run the wrapped callable ``f`` only for requests with a usable token.

    A request with no ``Authorization`` header, or whose token makes
    ``parse_token`` raise ``DecodeError`` or ``ExpiredSignature``, gets a
    JSON response with status code 401. Otherwise the payload's ``sub``
    value is stored on ``g.user_id`` and ``f`` is called with the original
    arguments.
    """
    def reject(message):
        # Build the 401 JSON reply shared by every failure path.
        reply = jsonify(message=message)
        reply.status_code = 401
        return reply

    if not request.headers.get('Authorization'):
        return reject('Missing authorization header')

    try:
        claims = parse_token(request)
    except DecodeError:
        return reject('Token is invalid')
    except ExpiredSignature:
        return reject('Token has expired')

    g.user_id = claims['sub']
    return f(*args, **kwargs)