Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_decode__wrong_secret(self):
    """Check that a token encoded with the wrong secret cannot be decoded."""
    # Every mandatory claim gets the same placeholder value.
    claims = dict.fromkeys(MANDATORY_CLAIMS, "foo")
    token = jwt_encode(claims, "QWERTYUIO")
    with self.assertRaises(DecodeError):
        decode(token)
try:
signing_input, crypto_segment = jwt.rsplit(b".", 1)
header_segment, payload_segment = signing_input.split(b".", 1)
except ValueError:
raise DecodeError("Not enough segments")
try:
header_data = base64url_decode(header_segment)
except (TypeError, binascii.Error):
raise DecodeError("Invalid header padding")
try:
header = json.loads(header_data.decode("utf-8"))
except ValueError as e:
raise DecodeError("Invalid header string: %s" % e)
if not isinstance(header, Mapping):
raise DecodeError("Invalid header string: must be a json object")
try:
payload = base64url_decode(payload_segment)
except (TypeError, binascii.Error):
raise DecodeError("Invalid payload padding")
try:
signature = base64url_decode(crypto_segment)
except (TypeError, binascii.Error):
raise DecodeError("Invalid crypto padding")
return (payload, signing_input, header, signature)
if not issubclass(type(jwt), binary_type):
raise DecodeError(
"Invalid token type. Token must be a {0}".format(binary_type)
)
try:
signing_input, crypto_segment = jwt.rsplit(b".", 1)
header_segment, payload_segment = signing_input.split(b".", 1)
except ValueError:
raise DecodeError("Not enough segments")
try:
header_data = base64url_decode(header_segment)
except (TypeError, binascii.Error):
raise DecodeError("Invalid header padding")
try:
header = json.loads(header_data.decode("utf-8"))
except ValueError as e:
raise DecodeError("Invalid header string: %s" % e)
if not isinstance(header, Mapping):
raise DecodeError("Invalid header string: must be a json object")
try:
payload = base64url_decode(payload_segment)
except (TypeError, binascii.Error):
raise DecodeError("Invalid payload padding")
try:
signature = base64url_decode(crypto_segment)
def wrapper(request, *args, **kwargs):
    """Validate the request's JWT, then call the wrapped view.

    The token is taken from the second space-separated field of the
    ``Authorization`` header when that header is present, otherwise from
    the ``jwt_token`` request parameter. It is decoded with the
    ``jwt_secret`` registry setting using HS256, and the ``user_id`` claim
    is passed to ``find_by_lms_guid`` to look up the user.

    Returns a 401 ``Response`` when the token is missing, malformed, or
    rejected by the JWT library; otherwise returns whatever
    ``view_function`` returns, called with the decoded claims and
    ``user=`` the looked-up user.
    """
    try:
        if "Authorization" in request.headers:
            # A header with no second field raises IndexError, which is
            # treated as an unauthenticated request below.
            jwt_token = request.headers["Authorization"].split(" ")[1]
        else:
            jwt_token = request.params["jwt_token"]
        decoded_jwt = jwt.decode(
            jwt_token, request.registry.settings["jwt_secret"], algorithms=["HS256"]
        )
        lms_guid = decoded_jwt["user_id"]
        user = find_by_lms_guid(request.db, lms_guid)
    except (jwt.exceptions.InvalidTokenError, KeyError, IndexError):
        # InvalidTokenError is the base class of DecodeError, so this also
        # covers tokens that decode but fail validation (e.g. expired).
        return Response("<p>Error: Unauthenticated Request</p>", status=401)
    return view_function(request, decoded_jwt, *args, user=user, **kwargs)
def run(token, candidate):
    """Check whether a candidate secret successfully decodes a JWT with HS256.

    Arguments:
        token {string} -- The encoded JWT to test
        candidate {string} -- A candidate secret key for decoding

    Returns:
        [boolean] -- True if decoding succeeded, False if the JWT library
        rejected the token with this candidate

    Any other exception is logged and the process exits with status 1.
    """
    try:
        # The keyword is ``algorithms`` (a list of allowed algorithms);
        # the singular ``algorithm`` is not a decode() parameter.
        jwt.decode(token, candidate, algorithms=['HS256'])
        return True
    except jwt.exceptions.DecodeError:
        logger.debug(f"DecodingError: {candidate}")
        return False
    except jwt.exceptions.InvalidTokenError:
        logger.debug(f"InvalidTokenError: {candidate}")
        return False
    except Exception as ex:
        logger.exception(f"Exception: {ex}")
        sys.exit(1)
def _decode(self, authdata):
    """Decode a JWT and return its issuer and subject.

    The token is decoded twice: once without signature verification to
    read the ``iss`` claim, and once with the secret registered for that
    issuer in ``self.secrets_by_library_uri`` so the signature is checked.

    Returns:
        A ``(library_uri, subject)`` tuple taken from the ``iss`` and
        ``sub`` claims.

    Raises:
        jwt.exceptions.DecodeError: if the issuer is missing or unknown,
            or the verified token has no ``sub`` claim. Errors raised by
            ``jwt.decode`` itself propagate unchanged.
    """
    # NOTE(review): assumes self.ALGORITHM is a single algorithm name
    # (e.g. a string such as "HS256") -- confirm against the class.
    allowed_algorithms = [self.ALGORITHM]

    # First, decode the authdata without checking the signature.
    decoded = jwt.decode(
        authdata, algorithms=allowed_algorithms,
        options=dict(verify_signature=False)
    )

    # This lets us get the library URI, which lets us get the secret.
    library_uri = decoded.get('iss')
    if library_uri not in self.secrets_by_library_uri:
        # The request came in without a library specified
        # or with an unknown library specified.
        raise jwt.exceptions.DecodeError(
            "Unknown library: %s" % library_uri
        )

    # We know the secret for this library, so we can re-decode the
    # authdata and require signature validation this time. Passing
    # ``algorithms`` restricts verification to the expected algorithm.
    secret = self.secrets_by_library_uri[library_uri]
    decoded = jwt.decode(authdata, secret, algorithms=allowed_algorithms)
    if 'sub' not in decoded:
        raise jwt.exceptions.DecodeError("No subject specified.")
    return library_uri, decoded['sub']
header_data = base64url_decode(header_segment)
except (TypeError, binascii.Error):
raise DecodeError("Invalid header padding")
try:
header = json.loads(header_data.decode("utf-8"))
except ValueError as e:
raise DecodeError("Invalid header string: %s" % e)
if not isinstance(header, Mapping):
raise DecodeError("Invalid header string: must be a json object")
try:
payload = base64url_decode(payload_segment)
except (TypeError, binascii.Error):
raise DecodeError("Invalid payload padding")
try:
signature = base64url_decode(crypto_segment)
except (TypeError, binascii.Error):
raise DecodeError("Invalid crypto padding")
return (payload, signing_input, header, signature)
header = json.loads(header_data.decode("utf-8"))
except ValueError as e:
raise DecodeError("Invalid header string: %s" % e)
if not isinstance(header, Mapping):
raise DecodeError("Invalid header string: must be a json object")
try:
payload = base64url_decode(payload_segment)
except (TypeError, binascii.Error):
raise DecodeError("Invalid payload padding")
try:
signature = base64url_decode(crypto_segment)
except (TypeError, binascii.Error):
raise DecodeError("Invalid crypto padding")
return (payload, signing_input, header, signature)