Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
validate(), it is called an any exceptions it raises are wrapped
in an InvalidUserError
* verifies that custom claims may be encoded in the token and
validates that the custom claims do not collide with reserved
claims
"""
guard = Praetorian(app, user_class)
the_dude = user_class(
username='TheDude',
password=guard.hash_password('abides'),
roles='admin,operator',
)
moment = pendulum.parse('2017-05-21 18:39:55')
with freezegun.freeze_time(moment):
token = guard.encode_jwt_token(the_dude)
token_data = jwt.decode(
token, guard.encode_key, algorithms=guard.allowed_algorithms,
)
assert token_data['iat'] == moment.int_timestamp
assert token_data['exp'] == (
moment + DEFAULT_JWT_ACCESS_LIFESPAN
).int_timestamp
assert token_data[REFRESH_EXPIRATION_CLAIM] == (
moment + DEFAULT_JWT_REFRESH_LIFESPAN
).int_timestamp
assert token_data['id'] == the_dude.id
assert token_data['rls'] == 'admin,operator'
moment = pendulum.parse('2017-05-21 18:39:55')
override_access_lifespan = pendulum.Duration(minutes=1)
override_refresh_lifespan = pendulum.Duration(hours=1)
with freezegun.freeze_time(moment):
discovery_doc_url = 'https://accounts.google.com/.well-known/openid-configuration'
r = requests.get(discovery_doc_url)
token_endpoint = r.json()['token_endpoint']
data = {
'code': request.json['code'],
'client_id': request.json['clientId'],
'client_secret': current_app.config['OAUTH2_CLIENT_SECRET'],
'redirect_uri': request.json['redirectUri'],
'grant_type': 'authorization_code'
}
r = requests.post(token_endpoint, data)
token = r.json()
id_token = jwt.decode(
token['id_token'],
verify=False
)
subject = id_token['sub']
name = id_token['name']
email = id_token['email']
domain = email.split('@')[1]
email_verified = id_token['email_verified']
if not_authorized('ALLOWED_EMAIL_DOMAINS', groups=[domain]):
raise ApiError('User %s is not authorized' % email, 403)
customers = get_customers(email, groups=[domain])
auth_audit_trail.send(current_app._get_current_object(), event='google-login', message='user login via Google',
def parse(cls, token: str, key: str = None, verify: bool = True, algorithm: str = 'HS256') -> 'Jwt':
try:
json = jwt.decode(
token,
key=key or current_app.config['SECRET_KEY'],
verify=verify,
algorithms=algorithm,
audience=current_app.config['OAUTH2_CLIENT_ID'] or current_app.config['SAML2_ENTITY_ID'] or absolute_url()
)
except (DecodeError, ExpiredSignature, InvalidAudience):
raise
return Jwt(
iss=json.get('iss', None),
typ=json.get('typ', None),
sub=json.get('sub', None),
aud=json.get('aud', None),
exp=json.get('exp', None),
nbf=json.get('nbf', None),
def get_token_data(request):
token = get_token(request)
token = jwt.decode(token, settings.JWT_SECRET)
return token
kwargs = {}
for claim in self.claims:
if claim != "exp":
setting = "claim_{}".format(claim.lower())
if setting in self.config: # noqa
value = self.config.get(setting)
kwargs.update({claim_label[claim]: value})
kwargs["leeway"] = int(self.config.leeway())
if "claim_aud" in self.config: # noqa
kwargs["audience"] = self.config.claim_aud()
if "claim_iss" in self.config: # noqa
kwargs["issuer"] = self.config.claim_iss()
decoded = jwt.decode(
token,
secret,
algorithms=[algorithm],
verify=verify,
options={"verify_exp": self.config.verify_exp()},
**kwargs,
)
if verify:
if self._extra_verifications:
self._verify_extras(decoded)
if self._custom_claims or inline_claims:
self._verify_custom_claims(
decoded, inline_claims=inline_claims
)
def check_token(jwt_token) -> dict:
return jwt.decode(jwt_token, config.public_key)
def decode_jwt_token(token):
unverified_header = jwt.get_unverified_header(token)
unverified_claims = jwt.decode(token, verify=False)
if unverified_header.get(claims.KEY_ID):
unverified_key_id = str(unverified_header.get(claims.KEY_ID))
else:
unverified_key_id = None
if claims.ISSUER not in unverified_claims:
raise MissingRequiredClaimError(claims.ISSUER)
unverified_issuer = str(unverified_claims[claims.ISSUER])
if api_settings.ACCEPTED_ISSUERS is not None and unverified_issuer not in api_settings.ACCEPTED_ISSUERS:
raise InvalidIssuerError("Invalid issuer")
public_key, key_id = get_public_key_and_key_id(issuer=unverified_issuer, key_id=unverified_key_id)
async def dispatch(self, request, handler):
request.state.jwt_payload = {}
jwt_token = (
request.headers.get("Authorization", "").replace("Bearer", "").strip()
)
if not jwt_token:
return await handler(request)
try:
payload = jwt.decode(
jwt_token, verify=False, secret=None, algorithms=[JWT_ALGORITHMS]
)
except (jwt.InvalidTokenError) as exc:
logger.exception(exc, exc_info=exc)
msg = "Invalid authorization token, " + str(exc)
return Response(status_code=403, content=msg)
except Exception as exc:
logger.error("Cannot deocde authorization token, " + str(exc))
else:
request.state.jwt_payload = payload
return await handler(request)
def API_verification(auth_token):
try:
Decoded_Token = jwt.decode(auth_token, API_Secret, algorithm='HS256')
User_ID = int(Decoded_Token['id'])
Cursor.execute('SELECT * FROM users WHERE user_id = %s', (User_ID,))
User_Details = Cursor.fetchone()
if auth_token == User_Details[5]:
return {"Token": True, "Admin": User_Details[4], "Message": "Token verification successful."}
else:
return {"Token": False, "Admin": False, "Message": "Invalid token."}
except jwt.ExpiredSignatureError:
return {"Token": False, "Admin": False, "Message": "Token expired."}
except jwt.DecodeError:
return {"Token": False, "Admin": False, "Message": "Failed to decode token."}
def parse_token(token):
return jwt.decode(token, key=app.config['SECRET_KEY'], audience=app.config['OAUTH2_CLIENT_ID'] or request.url_root)