Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
signing_key = load_signing_key(signing_key, self.crypto_backend)
# prepare header
header = {'typ': self.token_type, 'alg': self.signing_algorithm}
token_segments.append(base64url_encode(json_encode(header)))
# prepare payload
token_segments.append(base64url_encode(json_encode(payload)))
# prepare signature
signing_input = b'.'.join(token_segments)
signer = self._get_signer(signing_key)
signer.update(signing_input)
signature = signer.finalize()
raw_signature = der_to_raw_signature(signature, signing_key.curve)
token_segments.append(base64url_encode(raw_signature))
# combine the header, payload, and signature into a token and return it
token = b'.'.join(token_segments)
return token
payload = dict(
type=type,
jti=uuid.uuid4(),
identity=identity,
nbf=now,
iat=now,
exp=now + exp
)
for key, value in claims.items():
payload.update({key: value})
header = utils.base64url_encode(str(default_header).encode())
payload = utils.base64url_encode(str(payload).encode())
signature = make_signature(header, payload, key)
signature= utils.base64url_encode(signature)
token = header + b'.' + payload + b'.' + signature
return token
if algorithm not in self._valid_algs:
pass
# Header
header = {"typ": self.header_typ, "alg": algorithm}
if headers:
self._validate_headers(headers)
header.update(headers)
json_header = force_bytes(
json.dumps(header, separators=(",", ":"), cls=json_encoder)
)
segments.append(base64url_encode(json_header))
segments.append(base64url_encode(payload))
# Segments
signing_input = b".".join(segments)
try:
alg_obj = self._algorithms[algorithm]
key = alg_obj.prepare_key(key)
signature = alg_obj.sign(signing_input, key)
except KeyError:
if not has_crypto and algorithm in requires_cryptography:
raise NotImplementedError(
"Algorithm '%s' could not be found. Do you have cryptography "
"installed?" % algorithm
)
else:
raise NotImplementedError("Algorithm not supported")
def create_token(type, identity, exp, key, **claims):
    """
    Build a signed, three-segment token of the form header.payload.signature.

    Args:
        type: Value stored under the ``type`` claim.
        identity: Value stored under the ``identity`` claim.
        exp: Token lifetime; must provide ``total_seconds()`` (for example a
            ``datetime.timedelta``). It is added to the current time to
            produce the ``exp`` claim.
        key: Signing key, passed unchanged to ``make_signature``.
        **claims: Additional claims merged into the payload. On a name clash
            (e.g. ``jti``, ``nbf``, ``iat``) the supplied value replaces the
            generated one.

    Returns:
        The encoded header, payload and signature joined with ``b'.'``.
    """
    now = int(time.time())
    lifetime = int(exp.total_seconds())
    payload = dict(
        type=type,
        jti=uuid.uuid4(),
        identity=identity,
        nbf=now,
        iat=now,
        exp=now + lifetime,
    )
    # Merge the extra claims in a single step. A ``for key, value in ...``
    # loop here would rebind the ``key`` parameter, so the token would end up
    # signed with the name of the last claim instead of the signing key.
    payload.update(claims)

    # NOTE(review): header and payload are serialized with str() (Python repr,
    # not JSON). Left as-is because the decoding side is not visible here --
    # confirm this is the intended wire format.
    header = utils.base64url_encode(str(default_header).encode())
    payload = utils.base64url_encode(str(payload).encode())
    signature = make_signature(header, payload, key)
    signature = utils.base64url_encode(signature)
    token = header + b'.' + payload + b'.' + signature
    return token
def _validate(self):
    """
    Verify every signature attached to this object against its payload.

    Returns None immediately when there are no signatures. Otherwise each
    signature entry is checked with the algorithm named in its header and the
    JWK embedded in that same header.

    Raises:
        InvalidSchema1Signature: if a signer reports a bad signature, either
            by raising ``BadSignature`` or by returning a falsy result.
    """
    if not self._signatures:
        return

    payload = self._payload
    for entry in self._signatures:
        # Signing input is "<protected>.<base64url(payload)>".
        protected_segment = Bytes.for_string_or_unicode(entry["protected"]).as_encoded_str()
        payload_segment = base64url_encode(payload)
        signing_input = b"%s.%s" % (protected_segment, payload_segment)

        verifier = SIGNER_ALGS[entry["header"]["alg"]]
        jwk = keyrep(entry["header"]["jwk"])
        public_key = jwk.get_key()
        raw_signature = base64url_decode(entry["signature"].encode("utf-8"))

        try:
            is_valid = verifier.verify(signing_input, raw_signature, public_key)
        except BadSignature:
            raise InvalidSchema1Signature()
        else:
            if not is_valid:
                raise InvalidSchema1Signature()
