How to use the authlib.jose.errors.DecodeError exception in Authlib

To help you get started, we’ve selected a few Authlib examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github lepture / authlib / tests / core / test_jose / test_jws.py View on Github external
def test_invalid_input(self):
        """Every malformed compact input must be rejected with ``DecodeError``."""
        jws = JsonWebSignature(algorithms=JWS_ALGORITHMS)
        # Checked in order; the first input that is not rejected fails the test.
        malformed_inputs = (
            'a',
            'a.b.c',
            'YQ.YQ.YQ',  # a
            'W10.a.YQ',  # []
            'e30.a.YQ',  # {}
            'eyJhbGciOiJzIn0.a.YQ',
            'eyJhbGciOiJzIn0.YQ.a',
        )
        for token in malformed_inputs:
            self.assertRaises(errors.DecodeError, jws.deserialize, token, 'k')
github lepture / authlib / tests / core / test_jose / test_jwe.py View on Github external
def test_not_enough_segments(self):
        """A three-segment token is rejected by ``deserialize_compact``."""
        jwe = JsonWebEncryption(algorithms=JWE_ALGORITHMS)
        three_segments = 'a.b.c'
        with self.assertRaises(errors.DecodeError):
            jwe.deserialize_compact(three_segments, None)
github lepture / authlib / authlib / jose / rfc7516 / jwe.py View on Github external
def deserialize_compact(self, s, key, decode=None):
        """Extract a JWE Compact Serialization and decrypt it with the given key.

        The input is split on ``.`` into five segments: protected header,
        encrypted key, initialization vector, ciphertext and authentication
        tag. The header is validated, the content encryption key is unwrapped
        with ``key``, and the ciphertext is decrypted.

        :param s: text of JWE Compact Serialization
        :param key: key used to unwrap the content encryption key
        :param decode: a function to decode plaintext data
        :return: dict
        :raise: DecodeError when ``s`` does not split into exactly five
            segments
        """
        try:
            s = to_bytes(s)
            # Unpacking raises ValueError for any segment count other than
            # five (too few or too many); both are reported identically.
            protected_s, ek_s, iv_s, ciphertext_s, tag_s = s.rsplit(b'.')
        except ValueError:
            raise DecodeError('Not enough segments')

        # Decode each segment; DecodeError is handed to the helpers as the
        # error class to use (presumably raised on malformed segments --
        # confirm against extract_header/extract_segment).
        protected = extract_header(protected_s, DecodeError)
        ek = extract_segment(ek_s, DecodeError, 'encryption key')
        iv = extract_segment(iv_s, DecodeError, 'initialization vector')
        ciphertext = extract_segment(ciphertext_s, DecodeError, 'ciphertext')
        tag = extract_segment(tag_s, DecodeError, 'authentication tag')

        self._pre_validate_header(protected)
        # ``key`` is rebound to whatever _prepare_alg_enc_key returns.
        algorithm, enc_alg, key = self._prepare_alg_enc_key(
            protected, key, private=True)
        self._post_validate_header(protected, algorithm)

        # Recover the content encryption key from the encrypted key segment.
        cek = algorithm.unwrap(ek, protected, key)
        # The still-encoded protected header segment is passed to decrypt as
        # the additional authenticated data.
        aad = to_bytes(protected_s, 'ascii')
        msg = enc_alg.decrypt(ciphertext, aad, iv, tag, cek)
        # NOTE(review): this excerpt ends here; the handling of ``msg`` and
        # ``decode`` and the return statement are not shown.
