Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def extract_token(self):
    """Extract websocket credentials from the ``ws_token`` query parameter.

    The parameter value is deserialized as a JWE token, decrypted with the
    key returned by ``get_jwk_key()``, and its payload is parsed as JSON.

    Returns:
        ``{"type": "wstoken", "token": ...}`` (plus ``"id"`` when the
        payload contains one), or ``None`` when the query parameter is
        absent, the token cannot be deserialized/decrypted, or the
        payload's ``exp`` value is not later than the current time.

    Raises:
        ValueError: Propagated from ``json.loads`` if the decrypted payload
            is not valid JSON.
        KeyError: If the payload lacks the ``exp`` or ``token`` key.
    """
    request = self.request
    if "ws_token" not in request.query:
        return None

    # The serialized token is used as text throughout; no bytes round-trip
    # is needed before handing it to ``deserialize``.
    jwt_token = request.query["ws_token"]
    try:
        jwetoken = jwe.JWE()
        jwetoken.deserialize(jwt_token)
        jwetoken.decrypt(get_jwk_key())
        payload = jwetoken.payload
    except jwe.InvalidJWEOperation:
        logger.warning("Invalid operation", exc_info=True)
        return None
    except jwe.InvalidJWEData:
        logger.warning("Error decrypting JWT token", exc_info=True)
        return None

    json_payload = json.loads(payload)
    if json_payload["exp"] <= int(time.time()):
        # No exception is active here, so ``exc_info`` is not passed. The
        # raw token is a credential and is deliberately kept out of the log.
        logger.warning("Expired token")
        return None

    data = {"type": "wstoken", "token": json_payload["token"]}
    if "id" in json_payload:
        data["id"] = json_payload["id"]
    return data
def decrypt_jwe(encrypted_token, secret_store, purpose):
    """Decrypt a serialized JWE token and return its payload as text.

    Args:
        encrypted_token: Serialized JWE token to decrypt.
        secret_store: Object providing ``get_private_key_by_kid(purpose, kid)``;
            the key it returns must expose ``as_jwk()``.
        purpose: Passed to the secret store's key lookup together with the
            ``kid`` extracted from the token header.

    Returns:
        The decrypted payload, decoded to ``str``.

    Raises:
        InvalidTokenException: If ``InvalidJWEData`` is raised while
            deserializing or decrypting the token. The original error is
            attached as the exception's cause.
    """
    try:
        # ``algs`` is passed through to jwcrypto; presumably it limits the
        # accepted key-management / content-encryption algorithms to these
        # two -- see the jwcrypto JWE documentation.
        jwe_token = jwe.JWE(algs=['RSA-OAEP', 'A256GCM'])
        jwe_token.deserialize(encrypted_token)

        # The key id in the token header selects which private key to use.
        jwe_kid = extract_kid_from_header(encrypted_token)
        logger.info("Decrypting JWE", kid=jwe_kid)
        private_jwk = secret_store.get_private_key_by_kid(purpose, jwe_kid).as_jwk()

        jwe_token.decrypt(private_jwk)
        return jwe_token.payload.decode()
    except InvalidJWEData as e:
        # Chain explicitly so the traceback shows the JWE error as the cause.
        raise InvalidTokenException(repr(e)) from e