return DockerSchema1Manifest(Bytes.for_string_or_unicode(payload_str))
payload_str = Bytes.for_string_or_unicode(payload_str).as_encoded_str()
split_point = payload_str.rfind(b"\n}")
protected_payload = {
"formatTail": base64url_encode(payload_str[split_point:]).decode("ascii"),
"formatLength": split_point,
"time": datetime.utcnow().strftime(_ISO_DATETIME_FORMAT_ZULU),
}
protected = base64url_encode(
json.dumps(protected_payload, ensure_ascii=ensure_ascii).encode("utf-8")
)
logger.debug("Generated protected block: %s", protected)
bytes_to_sign = b"%s.%s" % (protected, base64url_encode(payload_str))
signer = SIGNER_ALGS[_JWS_SIGNING_ALGORITHM]
signature = base64url_encode(signer.sign(bytes_to_sign, json_web_key.get_key()))
logger.debug("Generated signature: %s", signature)
public_members = set(json_web_key.public_members)
public_key = {
comp: value
for comp, value in list(json_web_key.to_dict().items())
if comp in public_members
}
signature_block = {
DOCKER_SCHEMA1_HEADER_KEY: {"jwk": public_key, "alg": _JWS_SIGNING_ALGORITHM},
DOCKER_SCHEMA1_SIGNATURE_KEY: signature.decode("ascii"),
DOCKER_SCHEMA1_PROTECTED_KEY: protected.decode("ascii"),
now = int(time.time())
exp = int(exp.total_seconds())
payload = dict(
type=type,
jti=uuid.uuid4(),
identity=identity,
nbf=now,
iat=now,
exp=now + exp
)
for key, value in claims.items():
payload.update({key: value})
header = utils.base64url_encode(str(default_header).encode())
payload = utils.base64url_encode(str(payload).encode())
signature = make_signature(header, payload, key)
signature= utils.base64url_encode(signature)
token = header + b'.' + payload + b'.' + signature
return token
hash, as described in http://openid.net/specs/openid-connect-core-1_0.html#CodeIDToken
Its value is the base64url encoding of the left-most half of the hash of the octets
of the ASCII representation of the access_token value, where the hash algorithm
used is the hash algorithm used in the alg Header Parameter of the ID Token's JOSE
Header. For instance, if the alg is RS256, hash the access_token value with SHA-256,
then take the left-most 128 bits and base64url encode them. The at_hash value is a
case sensitive string.
Args:
access_token (str): An access token string.
hash_alg (callable): A callable returning a hash object, e.g. hashlib.sha256
"""
hash_digest = hash_alg(access_token.encode('utf-8')).digest()
cut_at = int(len(hash_digest) / 2)
truncated = hash_digest[:cut_at]
from jwt.utils import base64url_encode
at_hash = base64url_encode(truncated)
return at_hash.decode('utf-8')
}
)
payload_str = json.dumps(payload, indent=3, ensure_ascii=ensure_ascii)
if json_web_key is None:
return DockerSchema1Manifest(Bytes.for_string_or_unicode(payload_str))
payload_str = Bytes.for_string_or_unicode(payload_str).as_encoded_str()
split_point = payload_str.rfind(b"\n}")
protected_payload = {
"formatTail": base64url_encode(payload_str[split_point:]).decode("ascii"),
"formatLength": split_point,
"time": datetime.utcnow().strftime(_ISO_DATETIME_FORMAT_ZULU),
}
protected = base64url_encode(
json.dumps(protected_payload, ensure_ascii=ensure_ascii).encode("utf-8")
)
logger.debug("Generated protected block: %s", protected)
bytes_to_sign = b"%s.%s" % (protected, base64url_encode(payload_str))
signer = SIGNER_ALGS[_JWS_SIGNING_ALGORITHM]
signature = base64url_encode(signer.sign(bytes_to_sign, json_web_key.get_key()))
logger.debug("Generated signature: %s", signature)
public_members = set(json_web_key.public_members)
public_key = {
comp: value
for comp, value in list(json_web_key.to_dict().items())
if comp in public_members
}