Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _save_legacy_state_data(self, user_id, data):
    """Encrypt ``data`` into a compact JWE and store it as a legacy state record.

    The payload is base64url-encoded, wrapped in a JWE whose protected
    header declares ``alg='dir'``, ``enc='A256GCM'`` and ``kid='1,1'``,
    and encrypted for ``self.storage.encrypter.key``. The compact
    serialization is placed under the ``'data'`` key of a JSON document,
    which is stored through ``data_access.put`` as a
    ``QuestionnaireState`` tagged with ``self.LEGACY_DATA_STORE_VERSION``.

    :param user_id: identifier passed as the first argument of the
        ``QuestionnaireState`` record
    :param data: payload to encode and encrypt
        (NOTE(review): accepted types depend on ``base64url_encode`` - confirm)
    :returns: None
    """
    header = {'alg': 'dir', 'enc': 'A256GCM', 'kid': '1,1'}

    # Encode first, then encrypt: the JWE plaintext is the base64url form
    # of the payload, not the raw payload itself.
    encoded_payload = base64url_encode(data)
    token = jwe.JWE(
        plaintext=encoded_payload,
        protected=header,
        recipient=self.storage.encrypter.key,
    )

    envelope = {'data': token.serialize(compact=True)}
    record = QuestionnaireState(
        user_id,
        json.dumps(envelope),
        self.LEGACY_DATA_STORE_VERSION,
    )
    data_access.put(record)
:param msg: a json-encoded value containing a JWS or JWE+JWS token
:raises InvalidMessage: if the message cannot be parsed or validated
:returns: A verified payload
"""
try:
jtok = JWT(jwt=msg)
except Exception as e:
raise InvalidMessage('Failed to parse message: %s' % str(e))
try:
token = jtok.token
if isinstance(token, JWE):
token.decrypt(self.kkstore.server_keys[KEY_USAGE_ENC])
# If an encrypted payload is received then there must be
# a nested signed payload to verify the provenance.
payload = token.payload.decode('utf-8')
token = JWS()
token.deserialize(payload)
elif isinstance(token, JWS):
pass
else:
raise TypeError("Invalid Token type: %s" % type(jtok))
# Retrieve client keys for later use
self.client_keys = [
JWK(**self._get_key(token.jose_header, KEY_USAGE_SIG)),
JWK(**self._get_key(token.jose_header, KEY_USAGE_ENC))]
def make_enc_kem(name, value, sig_key, alg, enc_key, enc):
    """Build a compact JWE wrapping the output of ``make_sig_kem``.

    The inner payload is whatever ``make_sig_kem(name, value, sig_key, alg)``
    returns (presumably a signed token - confirm against that helper). It is
    used as the JWE plaintext and encrypted for ``enc_key``.

    :param name: forwarded unchanged to ``make_sig_kem``
    :param value: forwarded unchanged to ``make_sig_kem``
    :param sig_key: forwarded unchanged to ``make_sig_kem``
    :param alg: forwarded unchanged to ``make_sig_kem``
    :param enc_key: recipient key; its ``key_id`` becomes the header ``kid``
    :param enc: indexable pair whose item 0 is used as the header ``alg``
        and item 1 as the header ``enc``
    :returns: the compact serialization of the JWE
    """
    inner_payload = make_sig_kem(name, value, sig_key, alg)
    protected_header = {
        'kid': enc_key.key_id,
        'alg': enc[0],
        'enc': enc[1],
    }
    envelope = JWE(inner_payload, json_encode(protected_header))
    envelope.add_recipient(enc_key)
    return envelope.serialize(compact=True)