Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
{'uid': serviceuid, 'exp': time.time() + 30},
default_rsa_privkey,
algorithm='RS256'
).decode('ascii')
# Log in via the service login token.
r = noauth_api_session.post(
'/acs/api/v1/auth/login',
json={'uid': serviceuid, 'token': service_login_token}
)
assert r.status_code == 200, r.text
# Confirm that the response body contains a DC/OS authentication token.
token = r.json()['token']
header_bytes, payload_bytes, signature_bytes = [
base64url_decode(_.encode('ascii')) for _ in token.split(".")]
header_dict = json.loads(header_bytes.decode('ascii'))
assert header_dict['alg'] == 'RS256'
assert header_dict['typ'] == 'JWT'
payload_dict = json.loads(payload_bytes.decode('ascii'))
assert 'exp' in payload_dict
assert 'uid' in payload_dict
assert payload_dict['uid'] == serviceuid
# Verify that the service user account appears in the users collection.
r = dcos_api_session.get('/acs/api/v1/users', query='type=service')
uids = [o['uid'] for o in r.json()['array']]
assert serviceuid in uids
# Delete the service user account.
def test_encode_headers_parameter_adds_headers(self, jws, payload):
    """Check that headers passed to ``jws.encode`` show up in the token header.

    Encodes ``payload`` with an extra ``testheader`` entry, decodes the
    segment before the first dot of the resulting token, and asserts that
    the entry is present with the value that was supplied.
    """
    extra_headers = {"testheader": True}
    encoded = jws.encode(payload, "secret", headers=extra_headers)
    # encode() may hand back something other than str; work on text below.
    token_text = encoded if isinstance(encoded, str) else encoded.decode()

    # Everything before the first "." is the base64url-encoded header.
    first_dot = token_text.index(".")
    raw_header = base64url_decode(token_text[0:first_dot].encode())
    header_text = raw_header if isinstance(raw_header, str) else raw_header.decode()

    decoded_header = json.loads(header_text)
    assert "testheader" in decoded_header
    assert decoded_header["testheader"] == extra_headers["testheader"]
def test_rsapss_verify_should_return_true_for_test_vector(self):
    """
    Check that RSA-PSS (SHA-384) verification accepts a known good
    signature when given the matching public key.

    Reference: https://tools.ietf.org/html/rfc7520#section-4.2
    """
    # The signed message: "<header>.<payload>" exactly as it appears in the token.
    message = force_bytes(
        "eyJhbGciOiJQUzM4NCIsImtpZCI6ImJpbGJvLmJhZ2dpbnNAaG9iYml0b24uZXhhb"
        "XBsZSJ9.SXTigJlzIGEgZGFuZ2Vyb3VzIGJ1c2luZXNzLCBGcm9kbywgZ29pbmcgb"
        "3V0IHlvdXIgZG9vci4gWW91IHN0ZXAgb250byB0aGUgcm9hZCwgYW5kIGlmIHlvdS"
        "Bkb24ndCBrZWVwIHlvdXIgZmVldCwgdGhlcmXigJlzIG5vIGtub3dpbmcgd2hlcmU"
        "geW91IG1pZ2h0IGJlIHN3ZXB0IG9mZiB0by4"
    )

    # The signature is supplied base64url-encoded and decoded before use.
    encoded_signature = force_bytes(
        "cu22eBqkYDKgIlTpzDXGvaFfz6WGoz7fUDcfT0kkOy42miAh2qyBzk1xEsnk2IpN6"
        "-tPid6VrklHkqsGqDqHCdP6O8TTB5dDDItllVo6_1OLPpcbUrhiUSMxbbXUvdvWXz"
        "g-UD8biiReQFlfz28zGWVsdiNAUf8ZnyPEgVFn442ZdNqiVJRmBqrYRXe8P_ijQ7p"
        "8Vdz0TTrxUeT3lm8d9shnr2lfJT8ImUjvAA2Xez2Mlp8cBE5awDzT0qI0n6uiP1aC"
        "N_2_jLAeQTlqRHtfa64QQSUmFAAjVKPbByi7xho0uTOcbH510a6GYmJUAfmWjwZ6o"
        "D4ifKo8DYM-X72Eaw"
    )
    raw_signature = base64url_decode(encoded_signature)

    pss_sha384 = RSAPSSAlgorithm(RSAPSSAlgorithm.SHA384)
    public_key = pss_sha384.prepare_key(load_rsa_pub_key())

    assert pss_sha384.verify(message, public_key, raw_signature)
def user_data(self, access_token, *args, **kwargs):
    """Return the result of decoding the ``id_token`` found in the response.

    The ``response`` keyword argument supplies the ``id_token``. The token's
    first segment (its header) is read to obtain the ``kid`` used to look up
    the public key, and the ``alg`` passed on to ``jwt_decode``.

    :raise AuthTokenError: if ``jwt_decode`` raises ``DecodeError`` or
        ``ExpiredSignature``.
    """
    raw_token = kwargs.get('response').get('id_token')
    if six.PY2:
        # str() to fix a bug in Python's base64
        # https://stackoverflow.com/a/2230623/161278
        raw_token = str(raw_token)

    header_segment = raw_token.split('.')[0]
    header = json.loads(base64url_decode(header_segment).decode('ascii'))

    # `kid` is short for key id
    public_key = self.get_public_key(header['kid'])

    # NOTE(review): `algorithms` comes from the not-yet-verified token header;
    # confirm that accepting whatever algorithm the token names is intended.
    try:
        claims = jwt_decode(
            raw_token,
            key=public_key,
            algorithms=header['alg'],
            audience=self.setting('KEY'),
            leeway=self.setting('JWT_LEEWAY', default=0),
        )
    except (DecodeError, ExpiredSignature) as error:
        raise AuthTokenError(self, error)
    return claims
