Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
pub_pem = "-----BEGIN PUBLIC KEY----- \n\
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4qiw8PWs7PpnnC2BUEoD\n\
RcwXF8pq8XT1/3Hc3cuUJwX/otNefr/Bomr3dtM0ERLN3DrepCXvuzEU5FcJVDUB\n\
3sI+pFtjjLBXD/zJmuL3Afg91J9p79+Dm+43cR6wuKywVJx5DJIdswF6oQDDzhwu\n\
89d2V5x02aXB9LqdXkPwiO0eR5s/xHXgASl+hqDdVL9hLod3iGa9nV7cElCbcl8U\n\
VXNPJnQAfaiKazF+hCdl/syrIh0KCZ5opggsTJibo8qFXBmG4PkT5YbhHE11wYKI\n\
LwZFSvZ9iddRPQK3CtgFiBnXbVwU5t67tn9pMizHgypgsfBoeoyBrpTuc4egSCpj\n\
sQIDAQAB \n\
-----END PUBLIC KEY-----"
priv_key = JWK.from_pem(to_bytes_2and3(priv_pem))
pub_key = JWK.from_pem(to_bytes_2and3(pub_pem))
priv_keys = {
'HS256': {'default': JWK(kty='oct', k=base64url_encode('some random key'))},
'HS384': {'default': JWK(kty='oct', k=base64url_encode('another one'))},
'HS512': {'default': JWK(kty='oct', k=base64url_encode('keys keys keys!'))},
'RS256': {'python-jwt': priv_key},
'RS384': {'python-jwt': priv_key},
'RS512': {'python-jwt': priv_key},
'PS256': {'python-jwt': priv_key},
'PS384': {'python-jwt': priv_key},
'PS512': {'python-jwt': priv_key}
}
pub_keys = {
'HS256': {'default': priv_keys['HS256']['default']},
'HS384': {'default': priv_keys['HS384']['default']},
'HS512': {'default': priv_keys['HS512']['default']},
'RS256': {'python-jwt': pub_key},
'RS384': {'python-jwt': pub_key},
'RS512': {'python-jwt': pub_key},
'HS384': {'default': priv_keys['HS384']['default']},
'HS512': {'default': priv_keys['HS512']['default']},
'RS256': {'python-jwt': pub_key},
'RS384': {'python-jwt': pub_key},
'RS512': {'python-jwt': pub_key},
'PS256': {'python-jwt': pub_key},
'PS384': {'python-jwt': pub_key},
'PS512': {'python-jwt': pub_key}
}
generated_key = JWK.generate(kty='RSA', size=2048)
generated_keys = {
'HS256': JWK(kty='oct', k=base64url_encode(hexlify(urandom(16)))),
'HS384': JWK(kty='oct', k=base64url_encode(hexlify(urandom(16)))),
'HS512': JWK(kty='oct', k=base64url_encode(hexlify(urandom(16)))),
'RS256': generated_key,
'RS384': generated_key,
'RS512': generated_key,
'PS256': generated_key,
'PS384': generated_key,
'PS512': generated_key
}
algs = list(priv_keys.keys())
def _save_legacy_state_data(self, user_id, data):
protected_header = {
'alg': 'dir',
'enc': 'A256GCM',
'kid': '1,1',
}
jwe_token = jwe.JWE(
plaintext=base64url_encode(data),
protected=protected_header,
recipient=self.storage.encrypter.key
)
legacy_state_data = json.dumps({'data': jwe_token.serialize(compact=True)})
questionnaire_state = QuestionnaireState(
user_id,
legacy_state_data,
self.LEGACY_DATA_STORE_VERSION
)
data_access.put(questionnaire_state)
""" test using PEM as key - we shouldn't validate a HMAC token instead """
# pylint: disable=wrong-import-order
from test.common import payload, pub_pem, pub_key
from test import python_jwt as jwt
from datetime import timedelta
from pyvows import Vows, expect
from jwcrypto.jwk import JWK
from jwcrypto.common import base64url_encode
pem_key = JWK(kty='oct', k=base64url_encode(pub_pem))
@Vows.batch
class PEMAsHMACKey(Vows.Context):
""" setup tests """
def topic(self):
""" Generate token """
return jwt.generate_jwt(payload, pem_key, 'HS256', timedelta(seconds=60))
class VerifyTokenUsingPublicPEMNoAllowedAlgsSpecified(Vows.Context):
""" Verify token, allowed algorithms not specified """
@Vows.capture_error
def topic(self, topic):
""" Verify the token """
return jwt.verify_jwt(topic, pem_key)
def token_should_not_verify(self, r):
claims['iat'] = timegm(now.utctimetuple())
if lifetime:
claims['exp'] = timegm((now + lifetime).utctimetuple())
elif expires:
claims['exp'] = timegm(expires.utctimetuple())
if header['alg'] == 'none':
signature = ''
else:
token = JWS(json_encode(claims))
token.add_signature(priv_key, protected=header)
signature = json_decode(token.serialize())['signature']
return u'%s.%s.%s' % (
base64url_encode(json_encode(header)),
base64url_encode(json_encode(claims)),
signature
)
'typ': 'JWT',
'alg': algorithm if priv_key else 'none'
}
if other_headers is not None:
redefined_keys = set(header.keys()) & set(other_headers.keys())
if redefined_keys:
raise ValueError('other_headers re-specified the headers: {}'.format(', '.join(redefined_keys)))
header.update(other_headers)
claims = dict(claims)
now = datetime.utcnow()
if jti_size:
claims['jti'] = base64url_encode(urandom(jti_size))
claims['nbf'] = timegm((not_before or now).utctimetuple())
claims['iat'] = timegm(now.utctimetuple())
if lifetime:
claims['exp'] = timegm((now + lifetime).utctimetuple())
elif expires:
claims['exp'] = timegm(expires.utctimetuple())
if header['alg'] == 'none':
signature = ''
else:
token = JWS(json_encode(claims))
token.add_signature(priv_key, protected=header)
signature = json_decode(token.serialize())['signature']
def _generate_key(user_id, user_ik, pepper):
sha256 = hashlib.sha256()
sha256.update(to_str(user_id).encode('utf-8'))
sha256.update(to_str(user_ik).encode('utf-8'))
sha256.update(to_str(pepper).encode('utf-8'))
# we only need the first 32 characters for the CEK
cek = to_bytes(sha256.hexdigest()[:32])
password = {
'kty': 'oct',
'k': base64url_encode(cek),
}
return jwk.JWK(**password)
def _encode_int(self, i):
I = hex(i).rstrip("L").lstrip("0x")
return base64url_encode(unhexlify((len(I) % 2) * '0' + I))