Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def initiate(self, host, path, **kwargs):
    """Start a client-side handshake and return its parsed pieces.

    Creates a ``WSConnection`` in ``CLIENT`` mode, sends a ``Request`` for
    ``host``/``path`` (extra keyword arguments are forwarded to ``Request``),
    and splits the bytes the connection wants to send into the request line
    and the header block.

    Returns a tuple ``(ws, method, path, version, headers)`` where
    ``method``, ``path`` and ``version`` are the whitespace-separated fields
    of the request line and ``headers`` is the result of ``parse_headers``
    applied to everything after the first CRLF.
    """
    connection = WSConnection(CLIENT)
    connection.send(Request(host=host, target=path, **kwargs))

    # Everything before the first CRLF is the request line; the rest is headers.
    request_line, header_block = connection.bytes_to_send().split(b"\r\n", 1)
    verb, target, http_version = request_line.strip().split()

    return connection, verb, target, http_version, parse_headers(header_block)
def test_missing_version(self):
    """An upgrade request without Sec-WebSocket-Version yields a Fail event."""
    host = "frob.nitz"
    resource = "/fnord"

    server = WSConnection(SERVER)

    # 16 random bytes, base64-encoded, used as the Sec-WebSocket-Key value.
    key = base64.b64encode(bytes(random.getrandbits(8) for _ in range(16)))

    header_lines = [
        b"GET " + resource.encode("ascii") + b" HTTP/1.1",
        b"Host: " + host.encode("ascii"),
        b"Connection: Upgrade",
        b"Upgrade: WebSocket",
        # Sec-WebSocket-Version is deliberately left out.
        b"Sec-WebSocket-Key: " + key,
    ]
    # Terminate the last header line and add the blank line ending the head.
    raw_request = b"\r\n".join(header_lines) + b"\r\n\r\n"

    server.receive_bytes(raw_request)
    first_event = next(server.events())
    assert isinstance(first_event, Fail)
def test_simple_extension_negotiation(self):
    """A request offering the fake extension surfaces as a Request event."""
    host = "frob.nitz"
    resource = "/fnord"
    extension = FakeExtension(accept_response=True)

    server = WSConnection(SERVER)

    # 16 random bytes, base64-encoded, used as the Sec-WebSocket-Key value.
    key = base64.b64encode(bytes(random.getrandbits(8) for _ in range(16)))

    header_lines = [
        b"GET " + resource.encode("ascii") + b" HTTP/1.1",
        b"Host: " + host.encode("ascii"),
        b"Connection: Upgrade",
        b"Upgrade: WebSocket",
        b"Sec-WebSocket-Version: 13",
        b"Sec-WebSocket-Key: " + key,
        b"Sec-WebSocket-Extensions: " + extension.name.encode("ascii"),
    ]
    # Terminate the last header line and add the blank line ending the head.
    raw_request = b"\r\n".join(header_lines) + b"\r\n\r\n"

    server.receive_bytes(raw_request)
    first_event = next(server.events())
    assert isinstance(first_event, Request)
def test_accept_subprotocol(self):
    """A request listing subprotocols "one, two" surfaces as a Request event."""
    host = "frob.nitz"
    resource = "/fnord"

    server = WSConnection(SERVER)

    # 16 random bytes, base64-encoded, used as the Sec-WebSocket-Key value.
    key = base64.b64encode(bytes(random.getrandbits(8) for _ in range(16)))

    header_lines = [
        b"GET " + resource.encode("ascii") + b" HTTP/1.1",
        b"Host: " + host.encode("ascii"),
        b"Connection: Upgrade",
        b"Upgrade: WebSocket",
        b"Sec-WebSocket-Version: 13",
        b"Sec-WebSocket-Key: " + key,
        b"Sec-WebSocket-Protocol: one, two",
    ]
    # Terminate the last header line and add the blank line ending the head.
    raw_request = b"\r\n".join(header_lines) + b"\r\n\r\n"

    server.receive_bytes(raw_request)
    first_event = next(server.events())
    assert isinstance(first_event, Request)
def test_bad_connection(self):
    """A request whose Connection header is "Zoinks" yields a Fail event."""
    host = "frob.nitz"
    resource = "/fnord"

    server = WSConnection(SERVER)

    # 16 random bytes, base64-encoded, used as the Sec-WebSocket-Key value.
    key = base64.b64encode(bytes(random.getrandbits(8) for _ in range(16)))

    header_lines = [
        b"GET " + resource.encode("ascii") + b" HTTP/1.1",
        b"Host: " + host.encode("ascii"),
        # Not "Upgrade": this is the header value under test.
        b"Connection: Zoinks",
        b"Upgrade: WebSocket",
        b"Sec-WebSocket-Version: 13",
        b"Sec-WebSocket-Key: " + key,
    ]
    # Terminate the last header line and add the blank line ending the head.
    raw_request = b"\r\n".join(header_lines) + b"\r\n\r\n"

    server.receive_bytes(raw_request)
    first_event = next(server.events())
    assert isinstance(first_event, Fail)
