Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
filename = parser.get('store:encgen', 'master_key')
key = jwk.JWK(generate='oct', size=256)
with open(filename, 'w+') as keyfile:
keyfile.write(key.export())
store = SqliteStore(parser, 'store:simple')
srv_kid = "srvkid"
cli_kid = "clikid"
ss_key = jwk.JWK(generate='RSA', kid=srv_kid, use="sig")
se_key = jwk.JWK(generate='RSA', kid=srv_kid, use="enc")
store.set('kemkeys/sig/%s' % srv_kid, ss_key.export())
store.set('kemkeys/enc/%s' % srv_kid, se_key.export())
cs_key = jwk.JWK(generate='RSA', kid=cli_kid, use="sig")
ce_key = jwk.JWK(generate='RSA', kid=cli_kid, use="enc")
store.set('kemkeys/sig/%s' % cli_kid, cs_key.export_public())
store.set('kemkeys/enc/%s' % cli_kid, ce_key.export_public())
return ([ss_key.export_public(), se_key.export_public()],
[cs_key.export(), ce_key.export()])
def main(args):
    """Load a PEM private key, optionally export its public JWKS, and build JWT claims.

    Args:
        args: Parsed command-line namespace. Reads ``args.key`` (path to a PEM
            key file), ``args.jwks`` (optional output path for the public key
            set), ``args.expire`` (offset added to the current epoch time to
            form ``exp``) and ``args.iss`` (optional issuer claim).
    """
    with open(args.key) as f:
        pem_data = f.read()
    # JWK.from_pem works on bytes, so encode the text that was read.
    key = jwk.JWK.from_pem(pem_data.encode("utf-8"))

    if args.jwks:
        # Wrap the exported public key in a {"keys": [...]} document. The
        # `with` block closes the file; no explicit close is needed.
        with open(args.jwks, "w+") as fout:
            fout.write("{ \"keys\":[ ")
            fout.write(key.export(private_key=False))
            fout.write("]}")

    now = int(time.time())
    payload = {
        # Expiry is the current time plus the caller-supplied offset.
        "exp": now + args.expire,
        "iat": now,
    }
    if args.iss:
        payload["iss"] = args.iss
def main(args):
    """Deserialize the JWT stored in ``args.jwt`` using the PEM key at ``args.key``.

    Args:
        args: Parsed command-line namespace. Reads ``args.key`` (path to a PEM
            key file) and ``args.jwt`` (path to a file holding the serialized
            token).

    Raises:
        Exception: If ``args.key`` is not an existing file. Any error raised by
            ``jwt.JWT.deserialize`` propagates unchanged.
    """
    if not os.path.isfile(args.key):
        raise Exception('Unhandled key type: %s' % args.key)

    # Read the key as bytes: JWK.from_pem expects a bytes buffer, and text-mode
    # reading yields str on Python 3.
    with open(args.key, "rb") as f:
        key = jwk.JWK.from_pem(f.read())

    with open(args.jwt) as f:
        # Surrounding whitespace (e.g. a trailing newline written by an
        # editor) is never part of a serialized token.
        raw_jwt = f.read().strip()

    token = jwt.JWT()
    token.deserialize(raw_jwt, key)
def main(args):
"""Generates a signed JSON Web Token from local private key."""
# Begin modification
# Key source 1: args.key names an existing file, which is read as PEM text.
if os.path.isfile(args.key):
with open(args.key) as f:
pem_data = f.read()
# NOTE(review): bare attribute access with no effect; the `with` block
# already closes the file.
f.closed
# NOTE(review): pem_data is str when read in text mode; jwcrypto documents
# from_pem as taking bytes -- confirm this works on the targeted Python.
key = jwk.JWK.from_pem(pem_data)
else:
# Key source 2: a value prefixed with 'ibmtss2:'. The text after the
# 8-character prefix is passed to EngineJWK together with the name 'tpm2'.
if str.startswith(args.key, 'ibmtss2:'):
key = EngineJWK('tpm2', args.key[8:])
else:
# Neither a file nor a recognised prefix.
raise Exception('Unhandled key type: %s' % args.key)
# End modification
# When an output path is given, start writing the public-key document to it.
if args.jwks:
with open(args.jwks, "w+") as fout:
# this is the old JWKS format
# fout.write("{ \"keys\":[ ")
# fout.write(key.export(private_key=False))
# fout.write("]}")
# this is the new PEM format
# Opens a JSON object and the string value of "jwt_validation_pubkeys".
# NOTE(review): the excerpt stops here; the value and the closing quote and
# brace must be written by code that follows.
fout.write('{ "jwt_validation_pubkeys": "')
claims = dict(
iss=iss,
iat=jwt_iat,
exp=jwt_exp,
aud=aud,
sub=sub,
scope=scope,
token_endpoint_auth_method='private_key_jwt',
grant_types=['authorization_code', 'refresh_token', 'client_credentials'],
response_types=['code', 'id_token'],
client_id=client_id,
software_statement=ssa
)
token = jwt.JWT(header=header, claims=claims)
key_obj = jwk.JWK.from_pem(cache.get('private_key_pem').encode('latin-1'))
token.make_signed_token(key_obj)
signed_token = token.serialize()
return signed_token
def main(args):
    """Load a PEM private key and build the claims for a JSON Web Token.

    Args:
        args: Parsed command-line namespace. Reads ``args.key`` (path to a PEM
            key file) and ``args.sub`` (optional subject claim).

    The names ``expire`` and ``iss`` are not parameters; they are resolved
    from the enclosing scope.

    Raises:
        Exception: If ``args.key`` is not an existing file.
    """
    if not os.path.isfile(args.key):
        raise Exception('Unhandled key type: %s' % args.key)

    # Read the key as bytes: JWK.from_pem expects a bytes buffer, and text-mode
    # reading yields str on Python 3.
    with open(args.key, "rb") as f:
        key = jwk.JWK.from_pem(f.read())

    now = int(time.time())
    payload = {
        # Expiry is the current time plus the `expire` offset.
        "exp": now + expire,
        "iat": now,
        "iss": iss,
        # The subject falls back to the issuer when none is supplied.
        "sub": args.sub if args.sub else iss,
    }
