Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_service_account_key(service_account_key):
    """Check the JWT attached to a token request built from a service-account key.

    Builds a token request through ``get_auth_token_requester`` and asserts on
    the JWT's header fields (``typ``, ``alg``, ``kid``) and on its ``iss``,
    ``aud`` and ``iat`` claims. The token is decoded with ``verify=False``.
    """
    requester = get_auth_token_requester(service_account_key=service_account_key)
    token_request = requester.get_token_request()
    # Sampled after the request was built; the ``iat`` check below uses it as
    # the upper bound of a 60-second window.
    checked_at = int(time.time())

    jose_header = jwt.get_unverified_header(token_request.jwt)
    claims = jwt.decode(
        token_request.jwt,
        secret=service_account_key["public_key"],
        algorithms=['PS256'],
        verify=False,
    )

    # Header fields.
    assert jose_header["typ"] == "JWT"
    assert jose_header["alg"] == "PS256"
    assert jose_header["kid"] == service_account_key["id"]

    # Claims.
    assert claims["iss"] == service_account_key["service_account_id"]
    assert claims["aud"] == "https://iam.api.cloud.yandex.net/iam/v1/tokens"
    issued_at = int(claims["iat"])
    assert checked_at - 60 <= issued_at <= checked_at
def assertJwtsEqual(self, jwt, key, expected_payload=None, expected_headers=None):
    """Assert that a JWT's headers and payload equal the expected mappings.

    Args:
        jwt: Encoded token to inspect.
        key: Key handed to ``jwt_lib.decode`` (the call uses ``verify=False``).
        expected_payload: Expected claims; a falsy value means ``{}``.
        expected_headers: Expected header fields; a falsy value means ``{}``.
    """
    if not expected_headers:
        expected_headers = {}
    if not expected_payload:
        expected_payload = {}

    actual_payload = jwt_lib.decode(jwt, key, verify=False)
    actual_headers = jwt_lib.get_unverified_header(jwt)

    # Headers are compared first, then the payload.
    self.assertEqual(expected_headers, actual_headers)
    self.assertEqual(expected_payload, actual_payload)
def validate(ssd):
    """Report whether an SSD token decodes and verifies as an RS512-signed JWT.

    The ``ssd_iss`` field of the token's unverified header is passed to
    ``SFSsd.ret_ssd_pub_key`` to obtain the key used for verification.

    Args:
        ssd: Encoded SSD token.

    Returns:
        True when the token decodes without error; False when any step
        raises (unparseable token, missing ``ssd_iss`` header, key lookup
        failure, verification failure, ...). The error is logged at debug
        level.
    """
    try:
        ssd_header = jwt.get_unverified_header(ssd)
        # The allow-list keyword is the plural ``algorithms`` and takes a
        # list. The previous singular ``algorithm='RS512'`` was not a
        # recognised parameter, so the RS512 restriction was not applied.
        jwt.decode(
            ssd,
            SFSsd.ret_ssd_pub_key(ssd_header['ssd_iss']),
            algorithms=['RS512'],
        )
    except Exception as ex:
        # Deliberately broad: any failure simply means "not valid".
        # The ``%s`` placeholder is required; passing ``ex`` without one makes
        # the logging module fail to format the record.
        logger.debug("Error while validating SSD Token: %s", ex)
        return False
    return True
