def _save_session(self, session_id, user_id, data, legacy=False):
    """Encrypt a session object as a compact JWE and persist it.

    The attributes of ``data`` (via ``vars``) are serialised to JSON, wrapped
    in a JWE encrypted directly with ``self.key`` (``alg: dir``,
    ``enc: A256GCM``) and written through ``data_access`` as an ``EQSession``.

    Args:
        session_id: passed through as the first ``EQSession`` argument.
        user_id: passed through as the second ``EQSession`` argument.
        data: object whose ``vars()`` must be JSON-serialisable.
        legacy: when true, the JSON text is base64url-encoded before it is
            encrypted.
    """
    serialised = json.dumps(vars(data))
    # 'dir' means the shared symmetric key is used as the content-encryption
    # key itself; no per-token key is wrapped.
    # NOTE(review): 'kid' is a hard-coded literal, presumably a key-version
    # label. Confirm how it is kept in step with key rotation.
    header = {'alg': 'dir', 'enc': 'A256GCM', 'kid': '1,1'}
    body = base64url_encode(serialised) if legacy else serialised
    token = jwe.JWE(plaintext=body, protected=header, recipient=self.key)
    record = EQSession(session_id, user_id, token.serialize(compact=True))
    data_access.put(record)
def encrypt_data(self, data):
    """Return ``data`` snappy-compressed and encrypted as a compact JWE.

    A ``dict`` is JSON-encoded first; any other value is handed to
    ``snappy.compress`` as given. The result is encrypted directly with
    ``self.key`` (``alg: dir``, ``enc: A256GCM``).
    """
    payload = json.dumps(data) if isinstance(data, dict) else data
    header = {'alg': 'dir', 'enc': 'A256GCM', 'kid': '1,1'}
    # NOTE(review): compressing before encrypting makes the ciphertext length
    # depend on the plaintext content. Confirm that nobody who can observe
    # token sizes can also influence part of the data stored alongside secrets.
    compressed = snappy.compress(payload)
    token = jwe.JWE(
        plaintext=compressed,
        protected=header,
        recipient=self.key,
    )
    return token.serialize(compact=True)
def _save_compressed_state_data(self, user_id, data):
    """Persist ``data`` as a compressed, encrypted ``QuestionnaireState``.

    ``data`` is snappy-compressed, encrypted as a compact JWE with
    ``self.storage.encrypter.key`` (``alg: dir``, ``enc: A256GCM``) and stored
    with a version of ``QuestionnaireStore.LATEST_VERSION + 1``.
    """
    state_data = jwe.JWE(
        plaintext=snappy.compress(data),
        protected={'alg': 'dir', 'enc': 'A256GCM', 'kid': '1,1'},
        recipient=self.storage.encrypter.key,
    ).serialize(compact=True)
    # The record is deliberately written one version past the newest known one.
    next_version = QuestionnaireStore.LATEST_VERSION + 1
    data_access.put(QuestionnaireState(user_id, state_data, next_version))
def decrypt_data(self, encrypted_token):
    """Decrypt a compact JWE with ``self.key`` and return its raw payload.

    The accepted algorithms are pinned to ``dir`` / ``A256GCM``, so a token
    whose header names anything else is not processed. The payload is
    returned exactly as decrypted; no decoding or decompression happens here.
    """
    accepted_algs = ['dir', 'A256GCM']
    parsed = jwe.JWE(algs=accepted_algs)
    # Passing the key to deserialize() decrypts as part of the same call.
    parsed.deserialize(encrypted_token, self.key)
    return parsed.payload
def _encrypt_options(configuration):
    """Encrypt the conversation and user fields of ``configuration`` as a JWE.

    The conversation id, current user id and current user name are
    JSON-encoded and encrypted with a symmetric key built from
    ``configuration.private_api_key`` (``alg: A128GCMKW``, ``enc: A128GCM``).
    Returns the compact serialisation.
    """
    # NOTE(review): assumes private_api_key is base64url-encoded key material
    # of a length suitable for A128GCMKW. Confirm against the caller.
    wrapping_key = jwk.JWK(k=configuration.private_api_key, kty="oct")
    header = {"alg": "A128GCMKW", "enc": "A128GCM"}
    claims = {
        "conversation_id": configuration.conversation_id,
        "current_user_id": configuration.current_user_id,
        "current_user_name": configuration.current_user_name
    }
    token = jwe.JWE(json.dumps(claims), json.dumps(header))
    token.add_recipient(wrapping_key)
    return token.serialize(compact=True)
def generate_websocket_token(self, real_token, data=None):
    """Wrap ``real_token`` in a time-limited, encrypted websocket token.

    The claims hold ``iat``, ``exp`` (now plus ``self._websockets_ttl``) and
    the real token, merged with any extra ``data``. They are serialised with
    ``ujson`` and encrypted as a compact JWE (``alg: A256KW``,
    ``enc: A256CBC-HS512``) for the key returned by ``get_jwk_key()``.
    """
    extra = data or {}
    issued_at = int(time.time())
    expires_at = int(time.time() + self._websockets_ttl)
    claims = {"iat": issued_at, "exp": expires_at, "token": real_token}
    # NOTE(review): extras are merged last, so a caller-supplied "iat", "exp"
    # or "token" key replaces the value set above. Confirm that callers never
    # pass untrusted keys here.
    claims.update(extra)
    body = ujson.dumps(claims).encode("utf-8")
    header = json_encode({"alg": "A256KW", "enc": "A256CBC-HS512"})
    jwetoken = jwe.JWE(body, header)
    jwetoken.add_recipient(get_jwk_key())
    return jwetoken.serialize(compact=True)
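async def extract_token(self):
    """Read websocket credentials from the ``ws_token`` query parameter.

    The parameter is treated as a compact JWE: it is decrypted with the key
    from ``get_jwk_key()``, its JSON payload is checked against the ``exp``
    claim, and a ``{"type": "wstoken", "token": ...}`` dict is built (plus
    ``"id"`` when the payload carries one).

    Returns ``None`` when the token cannot be decrypted or has expired.
    """
    request = self.request
    if "ws_token" in request.query:
        # NOTE(review): the token travels in the URL, which proxies and access
        # logs commonly record. The exp check below is what bounds its value.
        raw_token = request.query["ws_token"]
        try:
            # NOTE(review): no ``algs`` allow-list is passed, so the library
            # defaults decide which algorithms are accepted. Consider pinning
            # them to the ones the token issuer actually uses.
            jwetoken = jwe.JWE()
            jwetoken.deserialize(raw_token)
            jwetoken.decrypt(get_jwk_key())
            payload = jwetoken.payload
        except jwe.InvalidJWEOperation:
            logger.warning("Invalid operation", exc_info=True)
            return
        except jwe.InvalidJWEData:
            logger.warning("Error decrypting JWT token", exc_info=True)
            return
        json_payload = json.loads(payload)
        if json_payload["exp"] <= int(time.time()):
            # The token value is deliberately kept out of the log, and no
            # exception is active here, so exc_info is not requested.
            logger.warning("Expired token")
            return
        data = {"type": "wstoken", "token": json_payload["token"]}
        if "id" in json_payload:
            data["id"] = json_payload["id"]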
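def decrypt_jwe(encrypted_token, secret_store, purpose):
    """Decrypt a compact JWE with the private key named by its ``kid`` header.

    Accepted algorithms are pinned to ``RSA-OAEP`` / ``A256GCM``. The key is
    looked up in ``secret_store`` by ``purpose`` and the token's ``kid``.

    Args:
        encrypted_token: compact JWE serialisation.
        secret_store: object providing ``get_private_key_by_kid(purpose, kid)``.
        purpose: key purpose passed to the secret store lookup.

    Returns:
        The decrypted payload decoded to ``str``.

    Raises:
        InvalidTokenException: when the library reports ``InvalidJWEData``.
    """
    try:
        jwe_token = jwe.JWE(algs=['RSA-OAEP', 'A256GCM'])
        jwe_token.deserialize(encrypted_token)
        # The kid comes from the token header, so the sender controls it. It
        # only selects a key within the given purpose via the secret store.
        jwe_kid = extract_kid_from_header(encrypted_token)
        logger.info("Decrypting JWE", kid=jwe_kid)
        private_jwk = secret_store.get_private_key_by_kid(purpose, jwe_kid).as_jwk()
        jwe_token.decrypt(private_jwk)
        return jwe_token.payload.decode()
    except InvalidJWEData as e:
        # Chain explicitly so the library error stays attached as __cause__.
        # NOTE(review): repr(e) becomes the exception message. Confirm it is
        # not echoed back to the token sender.
        raise InvalidTokenException(repr(e)) from e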
def encrypt_data(self, data):
    """Return ``data`` encrypted as a compact JWE string.

    A ``dict`` is JSON-encoded first; any other value is used as the
    plaintext as given. Encryption is direct with ``self.key``
    (``alg: dir``, ``enc: A256GCM``).
    """
    plaintext = json.dumps(data) if isinstance(data, dict) else data
    # NOTE(review): 'dir' encrypts every token under the same long-lived key,
    # and 'kid' is a fixed literal. Confirm how key rotation is reflected here.
    header = {'alg': 'dir', 'enc': 'A256GCM', 'kid': '1,1'}
    token = jwe.JWE(plaintext=plaintext, protected=header)
    token.add_recipient(self.key)
    return token.serialize(compact=True)