Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_asymmetric_jws_token_plugin(self):
key = jwt.JWK()
private_key = open("./tests/fixtures/private.pem", "rb").read()
key.import_from_pem(private_key)
jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"})
jwt_token.make_signed_token(key)
self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize())
self.stubs.Set(websocketproxy.ProxyRequestHandler, 'send_auth_error',
staticmethod(lambda *args, **kwargs: None))
self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/public.pem")
self.handler.validate_connection()
self.assertEqual(self.handler.server.target_host, "remote_host")
self.assertEqual(self.handler.server.target_port, "remote_port")
def topic(self, topic):
""" Verify the token with some public key """
return jwt.verify_jwt(topic, JWK(kty='oct', k=base64url_encode('anysecrethere')))
def test_symmetric_jws_token_plugin(self):
secret = open("./tests/fixtures/symmetric.key").read()
key = jwt.JWK()
key.import_key(kty="oct",k=secret)
jwt_token = jwt.JWT({"alg": "HS256"}, {'host': "remote_host", 'port': "remote_port"})
jwt_token.make_signed_token(key)
self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize())
self.stubs.Set(websocketproxy.ProxyRequestHandler, 'send_auth_error',
staticmethod(lambda *args, **kwargs: None))
self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/symmetric.key")
self.handler.validate_connection()
self.assertEqual(self.handler.server.target_host, "remote_host")
self.assertEqual(self.handler.server.target_port, "remote_port")
def test_asymmetric_jwe_token_plugin(self):
private_key = jwt.JWK()
public_key = jwt.JWK()
private_key_data = open("./tests/fixtures/private.pem", "rb").read()
public_key_data = open("./tests/fixtures/public.pem", "rb").read()
private_key.import_from_pem(private_key_data)
public_key.import_from_pem(public_key_data)
jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"})
jwt_token.make_signed_token(private_key)
jwe_token = jwt.JWT(header={"alg": "RSA1_5", "enc": "A256CBC-HS512"},
claims=jwt_token.serialize())
jwe_token.make_encrypted_token(public_key)
self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwe_token.serialize())
self.stubs.Set(websocketproxy.ProxyRequestHandler, 'send_auth_error',
staticmethod(lambda *args, **kwargs: None))
self.handler.server.token_plugin = token_plugins.JWTTokenApi("./tests/fixtures/private.pem")
def topic(self, topic):
""" Verify the token """
clock_load(orig_datetime.utcfromtimestamp(0))
r = jwt.verify_jwt(topic,
JWK(kty='oct', k=base64url_encode('secret')),
['HS256'])
clock_reset()
return r
def test_asymmetric_jws_token_plugin_with_illigal_key_exception(self):
key = jwt.JWK()
private_key = open("./tests/fixtures/private.pem", "rb").read()
key.import_from_pem(private_key)
jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"})
jwt_token.make_signed_token(key)
self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize())
self.stubs.Set(websocketproxy.ProxyRequestHandler, 'send_auth_error',
staticmethod(lambda *args, **kwargs: None))
self.handler.server.token_plugin = token_plugins.JWTTokenApi("wrong.pub")
self.assertRaises(self.handler.server.EClose,
self.handler.validate_connection)
def test_asymmetric_jwe_token_plugin(self):
private_key = jwt.JWK()
public_key = jwt.JWK()
private_key_data = open("./tests/fixtures/private.pem", "rb").read()
public_key_data = open("./tests/fixtures/public.pem", "rb").read()
private_key.import_from_pem(private_key_data)
public_key.import_from_pem(public_key_data)
jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"})
jwt_token.make_signed_token(private_key)
jwe_token = jwt.JWT(header={"alg": "RSA1_5", "enc": "A256CBC-HS512"},
claims=jwt_token.serialize())
jwe_token.make_encrypted_token(public_key)
self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwe_token.serialize())
self.stubs.Set(websocketproxy.ProxyRequestHandler, 'send_auth_error',
staticmethod(lambda *args, **kwargs: None))
def test_symmetric_jws_token_plugin_with_illigal_key_exception(self):
secret = open("./tests/fixtures/symmetric.key").read()
key = jwt.JWK()
key.import_key(kty="oct",k=secret)
jwt_token = jwt.JWT({"alg": "HS256"}, {'host': "remote_host", 'port': "remote_port"})
jwt_token.make_signed_token(key)
self.handler.path = "https://localhost:6080/websockify?token={jwt_token}".format(jwt_token=jwt_token.serialize())
self.stubs.Set(websocketproxy.ProxyRequestHandler, 'send_auth_error',
staticmethod(lambda *args, **kwargs: None))
self.handler.server.token_plugin = token_plugins.JWTTokenApi("wrong_sauce")
self.assertRaises(self.handler.server.EClose,
self.handler.validate_connection)
def lookup(self, token):
try:
from jwcrypto import jwt
import json
key = jwt.JWK()
try:
with open(self.source, 'rb') as key_file:
key_data = key_file.read()
except Exception as e:
print("Error loading key file: %s" % str(e), file=sys.stderr)
return None
try:
key.import_from_pem(key_data)
except:
try:
key.import_key(k=key_data.decode('utf-8'),kty='oct')
except:
print('Failed to correctly parse key data!', file=sys.stderr)
return None