def hs256_attack(args):
"""This function passes down different candidates to the run() function and is required
to handle different types of guessing attack.
Arguments:
args {object} -- The command-line arguments
"""
headers = jwt.get_unverified_header(args.token)
if not headers['alg'] == "HS256":
logging.error("JWT signed using an algorithm other than HS256.")
else:
tqdm_disable = True if args.log_level == "DEBUG" else False
if args.attack_mode == "brute-force":
# Count = ....
for candidate in tqdm(bruteforce(args.charset, args.increment_min, args.increment_max), disable=tqdm_disable):
if run(args.token, candidate):
return candidate
return None
elif args.attack_mode == "wordlist":
word_count = len(open(args.wordlist.name, "r",
def authorize(self, request, uid=None):
    """Validate the request's access token and detect the admin user.

    The key id from the token's unverified header and the issuer's
    ``jwks_uri`` are used to look up the signing key; the token is then
    decoded against the configured issuer and audience.

    Args:
        request: Incoming request from which the raw token is extracted.
        uid: Not used by the code shown here.

    Returns:
        ``'admin'`` when the token's ``preferred_username`` claim equals the
        configured ``admin_username`` (default ``'admin'``); otherwise
        ``None``.

    Raises:
        UnauthorizedException: If the token has no ``kid`` header, the issuer
            info has no ``jwks_uri``, or decoding the token fails.
    """
    raw_token = self._get_raw_token(request)
    issuer = self._get_issuer_info()

    kid = jwt.get_unverified_header(raw_token).get('kid')
    if kid is None:
        raise UnauthorizedException("Missing key id in token")

    keys_url = issuer.get('jwks_uri')
    if keys_url is None:
        raise UnauthorizedException("Missing JWKS URI in config")

    signing_key, algorithm = self._get_signing_key(keys_url, kid)
    try:
        token_claims = jwt.decode(
            raw_token,
            signing_key,
            algorithms=algorithm,
            issuer=issuer['issuer'],
            audience=self.config['audience'],
        )
    except Exception as err:
        raise UnauthorizedException('Invalid access token: %s' % err)

    username = token_claims['preferred_username']
    if username == self.config.get('admin_username', 'admin'):
        return 'admin'
    return None
def _check_header_values(token):
    """Validate the unverified header of ``token``.

    All three fields (``typ``, ``alg``, ``kid``) must be present and
    non-empty; only then are their values compared, case-insensitively,
    against ``JWT``, ``RS256`` and ``EDCRRM`` respectively.

    Raises:
        InvalidTokenException: On the first failed check, with a message
            naming the missing or invalid field.
    """
    header = jwt.get_unverified_header(token)
    if not header:
        raise InvalidTokenException("Missing Headers")

    # Presence checks run for every field before any value is compared.
    presence_errors = (
        ('typ', "Missing Type"),
        ('alg', "Missing Algorithm"),
        ('kid', "Missing kid"),
    )
    for field, message in presence_errors:
        if not header.get(field):
            raise InvalidTokenException(message)

    # Expected values, compared against the upper-cased header value.
    value_errors = (
        ('typ', 'JWT', "Invalid Type"),
        ('alg', 'RS256', "Invalid Algorithm"),
        ('kid', 'EDCRRM', "Invalid Key Identifier"),
    )
    for field, expected, message in value_errors:
        if header.get(field).upper() != expected:
            raise InvalidTokenException(message)
"""Get public key for JWT in order to verify the signature.
The keys get cached in order to limit the amount of network round trips.
Args:
encoded_jwt: Base64 encoded JWT.
url: URL where keys can be fetched.
Raises:
JwtKeyError if keys cannot be fetched.
Returns:
Key as string.
"""
# Get the Key ID from the JWT header.
key_id = jwt.get_unverified_header(encoded_jwt).get('kid')
if not key_id:
raise JwtKeyError('Missing key ID field in token header')
key_cache = get_public_key_for_jwt.key_cache
key = key_cache.get(key_id)
if key:
return key
# Re-fetch the key file.
keys_json = _fetch_public_keys(url)
if 'keys' in keys_json:
_new_keys_dict = {}
for key_dict in keys_json['keys']:
public_key = jwt.algorithms.RSAAlgorithm.from_jwk(
json.dumps(key_dict))
_new_keys_dict[key_dict['kid']] = public_key
key_cache = _new_keys_dict
def decode_token(encoded_token, csrf_value=None, allow_expired=False):
"""
Returns the decoded token (python dict) from an encoded JWT. This does all
the checks to ensure that the decoded token is valid before returning it.
:param encoded_token: The encoded JWT to decode into a python dict.
:param csrf_value: Expected CSRF double submit value (optional)
:param allow_expired: Options to ignore exp claim validation in token
:return: Dictionary containing contents of the JWT
"""
jwt_manager = _get_jwt_manager()
unverified_claims = jwt.decode(
encoded_token, verify=False, algorithms=config.decode_algorithms
)
unverified_headers = jwt.get_unverified_header(encoded_token)
# Attempt to call callback with both claims and headers, but fallback to just claims
# for backwards compatibility
try:
secret = jwt_manager._decode_key_callback(unverified_claims, unverified_headers)
except TypeError:
msg = (
"The single-argument (unverified_claims) form of decode_key_callback ",
"is deprecated. Update your code to use the two-argument form ",
"(unverified_claims, unverified_headers)."
)
warn(msg, DeprecationWarning)
secret = jwt_manager._decode_key_callback(unverified_claims)
try:
return decode_jwt(
encoded_token=encoded_token,
raise HTTPUnauthorized()
log.debug(f"Bearer authorization, using provider: {provider}")
log.debug(f"Bearer authorization, using audience: {audience}")
log.debug(f"Bearer authorization, using claims: {claims}")
# Resolve provider into an OpenID configuration.
if provider.lower() == 'azure':
openid_url = MSONLINE_OPENID_URL
elif provider.lower() == 'google':
openid_url = GOOGLE_OPENID_URL
else:
log.warn(f"Unknown OpenID provider: {provider}")
raise HTTPInternalServerError()
token_header = jwt.get_unverified_header(token)
res = requests.get(openid_url)
if res.status_code != 200:
log.warn("Bad response from {openid_url}: {res.status_code}")
if res.status_code == 404:
raise HTTPNotFound()
elif res.status_code == 401:
raise HTTPUnauthorized()
elif res.status_code == 403:
raise HTTPForbidden()
elif res.status_code == 503:
raise HTTPServiceUnavailable()
else:
raise HTTPInternalServerError()
jwk_uri = res.json()['jwks_uri']
res = requests.get(jwk_uri)