github lepture / authlib / authlib / jose / rfc7515 / jws.py View on Github external
and signing input values. Via `Section 7.1`_.

        :param s: text of JWS Compact Serialization
        :param key: key used to verify the signature
        :param decode: a function to decode payload data
        :return: JWSObject
        :raise: BadSignatureError

        .. _`Section 7.1`: https://tools.ietf.org/html/rfc7515#section-7.1
        """
        try:
            s = to_bytes(s)
            signing_input, signature_segment = s.rsplit(b'.', 1)
            protected_segment, payload_segment = signing_input.split(b'.', 1)
        except ValueError:
            raise DecodeError('Not enough segments')

        protected = _extract_header(protected_segment)
        jws_header = JWSHeader(protected, None)

        payload = _extract_payload(payload_segment)
        if decode:
            payload = decode(payload)

        signature = _extract_signature(signature_segment)

        self._validate_header(jws_header)

        rv = JWSObject(jws_header, payload, 'compact')
        algorithm, key = prepare_algorithm_key(
            self._algorithms, jws_header, payload, key)
        if algorithm.verify(signing_input, key, signature):
github lepture / authlib / authlib / jose / rfc7515 / jws.py View on Github external
def _validate_json_jws(self, payload_segment, payload, header_obj, key):
        """Check one signature entry of a JWS JSON Serialization.

        :param payload_segment: encoded payload bytes, used as the second half
            of the signing input
        :param payload: decoded payload, forwarded to the key preparation
        :param header_obj: mapping read for the "protected", "signature" and
            optional "header" members
        :param key: key forwarded to the key preparation
        :return: tuple of (JWSHeader, bool) where the bool tells whether the
            signature verified
        :raise: DecodeError when "protected" or "signature" is missing, or
            when "header" is present but not a dict
        """
        raw_protected = header_obj.get('protected')
        if not raw_protected:
            raise DecodeError('Missing "protected" value')

        raw_signature = header_obj.get('signature')
        if not raw_signature:
            raise DecodeError('Missing "signature" value')

        protected_bytes = to_bytes(raw_protected)
        protected = _extract_header(protected_bytes)

        unprotected = header_obj.get('header')
        if unprotected:
            # An empty/absent "header" is fine; a non-empty one must be a dict.
            if not isinstance(unprotected, dict):
                raise DecodeError('Invalid "header" value')
        jws_header = JWSHeader(protected, unprotected)

        self._validate_header(jws_header)

        algorithm, key = prepare_algorithm_key(
            self._algorithms, jws_header, payload, key)
        # Signing input is "<protected>.<payload>" over the encoded segments.
        signing_input = b'.'.join((protected_bytes, payload_segment))
        signature = _extract_signature(to_bytes(raw_signature))
        verified = bool(algorithm.verify(signing_input, key, signature))
        return jws_header, verified
github lepture / authlib / authlib / jose / rfc7515 / jws.py View on Github external
def _ensure_dict(s):
    if not isinstance(s, dict):
        try:
            s = json_loads(to_unicode(s))
        except (ValueError, TypeError):
            raise DecodeError('Invalid JWS')

    if not isinstance(s, dict):
        raise DecodeError('Invalid JWS')

    return s
github lepture / authlib / authlib / jose / rfc7519 / jwt.py View on Github external
def decode_payload(bytes_payload):
    """Parse a JSON payload and require the result to be a dict.

    :param bytes_payload: payload to convert to text and parse as JSON
    :return: the parsed dict
    :raise: DecodeError when parsing raises ValueError, or when the parsed
        value is not a dict
    """
    try:
        text = to_unicode(bytes_payload)
        decoded = json_loads(text)
    except ValueError:
        raise DecodeError('Invalid payload value')

    if isinstance(decoded, dict):
        return decoded
    raise DecodeError('Invalid payload type')
github lepture / authlib / authlib / jose / rfc7515 / jws.py View on Github external
def _validate_json_jws(self, payload_segment, payload, header_obj, key):
        """Validate the headers of one JWS JSON Serialization signature entry.

        Reads the "protected", "signature" and optional "header" members of
        ``header_obj``, builds and validates the combined JWS header, then
        resolves the algorithm and key.

        NOTE(review): this excerpt appears truncated -- it stops after key
        preparation, before any signature verification or return statement.

        :param payload_segment: encoded payload (unused in the visible part)
        :param payload: decoded payload, forwarded to the key preparation
        :param header_obj: mapping holding the signature entry members
        :param key: key forwarded to the key preparation
        :raise: DecodeError when "protected" or "signature" is missing, or
            when "header" is present but not a dict
        """
        protected_segment = header_obj.get('protected')
        if not protected_segment:
            raise DecodeError('Missing "protected" value')

        signature_segment = header_obj.get('signature')
        if not signature_segment:
            raise DecodeError('Missing "signature" value')

        protected_segment = to_bytes(protected_segment)
        protected = _extract_header(protected_segment)
        header = header_obj.get('header')
        # A falsy "header" (absent or empty) is accepted as-is; only a
        # non-empty, non-dict value is rejected.
        if header and not isinstance(header, dict):
            raise DecodeError('Invalid "header" value')
        jws_header = JWSHeader(protected, header)

        self._validate_header(jws_header)

        # ``key`` is rebound to whatever prepare_algorithm_key returns.
        algorithm, key = prepare_algorithm_key(
            self._algorithms, jws_header, payload, key)
github lepture / authlib / authlib / jose / rfc7515 / jws.py View on Github external
If key is not provided, it will return a dict without signature
        verification. Header will still be validated. Via `Section 7.2`_.

        :param obj: text of JWS JSON Serialization
        :param key: key used to verify the signature
        :param decode: a function to decode payload data
        :return: JWSObject
        :raise: BadSignatureError

        .. _`Section 7.2`: https://tools.ietf.org/html/rfc7515#section-7.2
        """
        obj = _ensure_dict(obj)

        payload_segment = obj.get('payload')
        if not payload_segment:
            raise DecodeError('Missing "payload" value')

        payload_segment = to_bytes(payload_segment)
        payload = _extract_payload(payload_segment)
        if decode:
            payload = decode(payload)

        if 'signatures' not in obj:
            # flattened JSON JWS
            jws_header, valid = self._validate_json_jws(
                payload_segment, payload, obj, key)

            rv = JWSObject(jws_header, payload, 'flat')
            if valid:
                return rv
            raise BadSignatureError(rv)
github lepture / authlib / authlib / jose / rfc7515 / jws.py View on Github external
def _validate_json_jws(self, payload_segment, payload, header_obj, key):
        protected_segment = header_obj.get('protected')
        if not protected_segment:
            raise DecodeError('Missing "protected" value')

        signature_segment = header_obj.get('signature')
        if not signature_segment:
            raise DecodeError('Missing "signature" value')

        protected_segment = to_bytes(protected_segment)
        protected = _extract_header(protected_segment)
        header = header_obj.get('header')
        if header and not isinstance(header, dict):
            raise DecodeError('Invalid "header" value')
        jws_header = JWSHeader(protected, header)

        self._validate_header(jws_header)

        algorithm, key = prepare_algorithm_key(
            self._algorithms, jws_header, payload, key)
        signing_input = b'.'.join([protected_segment, payload_segment])
        signature = _extract_signature(to_bytes(signature_segment))
        if algorithm.verify(signing_input, key, signature):
            return jws_header, True