Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def make_tok(self, key, alg, name):
    """Build and serialize a signed JWS whose payload names ``name``.

    :param key: Mapping of JWK parameters. It is expanded into
        ``JWK(**key)`` and its ``'kid'`` entry is copied into the
        protected header.
    :param alg: Value stored in the ``alg`` field of the protected header.
    :param name: Value stored in the ``sub`` claim of the payload.
    :returns: Whatever ``JWS.serialize()`` returns for the signed token.
    """
    signing_key = JWK(**key)

    # Protected header: token type, the id of the signing key, algorithm.
    header = {
        "typ": "JOSE+JSON",
        "kid": key['kid'],
        "alg": alg,
    }

    # The token expires five minutes (5 * 60 seconds) after creation.
    expires_at = int(time.time()) + (5 * 60)
    body = {
        "sub": name,
        "exp": expires_at,
    }

    signer = JWS(payload=json_encode(body))
    # The second positional argument is passed as None, exactly as the
    # original call did; the algorithm itself is carried in the header.
    signer.add_signature(signing_key, None, json_encode(header))
    return signer.serialize()
if jti_size:
claims['jti'] = base64url_encode(urandom(jti_size))
claims['nbf'] = timegm((not_before or now).utctimetuple())
claims['iat'] = timegm(now.utctimetuple())
if lifetime:
claims['exp'] = timegm((now + lifetime).utctimetuple())
elif expires:
claims['exp'] = timegm(expires.utctimetuple())
if header['alg'] == 'none':
signature = ''
else:
token = JWS(json_encode(claims))
token.add_signature(priv_key, protected=header)
signature = json_decode(token.serialize())['signature']
return u'%s.%s.%s' % (
base64url_encode(json_encode(header)),
base64url_encode(json_encode(claims)),
signature
)
try:
jtok = JWT(jwt=msg)
except Exception as e:
raise InvalidMessage('Failed to parse message: %s' % str(e))
try:
token = jtok.token
if isinstance(token, JWE):
token.decrypt(self.kkstore.server_keys[KEY_USAGE_ENC])
# If an encrypted payload is received then there must be
# a nested signed payload to verify the provenance.
payload = token.payload.decode('utf-8')
token = JWS()
token.deserialize(payload)
elif isinstance(token, JWS):
pass
else:
raise TypeError("Invalid Token type: %s" % type(jtok))
# Retrieve client keys for later use
self.client_keys = [
JWK(**self._get_key(token.jose_header, KEY_USAGE_SIG)),
JWK(**self._get_key(token.jose_header, KEY_USAGE_ENC))]
# verify token and get payload
token.verify(self.client_keys[KEY_USAGE_SIG])
claims = json_decode(token.payload)
except Exception as e:
logger.debug('Failed to validate message', exc_info=True)
raise InvalidMessage('Failed to validate message: %s' % str(e))
:returns: A verified payload
"""
try:
jtok = JWT(jwt=msg)
except Exception as e:
raise InvalidMessage('Failed to parse message: %s' % str(e))
try:
token = jtok.token
if isinstance(token, JWE):
token.decrypt(self.kkstore.server_keys[KEY_USAGE_ENC])
# If an encrypted payload is received then there must be
# a nested signed payload to verify the provenance.
payload = token.payload.decode('utf-8')
token = JWS()
token.deserialize(payload)
elif isinstance(token, JWS):
pass
else:
raise TypeError("Invalid Token type: %s" % type(jtok))
# Retrieve client keys for later use
self.client_keys = [
JWK(**self._get_key(token.jose_header, KEY_USAGE_SIG)),
JWK(**self._get_key(token.jose_header, KEY_USAGE_ENC))]
# verify token and get payload
token.verify(self.client_keys[KEY_USAGE_SIG])
claims = json_decode(token.payload)
except Exception as e:
logger.debug('Failed to validate message', exc_info=True)
alg = parsed_header.get('alg')
if alg is None:
raise _JWTError('alg header not present')
if alg not in allowed_algs:
raise _JWTError('algorithm not allowed: ' + alg)
if not ignore_not_implemented:
for k in parsed_header:
if k not in JWSHeaderRegistry:
raise _JWTError('unknown header: ' + k)
if not JWSHeaderRegistry[k].supported:
raise _JWTError('header not implemented: ' + k)
if pub_key:
token = JWS()
token.allowed_algs = allowed_algs
token.deserialize(jwt, pub_key)
elif 'none' not in allowed_algs:
raise _JWTError('no key but none alg not allowed')
parsed_claims = json_decode(base64url_decode(claims))
utcnow = datetime.utcnow()
now = timegm(utcnow.utctimetuple())
typ = parsed_header.get('typ')
if typ is None:
if not checks_optional:
raise _JWTError('typ header not present')
elif typ != 'JWT':
raise _JWTError('typ header is not JWT')