def _store_keys(keystore, usage, keys):
    """Save the key for *usage* in *keystore* as JSON.

    The entry name is ``kemkeys/<KEY_USAGE_MAP[usage]>/<kid>``, built with
    ``os.path.join``. The trailing ``True`` is handed to ``keystore.set``
    as its third positional argument.
    """
    usage_dir = kem.KEY_USAGE_MAP[usage]
    key_entry = keys[usage]
    # NOTE(review): 'kid' goes into the entry name unchecked. os.path.join
    # keeps separators and restarts at an absolute component, so a kid that
    # did not come from a trusted source could name an entry outside
    # 'kemkeys/<usage>'. Confirm kid is always generated locally.
    entry_name = os.path.join('kemkeys', usage_dir, key_entry['kid'])
    keystore.set(entry_name, json_encode(key_entry), True)
def f(claims, alg, lifetime=None, expires=None, not_before=None):
    """Generate a token using node-jsjws.

    Renders a ``fixtures.generate(...)`` call as text and passes it to
    ``spawn``. Each value placed in that text is either an integer
    timestamp or the output of ``json_encode``.

    :param claims: claims for the token, JSON-encoded as given.
    :param alg: value for the ``alg`` header.
    :param lifetime: if truthy, expiry is the current UTC time plus this.
    :param expires: expiry used when ``lifetime`` is falsy.
    :param not_before: start of validity; the current UTC time if falsy.
    :returns: whatever ``spawn`` returns for the rendered call.

    One of ``lifetime``/``expires`` must be supplied: with neither,
    ``utctimetuple`` is looked up on ``None``.
    """
    issued = datetime.utcnow()
    issued_ts = timegm(issued.utctimetuple())
    header_json = json_encode({'alg': alg})
    claims_json = json_encode(claims)

    if lifetime:
        expiry = issued + lifetime
    else:
        expiry = expires
    expiry_ts = timegm(expiry.utctimetuple())
    start_ts = timegm((not_before or issued).utctimetuple())

    # The signing key travels inside the rendered call: the decoded 'k'
    # value for a symmetric key, otherwise the PEM from
    # export_to_pem(True, None) -- presumably the private key with no
    # passphrase; confirm against the key class. `key` comes from the
    # enclosing scope.
    if key.is_symmetric:
        key_material = base64url_decode(json_decode(key.export())['k'])
    else:
        key_material = key.export_to_pem(True, None)

    call = "fixtures.generate({now}, {header}, {claims}, {expires}, {not_before}, {key})".format(
        now=issued_ts,
        header=header_json,
        claims=claims_json,
        expires=expiry_ts,
        not_before=start_ts,
        key=json_encode(key_material))
    return spawn(call, False)
return f
def f(sjwt, iat_skew=timedelta()):
    """Verify a token using node-jsjws.

    Renders a ``fixtures.verify(...)`` call as text, passes it to ``spawn``
    and returns the result as a tuple.

    :param sjwt: the serialized token, JSON-encoded before it is embedded.
    :param iat_skew: a ``timedelta``; its ``total_seconds()`` is passed on.
    :returns: ``tuple`` built from what ``spawn`` returns.
    """
    current_ts = timegm(datetime.utcnow().utctimetuple())
    token_json = json_encode(sjwt)
    skew_seconds = iat_skew.total_seconds()

    # `key` comes from the enclosing scope. A symmetric key is passed as its
    # decoded 'k' value, anything else as the PEM from export_to_pem() with
    # default arguments.
    if key.is_symmetric:
        verify_key = base64url_decode(json_decode(key.export())['k'])
    else:
        verify_key = key.export_to_pem()

    # `alg` is a free variable here, not something read from the token, so
    # the algorithm handed to fixtures.verify is chosen by the caller.
    # NOTE(review): what fixtures.verify does with it is not visible here.
    call = "fixtures.verify({now}, {sjwt}, {iat_skew}, {key}, {alg})".format(
        now=current_ts,
        sjwt=token_json,
        iat_skew=skew_seconds,
        key=json_encode(verify_key),
        alg=json_encode(alg))
    outcome = spawn(call, True)
    return tuple(outcome)
