Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def extract_token(self):
    """Extract websocket-token credentials from the ``ws_token`` query parameter.

    The parameter value is deserialized as a JWE and decrypted with the key
    returned by ``get_jwk_key()``. The decrypted payload is parsed as JSON
    and is expected to carry ``exp`` and ``token`` (and optionally ``id``).

    Returns:
        ``{"type": "wstoken", "token": ...}``, plus ``"id"`` when the payload
        contains one; ``None`` when the parameter is absent, cannot be
        decrypted, has a malformed payload, or has expired.
    """
    query = self.request.query
    if "ws_token" not in query:
        return None

    # The value is passed straight to deserialize(); the original encoded it
    # to UTF-8 and immediately decoded it back, which was a no-op.
    raw_token = query["ws_token"]
    try:
        jwetoken = jwe.JWE()
        jwetoken.deserialize(raw_token)
        jwetoken.decrypt(get_jwk_key())
        payload = jwetoken.payload
    except jwe.InvalidJWEOperation:
        logger.warning("Invalid operation", exc_info=True)
        return None
    except jwe.InvalidJWEData:
        logger.warning("Error decrypting JWT token", exc_info=True)
        return None

    try:
        json_payload = json.loads(payload)
        if json_payload["exp"] <= int(time.time()):
            # No exception is active here, so exc_info is omitted; the token
            # itself is deliberately kept out of the logs.
            logger.warning("Expired token")
            return None
        data = {"type": "wstoken", "token": json_payload["token"]}
        if "id" in json_payload:
            data["id"] = json_payload["id"]
    except (ValueError, KeyError, TypeError):
        # Invalid JSON, a non-object payload, a missing "exp"/"token" key or
        # a non-numeric "exp": treat like any other unusable token instead
        # of letting the error escape to the caller.
        logger.warning("Malformed token payload", exc_info=True)
        return None
    return data