def fetch_token_header(token):
    """
    Fetch the header out of the JWT token.

    Only the first dot-separated segment is decoded; nothing is verified here.

    :param token: the encoded token as text, made of at least three
        dot-separated segments
    :return: the header segment, base64url-decoded and parsed from JSON
    :raise jwt.DecodeError: if the token has too few segments, or the header
        segment is not valid base64url, UTF-8 or JSON
    """
    token = token.encode("utf-8")

    # The unpacking is only there to enforce the segment count; the payload
    # and signature segments are not needed.
    try:
        signing_input, _ = token.rsplit(b".", 1)
        header_segment, _ = signing_input.split(b".", 1)
    except ValueError:
        raise jwt.DecodeError("Not enough segments")

    try:
        header_data = jwt.utils.base64url_decode(header_segment)
    except (TypeError, ValueError) as e:
        # On Python 3 malformed base64 raises binascii.Error, which is a
        # ValueError subclass; catching only TypeError let it escape.
        current_app.logger.exception(e)
        raise jwt.DecodeError("Invalid header padding")

    try:
        return json.loads(header_data.decode("utf-8"))
    except ValueError as e:
        # Covers both UnicodeDecodeError and JSON parse errors.
        current_app.logger.exception(e)
        raise jwt.DecodeError("Invalid header string")
# which is reachable via the local network interface.
r = requests.get('http://127.0.0.1:8101/acs/api/v1/auth/jwks')
if r.status_code != 200:
log.info('JWKS retrieval failed. Got %s with body: %s', r, r.text)
sys.exit(1)
jwks = r.json()
# The first key in the JSON Web Key Set corresponds to the current private
# key used for signing authentication tokens.
key = jwks['keys'][0]
exponent_bytes = base64url_decode(key['e'].encode('ascii'))
exponent_int = bytes_to_number(exponent_bytes)
modulus_bytes = base64url_decode(key['n'].encode('ascii'))
modulus_int = bytes_to_number(modulus_bytes)
# Generate a `cryptography` public key object instance from these numbers.
public_numbers = rsa.RSAPublicNumbers(n=modulus_int, e=exponent_int)
public_key = public_numbers.public_key(
backend=cryptography.hazmat.backends.default_backend())
# Serialize public key into the OpenSSL PEM public key format (RFC 5280).
pubkey_pem_bytes = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
rundir = utils.dcos_run_path / 'dcos-adminrouter'
rundir.mkdir(parents=True, exist_ok=True)
pubkey_path = rundir / 'auth-token-verification-key'
utils.write_public_file(pubkey_path, pubkey_pem_bytes)
utils.chown(pubkey_path, user='dcos_adminrouter')
def _payload(self):
    """Build the payload from the encoded content and the first signature.

    With no signatures, the encoded content is returned unchanged. Otherwise
    the first signature's protected header (base64url-encoded JSON) supplies
    a length and a base64url-encoded tail; the result is the encoded content
    cut to that length, followed by the decoded tail.
    """
    unsigned = self._signatures is None
    encoded = self._bytes.as_encoded_str()
    if unsigned:
        return encoded

    first_signature = self._signatures[0]
    protected_b64 = str(first_signature[DOCKER_SCHEMA1_PROTECTED_KEY])
    protected_fields = json.loads(base64url_decode(protected_b64))

    head = encoded[: protected_fields[DOCKER_SCHEMA1_FORMAT_LENGTH_KEY]]
    tail_b64 = str(protected_fields[DOCKER_SCHEMA1_FORMAT_TAIL_KEY])
    tail = base64url_decode(tail_b64)
    return head + tail
def from_jwk(jwk):
    """Return the base64url-decoded ``k`` member of a JSON-encoded key.

    :param jwk: JSON text describing the key
    :return: the decoded value of the ``k`` member
    :raise InvalidKeyError: if the ``kty`` member is missing or is not
        ``"oct"``
    """
    key_fields = json.loads(jwk)
    key_type = key_fields.get("kty")
    if key_type == "oct":
        return base64url_decode(key_fields["k"])
    raise InvalidKeyError("Not an HMAC key")
try:
header = json.loads(header_data.decode("utf-8"))
except ValueError as e:
raise DecodeError("Invalid header string: %s" % e)
if not isinstance(header, Mapping):
raise DecodeError("Invalid header string: must be a json object")
try:
payload = base64url_decode(payload_segment)
except (TypeError, binascii.Error):
raise DecodeError("Invalid payload padding")
try:
signature = base64url_decode(crypto_segment)
except (TypeError, binascii.Error):
raise DecodeError("Invalid crypto padding")
return (payload, signing_input, header, signature)
def _b64url_to_int(value):
    """Decode a base64url-encoded value into an integer, most significant byte first."""
    if isinstance(value, str):
        # bytes(str) without an encoding raises TypeError on Python 3, and
        # values parsed from JSON arrive as str, so encode text explicitly.
        value = value.encode("utf-8")
    else:
        value = bytes(value)
    return int(binascii.hexlify(jwt.utils.base64url_decode(value)), 16)


def get_rsa_public_key(n, e):
    """
    Retrieve an RSA public key based on a modulus and exponent as provided by the JWKS format.

    :param n: base64url-encoded modulus, as text or bytes
    :param e: base64url-encoded public exponent, as text or bytes
    :return: a RSA Public Key in PEM format
    """
    n = _b64url_to_int(n)
    e = _b64url_to_int(e)

    # RSAPublicNumbers takes the exponent first, then the modulus.
    pub = RSAPublicNumbers(e, n).public_key(default_backend())
    return pub.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo
    )