return f
def f(claims, alg, lifetime=None, expires=None, not_before=None):
    """Generate a token using node-jsjws.

    Builds the text of a ``fixtures.generate(...)`` call and gives it to
    ``spawn``. Timestamps are embedded as integers; header, claims and key
    are embedded as ``json_encode`` output.

    :param claims: claims for the token, JSON-encoded as given.
    :param alg: value for the ``alg`` header.
    :param lifetime: if truthy, expiry is the current UTC time plus this.
    :param expires: expiry used when ``lifetime`` is falsy.
    :param not_before: start of validity; the current UTC time if falsy.
    :returns: whatever ``spawn`` returns for the rendered call.

    With neither ``lifetime`` nor ``expires`` given, ``utctimetuple`` is
    looked up on ``None``.
    """
    started = datetime.utcnow()
    fields = {}
    fields['now'] = timegm(started.utctimetuple())
    fields['header'] = json_encode({'alg': alg})
    fields['claims'] = json_encode(claims)

    expiry = (started + lifetime) if lifetime else expires
    fields['expires'] = timegm(expiry.utctimetuple())
    fields['not_before'] = timegm((not_before or started).utctimetuple())

    # Key material is part of the rendered call: the decoded 'k' value for
    # a symmetric key, otherwise the PEM from export_to_pem(True, None) --
    # presumably the unencrypted private key; confirm against the key
    # class. `key` comes from the enclosing scope.
    if key.is_symmetric:
        key_material = base64url_decode(json_decode(key.export())['k'])
    else:
        key_material = key.export_to_pem(True, None)
    fields['key'] = json_encode(key_material)

    template = "fixtures.generate({now}, {header}, {claims}, {expires}, {not_before}, {key})"
    return spawn(template.format(**fields), False)
return f
def set(self, key, value, replace=False):
    """Encrypt *value* as a compact JWE and store it under *key*.

    The protected header uses ``alg: dir`` with ``self.master_enctype`` and
    is kept on ``self.protected_header``. Unless ``self.secret_protection``
    is ``'encrypt'``, the key name is added to that header as
    ``custodia.key``.

    :param key: name the value is stored under.
    :param value: plaintext handed to ``JWE``.
    :param replace: passed through to ``self.store.set``.
    :returns: the result of ``self.store.set``.
    """
    self.protected_header = {'alg': 'dir', 'enc': self.master_enctype}
    bind_key_name = self.secret_protection != 'encrypt'
    if bind_key_name:
        # The name is carried in the protected header next to the
        # ciphertext. NOTE(review): presumably the read path compares it
        # with the requested key to detect a value moved between names;
        # that check is not visible here.
        self.protected_header['custodia.key'] = key
    envelope = JWE(value, json_encode(self.protected_header))
    envelope.add_recipient(self.mkey)
    sealed = envelope.serialize(compact=True)
    return self.store.set(key, sealed, replace)
def set(self, key, value, replace=False):
    """Encrypt *value* as a compact JWE and pass it to the parent ``set``.

    The protected header is ``alg: dir`` with ``self.master_enctype``, and
    ``self.mkey`` is added as the recipient.

    :param key: name the value is stored under.
    :param value: plaintext handed to ``JWE``.
    :param replace: passed through to the parent ``set``.
    :returns: the result of the parent ``set``.
    """
    # NOTE(review): the header holds only alg and enc, so the ciphertext is
    # not tied to `key`. Decryption alone would not notice a value that the
    # backing store returns under a different name. Confirm whether that
    # matters for this store.
    header_json = json_encode({'alg': 'dir', 'enc': self.master_enctype})
    envelope = JWE(value, header_json)
    envelope.add_recipient(self.mkey)
    sealed = envelope.serialize(compact=True)
    return super(EncryptedStore, self).set(key, sealed, replace)
# Fragment of a token-building function; its signature and the setup of
# `now`, `claims`, `header` and `priv_key` are above the visible lines.

# Expiry: a truthy lifetime wins over expires. If neither is truthy, no
# 'exp' is written here and whatever `claims` already holds is kept.
if lifetime:
    claims['exp'] = timegm((now + lifetime).utctimetuple())
elif expires:
    claims['exp'] = timegm(expires.utctimetuple())
# alg 'none' yields an empty third segment: the token is unsigned and
# carries no integrity protection. A verifier has to reject alg 'none'
# unless unsigned tokens were explicitly requested.
if header['alg'] == 'none':
    signature = ''
else:
    # Sign the JSON-encoded claims with priv_key, using `header` as the
    # protected header, then keep only the 'signature' member of the
    # serialized JWS.
    token = JWS(json_encode(claims))
    token.add_signature(priv_key, protected=header)
    signature = json_decode(token.serialize())['signature']
# The compact form is assembled by hand: header and claims are encoded again
# here rather than taken from the JWS object.
# NOTE(review): the signature only verifies if these encodings are
# byte-identical to what JWS signed -- presumably the same json_encode
# output; confirm.
return u'%s.%s.%s' % (
    base64url_encode(json_encode(header)),
    base64url_encode(json_encode(claims)),
    signature
)