def main(args):
    """Load a signing key and optionally write its public half to a file.

    Args:
        args: Parsed command-line namespace. Reads ``args.key`` (either a path
            to a PEM key file or a value handed to ``jwk.JWK.from_uri``) and
            ``args.jwks`` (optional output path for the public key document).
    """
    # Begin modification
    if os.path.isfile(args.key):
        # Read the key as bytes: JWK.from_pem expects a bytes buffer, and
        # text-mode reading yields str on Python 3.
        with open(args.key, "rb") as f:
            key = jwk.JWK.from_pem(f.read())
    else:
        # Anything that is not an existing file is treated as a key URI.
        key = jwk.JWK.from_uri(args.key)
    # End modification

    if args.jwks:
        # PEM-based document; this replaced the earlier {"keys": [...]} JWKS
        # output. The `with` block closes the file.
        with open(args.jwks, "w+") as fout:
            fout.write("{ \"jwt_validation_pubkeys\": \"")
            # NOTE(review): upstream jwcrypto returns bytes from
            # export_to_pem; confirm the write below is valid for the
            # jwcrypto build and Python version in use.
            fout.write(key.public().export_to_pem())
            fout.write("\" }")
def _encrypt_options(configuration):
    """Return selected configuration fields as a compact-serialized JWE.

    The ``conversation_id``, ``current_user_id`` and ``current_user_name``
    attributes of ``configuration`` are JSON-encoded and encrypted for a
    symmetric ("oct") key built from ``configuration.private_api_key``, using
    the A128GCMKW / A128GCM algorithms named in the protected header.
    """
    shared_key = jwk.JWK(k=configuration.private_api_key, kty="oct")
    protected_header = json.dumps({"alg": "A128GCMKW", "enc": "A128GCM"})

    exposed_fields = ("conversation_id", "current_user_id", "current_user_name")
    plaintext = json.dumps(
        {field: getattr(configuration, field) for field in exposed_fields}
    )

    token = jwe.JWE(plaintext, protected_header)
    token.add_recipient(shared_key)
    return token.serialize(compact=True)
def configure_jwt_authorization_flow(self, private_key_filename, oauth_base_path, client_id, user_id, expires_in,
                                     key_bytes=None):
    """Exchange a signed JWT assertion for an access token and install it.

    Builds an RS256 JWT with ``iss``/``sub``/``aud``/``iat``/``exp``/``scope``
    claims, signs it, POSTs it to ``https://<oauth_base_path>/oauth/token``
    with the JWT bearer grant type, and, when the response carries both
    ``token_type`` and ``access_token``, sets the default ``Authorization``
    header from them.

    Args:
        private_key_filename: Path to a PEM private key; read only when
            ``key_bytes`` is falsy.
        oauth_base_path: Host used both as the ``aud`` claim and in the token URL.
        client_id: Value of the ``iss`` claim.
        user_id: Value of the ``sub`` claim.
        expires_in: Lifetime used to compute the ``exp`` claim.
        key_bytes: Optional signing key passed straight to
            ``make_signed_token`` instead of loading one from disk.
    """
    issued_at = math.floor(time())
    # NOTE(review): expires_in is multiplied by 1000 before being added to
    # the issued-at timestamp; confirm the unit callers are expected to pass.
    expires_at = issued_at + (expires_in * 1000)

    jwt_claims = {
        "iss": client_id,
        "sub": user_id,
        "aud": oauth_base_path,
        "iat": issued_at,
        "exp": expires_at,
        "scope": "signature",
    }
    token = jwt.JWT(header={"alg": "RS256"}, claims=jwt_claims)

    if key_bytes:
        # Key supplied directly by the caller (for example loaded from a
        # database after any decryption or base64 decoding).
        # NOTE(review): passed to make_signed_token as-is; confirm callers
        # provide an object that call accepts.
        token.make_signed_token(key_bytes)
    else:
        with open(private_key_filename, 'rb') as key_file:
            signing_key = jwk.JWK.from_pem(key_file.read())
        token.make_signed_token(signing_key)
    assertion = token.serialize()

    # Perform the token request.
    token_url = "https://" + oauth_base_path + "/oauth/token"
    request_headers = self.sanitize_for_serialization(
        {"Content-Type": "application/x-www-form-urlencoded"})
    form_fields = self.sanitize_for_serialization(
        {"assertion": assertion, "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer"})
    response = self.request("POST", token_url, headers=request_headers, post_params=form_fields)
    response_data = json.loads(response.data.decode('utf-8'))

    # Leave the headers untouched unless both fields are present.
    if 'token_type' not in response_data or 'access_token' not in response_data:
        return
    self.set_default_header("Authorization", response_data['token_type'] + " " + response_data['access_token'])