Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_invalid_input(self):
jws = JsonWebSignature(algorithms=JWS_ALGORITHMS)
self.assertRaises(errors.DecodeError, jws.deserialize, 'a', 'k')
self.assertRaises(errors.DecodeError, jws.deserialize, 'a.b.c', 'k')
self.assertRaises(
errors.DecodeError, jws.deserialize, 'YQ.YQ.YQ', 'k') # a
self.assertRaises(
errors.DecodeError, jws.deserialize, 'W10.a.YQ', 'k') # []
self.assertRaises(
errors.DecodeError, jws.deserialize, 'e30.a.YQ', 'k') # {}
self.assertRaises(
errors.DecodeError, jws.deserialize, 'eyJhbGciOiJzIn0.a.YQ', 'k')
self.assertRaises(
errors.DecodeError, jws.deserialize, 'eyJhbGciOiJzIn0.YQ.a', 'k')
def test_not_enough_segments(self):
s = 'a.b.c'
jwe = JsonWebEncryption(algorithms=JWE_ALGORITHMS)
self.assertRaises(
errors.DecodeError,
jwe.deserialize_compact,
s, None
)
def deserialize_compact(self, s, key, decode=None):
"""Exact JWS Compact Serialization, and validate with the given key.
:param s: text of JWS Compact Serialization
:param key: key used to verify the signature
:param decode: a function to decode plaintext data
:return: dict
"""
try:
s = to_bytes(s)
protected_s, ek_s, iv_s, ciphertext_s, tag_s = s.rsplit(b'.')
except ValueError:
raise DecodeError('Not enough segments')
protected = extract_header(protected_s, DecodeError)
ek = extract_segment(ek_s, DecodeError, 'encryption key')
iv = extract_segment(iv_s, DecodeError, 'initialization vector')
ciphertext = extract_segment(ciphertext_s, DecodeError, 'ciphertext')
tag = extract_segment(tag_s, DecodeError, 'authentication tag')
self._pre_validate_header(protected)
algorithm, enc_alg, key = self._prepare_alg_enc_key(
protected, key, private=True)
self._post_validate_header(protected, algorithm)
cek = algorithm.unwrap(ek, protected, key)
aad = to_bytes(protected_s, 'ascii')
msg = enc_alg.decrypt(ciphertext, aad, iv, tag, cek)
and signing input values. Via `Section 7.1`_.
:param s: text of JWS Compact Serialization
:param key: key used to verify the signature
:param decode: a function to decode payload data
:return: JWSObject
:raise: BadSignatureError
.. _`Section 7.1`: https://tools.ietf.org/html/rfc7515#section-7.1
"""
try:
s = to_bytes(s)
signing_input, signature_segment = s.rsplit(b'.', 1)
protected_segment, payload_segment = signing_input.split(b'.', 1)
except ValueError:
raise DecodeError('Not enough segments')
protected = _extract_header(protected_segment)
jws_header = JWSHeader(protected, None)
payload = _extract_payload(payload_segment)
if decode:
payload = decode(payload)
signature = _extract_signature(signature_segment)
self._validate_header(jws_header)
rv = JWSObject(jws_header, payload, 'compact')
algorithm, key = prepare_algorithm_key(
self._algorithms, jws_header, payload, key)
if algorithm.verify(signing_input, key, signature):
def _validate_json_jws(self, payload_segment, payload, header_obj, key):
protected_segment = header_obj.get('protected')
if not protected_segment:
raise DecodeError('Missing "protected" value')
signature_segment = header_obj.get('signature')
if not signature_segment:
raise DecodeError('Missing "signature" value')
protected_segment = to_bytes(protected_segment)
protected = _extract_header(protected_segment)
header = header_obj.get('header')
if header and not isinstance(header, dict):
raise DecodeError('Invalid "header" value')
jws_header = JWSHeader(protected, header)
self._validate_header(jws_header)
algorithm, key = prepare_algorithm_key(
self._algorithms, jws_header, payload, key)
signing_input = b'.'.join([protected_segment, payload_segment])
signature = _extract_signature(to_bytes(signature_segment))
if algorithm.verify(signing_input, key, signature):
return jws_header, True
return jws_header, False
def _ensure_dict(s):
if not isinstance(s, dict):
try:
s = json_loads(to_unicode(s))
except (ValueError, TypeError):
raise DecodeError('Invalid JWS')
if not isinstance(s, dict):
raise DecodeError('Invalid JWS')
return s
def decode_payload(bytes_payload):
try:
payload = json_loads(to_unicode(bytes_payload))
except ValueError:
raise DecodeError('Invalid payload value')
if not isinstance(payload, dict):
raise DecodeError('Invalid payload type')
return payload
def _validate_json_jws(self, payload_segment, payload, header_obj, key):
protected_segment = header_obj.get('protected')
if not protected_segment:
raise DecodeError('Missing "protected" value')
signature_segment = header_obj.get('signature')
if not signature_segment:
raise DecodeError('Missing "signature" value')
protected_segment = to_bytes(protected_segment)
protected = _extract_header(protected_segment)
header = header_obj.get('header')
if header and not isinstance(header, dict):
raise DecodeError('Invalid "header" value')
jws_header = JWSHeader(protected, header)
self._validate_header(jws_header)
algorithm, key = prepare_algorithm_key(
self._algorithms, jws_header, payload, key)
If key is not provided, it will return a dict without signature
verification. Header will still be validated. Via `Section 7.2`_.
:param obj: text of JWS JSON Serialization
:param key: key used to verify the signature
:param decode: a function to decode payload data
:return: JWSObject
:raise: BadSignatureError
.. _`Section 7.2`: https://tools.ietf.org/html/rfc7515#section-7.2
"""
obj = _ensure_dict(obj)
payload_segment = obj.get('payload')
if not payload_segment:
raise DecodeError('Missing "payload" value')
payload_segment = to_bytes(payload_segment)
payload = _extract_payload(payload_segment)
if decode:
payload = decode(payload)
if 'signatures' not in obj:
# flattened JSON JWS
jws_header, valid = self._validate_json_jws(
payload_segment, payload, obj, key)
rv = JWSObject(jws_header, payload, 'flat')
if valid:
return rv
raise BadSignatureError(rv)
def _validate_json_jws(self, payload_segment, payload, header_obj, key):
protected_segment = header_obj.get('protected')
if not protected_segment:
raise DecodeError('Missing "protected" value')
signature_segment = header_obj.get('signature')
if not signature_segment:
raise DecodeError('Missing "signature" value')
protected_segment = to_bytes(protected_segment)
protected = _extract_header(protected_segment)
header = header_obj.get('header')
if header and not isinstance(header, dict):
raise DecodeError('Invalid "header" value')
jws_header = JWSHeader(protected, header)
self._validate_header(jws_header)
algorithm, key = prepare_algorithm_key(
self._algorithms, jws_header, payload, key)
signing_input = b'.'.join([protected_segment, payload_segment])
signature = _extract_signature(to_bytes(signature_segment))
if algorithm.verify(signing_input, key, signature):
return jws_header, True