How to use the authlib.common.encoding.to_bytes function in Authlib

To help you get started, we’ve selected a few Authlib examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github lepture / authlib / authlib / oauth1 / rfc5849 / signature.py View on Github external
# key is set to the concatenated values of:
    # 1.  The client shared-secret, after being encoded (`Section 3.6`_).
    #
    # .. _`Section 3.6`: https://tools.ietf.org/html/rfc5849#section-3.6
    key = escape(client_secret or '')

    # 2.  An "&" character (ASCII code 38), which MUST be included
    #     even when either secret is empty.
    key += '&'

    # 3.  The token shared-secret, after being encoded (`Section 3.6`_).
    #
    # .. _`Section 3.6`: https://tools.ietf.org/html/rfc5849#section-3.6
    key += escape(token_secret or '')

    signature = hmac.new(to_bytes(key), to_bytes(text), hashlib.sha1)

    # digest  is used to set the value of the "oauth_signature" protocol
    #         parameter, after the result octet string is base64-encoded
    #         per `RFC2045, Section 6.8`.
    #
    # .. _`RFC2045, Section 6.8`: https://tools.ietf.org/html/rfc2045#section-6.8
    sig = binascii.b2a_base64(signature.digest())[:-1]
    return to_unicode(sig)
github lepture / authlib / authlib / jose / rfc7515 / jws.py View on Github external
if not signature_segment:
            raise DecodeError('Missing "signature" value')

        protected_segment = to_bytes(protected_segment)
        protected = _extract_header(protected_segment)
        header = header_obj.get('header')
        if header and not isinstance(header, dict):
            raise DecodeError('Invalid "header" value')
        jws_header = JWSHeader(protected, header)

        self._validate_header(jws_header)

        algorithm, key = prepare_algorithm_key(
            self._algorithms, jws_header, payload, key)
        signing_input = b'.'.join([protected_segment, payload_segment])
        signature = _extract_signature(to_bytes(signature_segment))
        if algorithm.verify(signing_input, key, signature):
            return jws_header, True
        return jws_header, False
github lepture / authlib / authlib / oidc / core / claims.py View on Github external
def _verify_hash(signature, s, alg):
    """Check ``signature`` against the half hash derived from ``s``.

    :param signature: value to check; converted with ``to_bytes`` before
        the comparison.
    :param s: input handed to ``create_half_hash``.
    :param alg: algorithm identifier handed to ``create_half_hash``.
    :return: ``True`` when ``create_half_hash`` yields a falsy value (there
        is nothing to compare against), otherwise the result of
        ``hmac.compare_digest`` on the two values.
    """
    expected = create_half_hash(s, alg)
    if expected:
        # compare_digest is used instead of ``==`` for the comparison.
        return hmac.compare_digest(expected, to_bytes(signature))
    # No hash could be produced, so the check is treated as passed.
    return True
github lepture / authlib / authlib / common / urls.py View on Github external
def url_encode(params):
    """Encode an iterable of ``(key, value)`` pairs as a query string.

    Every key and value is passed through ``to_bytes`` first; the resulting
    pairs are handed to ``_urlencode`` and its output is converted with
    ``to_unicode``.

    :param params: iterable of two-item ``(key, value)`` pairs.
    :return: the result of ``to_unicode`` applied to the encoded pairs.
    """
    byte_pairs = [(to_bytes(name), to_bytes(value)) for name, value in params]
    return to_unicode(_urlencode(byte_pairs))
github lepture / authlib / authlib / oauth1 / rfc5849 / signature.py View on Github external
def verify_rsa_sha1(request):
    """Verify the RSASSA-PKCS #1 v1.5 base64-encoded signature of a request.

    :param request: object providing ``signature`` (base64 text) and
        ``rsa_public_key``; it is also passed to
        ``generate_signature_base_string``.
    :return: whatever ``verify_sha1`` returns for the decoded signature,
        the signature base string and the request's public key.
    """
    # Imported inside the function, as in the original code.
    from .rsa import verify_sha1

    message = to_bytes(generate_signature_base_string(request))
    # The signature arrives base64 encoded; decode it to raw bytes.
    raw_signature = binascii.a2b_base64(to_bytes(request.signature))
    public_key = request.rsa_public_key
    return verify_sha1(raw_signature, message, public_key)
github lepture / authlib / authlib / jose / rfc7516 / jwe.py View on Github external
def _zip_compress(self, s, header):
    """Compress ``s`` when the header names a ``zip`` algorithm.

    :param s: payload; always converted with ``to_bytes`` first.
    :param header: mapping checked for a ``'zip'`` entry.
    :return: the converted payload unchanged when ``header`` has no
        ``'zip'`` key, otherwise the output of the ``compress`` method of
        the algorithm registered under ``header['zip']`` in
        ``self._zip_algorithms``.
    """
    data = to_bytes(s)
    if 'zip' not in header:
        # No compression requested: hand back the converted payload.
        return data
    compressor = self._zip_algorithms[header['zip']]
    return compressor.compress(data)
github lepture / authlib / authlib / common / urls.py View on Github external
def quote(s, safe=b'/'):
    """Quote ``s`` via ``_quote`` and return the result through ``to_unicode``.

    :param s: value to quote; converted with ``to_bytes`` before quoting.
    :param safe: passed through to ``_quote`` unchanged (default ``b'/'``);
        presumably the characters left unescaped -- confirm against the
        ``_quote`` import.
    :return: the quoted value after ``to_unicode`` conversion.
    """
    raw = to_bytes(s)
    quoted = _quote(raw, safe)
    return to_unicode(quoted)
github lepture / authlib / authlib / jose / rfc7519 / jwt.py View on Github external
:param check: check if sensitive data in payload
        :return: bytes
        """
        header['typ'] = 'JWT'

        for k in ['exp', 'iat', 'nbf']:
            # convert datetime into timestamp
            claim = payload.get(k)
            if isinstance(claim, datetime.datetime):
                payload[k] = calendar.timegm(claim.utctimetuple())

        if check:
            self.check_sensitive_data(payload)

        key = load_key(key, header, payload)
        text = to_bytes(json_dumps(payload))
        if 'enc' in header:
            return self._jwe.serialize_compact(header, text, key)
        else:
            return self._jws.serialize_compact(header, text, key)