def test_disinterested_extension_negotiation(self, accept_response):
    """A request offering the fake extension surfaces as a Request event.

    ``accept_response`` is passed straight to ``FakeExtension``; the
    extension's ``name`` is what gets advertised in the request.
    """
    host = "frob.nitz"
    resource = "/fnord"
    extension = FakeExtension(accept_response=accept_response)

    server = WSConnection(SERVER)

    # 16 random bytes, base64-encoded, used as the Sec-WebSocket-Key value.
    key = base64.b64encode(bytes(random.getrandbits(8) for _ in range(16)))

    header_lines = [
        b"GET " + resource.encode("ascii") + b" HTTP/1.1",
        b"Host: " + host.encode("ascii"),
        b"Connection: Upgrade",
        b"Upgrade: WebSocket",
        b"Sec-WebSocket-Version: 13",
        b"Sec-WebSocket-Key: " + key,
        b"Sec-WebSocket-Extensions: " + extension.name.encode("ascii"),
    ]
    # Terminate the last header line and add the blank line ending the head.
    raw_request = b"\r\n".join(header_lines) + b"\r\n\r\n"

    server.receive_bytes(raw_request)
    first_event = next(server.events())
    assert isinstance(first_event, Request)
def __init__(self, ctx, handshake_flow):
    """Set up per-peer WSConnection state from a completed handshake flow.

    Stores ``handshake_flow``, creates empty frame buffers, builds one
    ``WSConnection`` per peer (SERVER role keyed by ``self.client_conn``,
    CLIENT role keyed by ``self.server_conn``), and replays the opening
    request from the CLIENT-role connection into the SERVER-role one, which
    is then asked to accept it.

    ``PerMessageDeflate`` is enabled on both connections only when its name
    appears in the handshake response's ``Sec-WebSocket-Extensions`` header.
    """
    super().__init__(ctx)
    self.handshake_flow = handshake_flow
    self.flow: WebSocketFlow = None

    self.client_frame_buffer = []
    self.server_frame_buffer = []

    self.connections: dict[object, WSConnection] = {}

    response_headers = handshake_flow.response.headers
    deflate_negotiated = (
        'Sec-WebSocket-Extensions' in response_headers
        and PerMessageDeflate.name in response_headers['Sec-WebSocket-Extensions']
    )
    extensions = [PerMessageDeflate()] if deflate_negotiated else []

    self.connections[self.client_conn] = WSConnection(
        ConnectionType.SERVER,
        extensions=extensions,
    )
    self.connections[self.server_conn] = WSConnection(
        ConnectionType.CLIENT,
        host=handshake_flow.request.host,
        resource=handshake_flow.request.path,
        extensions=extensions,
    )

    if extensions:
        # Finalize the first extension of every connection with the header
        # value taken from the handshake response.
        for conn in self.connections.values():
            conn.extensions[0].finalize(conn, response_headers['Sec-WebSocket-Extensions'])

    # Replay the CLIENT-role connection's pending bytes into the SERVER-role
    # connection and accept the request event it produces.
    opening_bytes = self.connections[self.server_conn].bytes_to_send()
    client_side = self.connections[self.client_conn]
    client_side.receive_bytes(opening_bytes)
    event = next(client_side.events())
    assert isinstance(event, events.ConnectionRequested)
    client_side.accept(event)
# NOTE(review): the `def` line for the statements below is not visible in this
# excerpt. They repeat the `__init__(self, ctx, handshake_flow)` body shown
# directly above and add two final statements that complete the handshake.
super().__init__(ctx)
self.handshake_flow = handshake_flow
self.flow: WebSocketFlow = None
# Separate frame buffers for each direction, both starting empty.
self.client_frame_buffer = []
self.server_frame_buffer = []
self.connections: dict[object, WSConnection] = {}
# Enable PerMessageDeflate only when its name appears in the handshake
# response's Sec-WebSocket-Extensions header.
extensions = []
if 'Sec-WebSocket-Extensions' in handshake_flow.response.headers:
if PerMessageDeflate.name in handshake_flow.response.headers['Sec-WebSocket-Extensions']:
extensions = [PerMessageDeflate()]
# One WSConnection per peer: SERVER role keyed by client_conn, CLIENT role
# keyed by server_conn. Both receive the same `extensions` list.
self.connections[self.client_conn] = WSConnection(ConnectionType.SERVER,
extensions=extensions)
self.connections[self.server_conn] = WSConnection(ConnectionType.CLIENT,
host=handshake_flow.request.host,
resource=handshake_flow.request.path,
extensions=extensions)
# Finalize the first extension of every connection with the header value
# taken from the handshake response.
if extensions:
for conn in self.connections.values():
conn.extensions[0].finalize(conn, handshake_flow.response.headers['Sec-WebSocket-Extensions'])
# Replay the CLIENT-role connection's pending bytes into the SERVER-role
# connection; the first resulting event must be ConnectionRequested.
data = self.connections[self.server_conn].bytes_to_send()
self.connections[self.client_conn].receive_bytes(data)
event = next(self.connections[self.client_conn].events())
assert isinstance(event, events.ConnectionRequested)
self.connections[self.client_conn].accept(event)
# Feed the bytes produced by accept() back to the CLIENT-role connection,
# whose first event must then be ConnectionEstablished.
self.connections[self.server_conn].receive_bytes(self.connections[self.client_conn].bytes_to_send())
assert isinstance(next(self.connections[self.server_conn].events()), events.ConnectionEstablished)