Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Excerpt from the body of a per-connection WebSocket echo loop. The enclosing
# function header and the body of the final `try:` are not part of this
# excerpt, and the original indentation has been lost.
# `count` and `sock` are bound outside this excerpt.
count += 1
# Protocol state for this connection, created with the SERVER role.
ws = WSConnection(SERVER)
closed = False
# Keep servicing the socket until either side signals the end of the session.
while not closed:
try:
# Read at most 65535 bytes from the peer per iteration.
data: Optional[bytes] = sock.recv(65535)
except socket.error:
# A socket error is handled the same way as "no data received".
data = None
# `data or None` turns an empty read (b"") into None before handing it over.
# NOTE(review): presumably None tells the connection the peer went away -
# confirm against the wsproto `receive_data` documentation.
ws.receive_data(data or None)
# Bytes produced while handling this batch of events are collected here.
outgoing_data = b""
for event in ws.events():
if isinstance(event, Request):
# Accept the opening handshake, offering the per-message deflate extension.
outgoing_data += ws.send(
AcceptConnection(extensions=[PerMessageDeflate()])
)
elif isinstance(event, Message):
# Echo the message back with the same payload and the same
# `message_finished` flag as the received event.
outgoing_data += ws.send(
Message(data=event.data, message_finished=event.message_finished)
)
elif isinstance(event, Ping):
# Reply with whatever response the Ping event builds for itself.
outgoing_data += ws.send(event.response())
elif isinstance(event, CloseConnection):
closed = True
# Only send the close response while the connection is not already CLOSED.
if ws.state is not ConnectionState.CLOSED:
outgoing_data += ws.send(event.response())
# An empty or failed read (b"" or None) also ends the loop.
if not data:
closed = True
# The body of this `try` lies outside the excerpt.
try:
# Test setup for an inbound frame that has RSV1 set. Only the header handling
# and the start of a stub decompressor are visible; the rest of the test
# (including any use of `monkeypatch`) lies outside this excerpt.
def test_inbound_bad_zlib_decoder_end_state(self, monkeypatch: MonkeyPatch) -> None:
# 23 literal "x" bytes stand in for the frame payload.
# NOTE(review): presumably this is not a valid deflate stream - confirm.
compressed_payload = b"x" * 23
ext = wpext.PerMessageDeflate()
# Switch the extension on directly through its private flag.
ext._enabled = True
proto = fp.FrameProtocol(client=True, extensions=[ext])
# Announce a BINARY frame whose RSV1 bit is set and whose length matches
# the payload above.
result = ext.frame_inbound_header(
proto,
fp.Opcode.BINARY,
fp.RsvBits(True, False, False),
len(compressed_payload),
)
# The header call must return RSV bits (not some other result type) with
# rsv1 set.
assert isinstance(result, fp.RsvBits)
assert result.rsv1
# Stub decompressor whose `decompress` ignores its input and returns empty bytes.
class FailDecompressor:
def decompress(self, data: bytes) -> bytes:
return b""
def test_inbound_uncompressed_data_frame(self) -> None:
    """Check that an inbound BINARY frame without RSV1 is returned unchanged.

    The extension is enabled directly through its private ``_enabled`` flag.
    The test then asserts that the payload comes back byte-for-byte and that
    completing the frame yields ``None``.
    """
    raw = b"x" * 23
    extension = wpext.PerMessageDeflate()
    extension._enabled = True
    protocol = fp.FrameProtocol(client=True, extensions=[extension])

    header_result = extension.frame_inbound_header(
        protocol,
        fp.Opcode.BINARY,
        fp.RsvBits(False, False, False),
        len(raw),
    )
    assert isinstance(header_result, fp.RsvBits)
    # rsv1 is expected to be set on the result even though the incoming
    # header had it clear.
    # NOTE(review): presumably the result reports which RSV bits the
    # extension handles - confirm against wsproto.
    assert header_result.rsv1

    passed_through = extension.frame_inbound_payload_data(protocol, raw)
    assert passed_through == raw

    assert extension.frame_inbound_complete(protocol, True) is None
def test_accept(self, params: Params) -> None:
    """Check the response a fresh extension produces when accepting an offer.

    The offer string is built from ``params`` by ``self.make_offer_string``.
    Each ``*_no_context_takeover`` flag the extension ends up with must be
    named in the response, and a requested ``client_max_window_bits`` value
    must be reflected on the extension.
    """
    extension = wpext.PerMessageDeflate()
    assert not extension.enabled()

    accepted = cast(str, extension.accept(self.make_offer_string(params)))

    if extension.client_no_context_takeover:
        assert "client_no_context_takeover" in accepted
    if extension.server_no_context_takeover:
        assert "server_no_context_takeover" in accepted

    # NOTE(review): the excerpt had lost its indentation. The final assertion
    # is kept inside this branch because the expected value is only bound here.
    if "client_max_window_bits" in params:
        requested = params["client_max_window_bits"]
        # A None request falls back to whatever the extension itself holds.
        expected_bits = (
            extension.client_max_window_bits if requested is None else requested
        )
        assert extension.client_max_window_bits == expected_bits
def test_outbound_uncompressible_opcode(self) -> None:
    """Check that an outbound PING frame leaves the extension unmodified.

    Asserts that the returned RSV bits have ``rsv1`` equal to ``False`` and
    that the returned data equals the payload that was passed in.
    """
    extension = wpext.PerMessageDeflate()
    extension._enabled = True
    protocol = fp.FrameProtocol(client=True, extensions=[extension])

    clear_bits = fp.RsvBits(False, False, False)
    original = b"x" * 23

    out_bits, out_data = extension.frame_outbound(
        protocol, fp.Opcode.PING, clear_bits, original, True
    )

    assert out_bits.rsv1 is False
    assert out_data == original
# Feeds an empty PING frame through an enabled extension, then starts feeding
# a TEXT frame. The excerpt ends in the middle of the second header call, so
# what is asserted about the TEXT frame is not visible here.
# `client` selects the role the FrameProtocol is created with.
def test_client_decompress_after_uncompressible_frame(self, client: bool) -> None:
ext = wpext.PerMessageDeflate()
# Switch the extension on directly through its private flag.
ext._enabled = True
proto = fp.FrameProtocol(client=client, extensions=[ext])
# A PING frame: all RSV bits clear, zero-length payload.
result = ext.frame_inbound_header(
proto, fp.Opcode.PING, fp.RsvBits(False, False, False), 0
)
# Handing over the (empty) payload must not produce a CloseReason, and
# completing the frame must yield None.
result2 = ext.frame_inbound_payload_data(proto, b"")
assert not isinstance(result2, fp.CloseReason)
assert ext.frame_inbound_complete(proto, True) is None
# A compressed TEXT frame
# NOTE(review): `compressed_payload` is presumably the deflate encoding of
# `payload` - confirm against the full test.
payload = b"x" * 23
compressed_payload = b"\xaa\xa8\xc0\n\x00\x00"
# The arguments of this call lie outside the excerpt.
result3 = ext.frame_inbound_header(
def test_inbound_bad_zlib_payload(self) -> None:
    """Check that an undecodable RSV1 payload is reported as invalid.

    A BINARY frame is announced with RSV1 set, then 23 literal ``x`` bytes
    are supplied as its payload. The payload call must return
    ``CloseReason.INVALID_FRAME_PAYLOAD_DATA``.
    """
    bogus = b"x" * 23
    extension = wpext.PerMessageDeflate()
    extension._enabled = True
    protocol = fp.FrameProtocol(client=True, extensions=[extension])

    header_result = extension.frame_inbound_header(
        protocol, fp.Opcode.BINARY, fp.RsvBits(True, False, False), len(bogus)
    )
    assert isinstance(header_result, fp.RsvBits)
    assert header_result.rsv1

    outcome = extension.frame_inbound_payload_data(protocol, bogus)
    assert outcome is fp.CloseReason.INVALID_FRAME_PAYLOAD_DATA
# Feeds a payload to an enabled extension in two pieces: a BINARY frame
# carrying the first `split` bytes, followed by a CONTINUATION frame. The
# excerpt ends inside the second header call, so the final assertions are
# not visible here.
# `client` selects the role the FrameProtocol is created with.
def test_client_inbound_compressed_multiple_data_frames(self, client: bool) -> None:
# NOTE(review): `compressed_payload` is presumably the deflate encoding of
# `payload` - confirm against the full test.
payload = b"x" * 23
compressed_payload = b"\xaa\xa8\xc0\n\x00\x00"
# Number of bytes sent in the first frame.
split = 3
# Accumulates whatever the extension returns for each payload chunk.
data = b""
ext = wpext.PerMessageDeflate()
# Switch the extension on directly through its private flag.
ext._enabled = True
proto = fp.FrameProtocol(client=client, extensions=[ext])
# First frame: BINARY with RSV1 set, announcing `split` bytes.
result = ext.frame_inbound_header(
proto, fp.Opcode.BINARY, fp.RsvBits(True, False, False), split
)
assert isinstance(result, fp.RsvBits)
assert result.rsv1
# Hand over only the first `split` bytes; this must not yield a CloseReason.
result2 = ext.frame_inbound_payload_data(proto, compressed_payload[:split])
assert not isinstance(result2, fp.CloseReason)
data += result2
# The frame is completed with False as the second argument, and the call
# must return None.
assert ext.frame_inbound_complete(proto, False) is None
# Second frame: a CONTINUATION header; its remaining arguments lie outside
# the excerpt.
result3 = ext.frame_inbound_header(
proto,
fp.Opcode.CONTINUATION,
def start(self, _) -> commands.TCommandGenerator:
    """Create the two WebSocket connection objects and replay the handshake.

    A ``SERVER``-role connection is stored on ``self.client_conn`` and a
    ``CLIENT``-role connection on ``self.server_conn``. When the handshake
    response advertises ``PerMessageDeflate`` in ``Sec-WebSocket-Extensions``,
    the extension is attached to both and finalized from that header. The
    bytes produced by ``self.server_conn`` are then fed into
    ``self.client_conn``, whose first event must be a ``ConnectionRequested``.

    NOTE(review): the return annotation names a command generator, but no
    ``yield`` appears in this excerpt - confirm against the full method.
    NOTE(review): both connections receive the same ``extensions`` list and
    therefore the same ``PerMessageDeflate`` instance - confirm that sharing
    it is intended.
    """
    handshake_request = self.handshake_flow.request
    response_headers = self.handshake_flow.response.headers

    extensions = []
    if (
        'Sec-WebSocket-Extensions' in response_headers
        and PerMessageDeflate.name in response_headers['Sec-WebSocket-Extensions']
    ):
        extensions = [PerMessageDeflate()]

    self.client_conn = WSConnection(ConnectionType.SERVER, extensions=extensions)
    self.server_conn = WSConnection(
        ConnectionType.CLIENT,
        host=handshake_request.host,
        resource=handshake_request.path,
        extensions=extensions,
    )

    if extensions:
        negotiated = response_headers['Sec-WebSocket-Extensions']
        # Finalize the client-facing connection first, then the server-facing one.
        for conn in (self.client_conn, self.server_conn):
            conn.extensions[0].finalize(conn, negotiated)

    handshake_bytes = self.server_conn.bytes_to_send()
    self.client_conn.receive_bytes(handshake_bytes)

    first_event = next(self.client_conn.events())
    assert isinstance(first_event, wsevents.ConnectionRequested)
def start(self, _) -> commands.TCommandGenerator:
    """Create the two WebSocket connection objects and replay the handshake.

    A ``SERVER``-role connection is stored on ``self.client_conn`` and a
    ``CLIENT``-role connection on ``self.server_conn``. When the handshake
    response advertises ``PerMessageDeflate`` in ``Sec-WebSocket-Extensions``,
    the extension is attached to both and finalized from that header. The
    bytes produced by ``self.server_conn`` are then fed into
    ``self.client_conn``, whose first event must be a ``ConnectionRequested``.

    NOTE(review): the return annotation names a command generator, but no
    ``yield`` appears in this excerpt - confirm against the full method.
    NOTE(review): both connections receive the same ``extensions`` list and
    therefore the same ``PerMessageDeflate`` instance - confirm that sharing
    it is intended.
    """
    handshake_request = self.handshake_flow.request
    response_headers = self.handshake_flow.response.headers

    extensions = []
    if (
        'Sec-WebSocket-Extensions' in response_headers
        and PerMessageDeflate.name in response_headers['Sec-WebSocket-Extensions']
    ):
        extensions = [PerMessageDeflate()]

    self.client_conn = WSConnection(ConnectionType.SERVER, extensions=extensions)
    self.server_conn = WSConnection(
        ConnectionType.CLIENT,
        host=handshake_request.host,
        resource=handshake_request.path,
        extensions=extensions,
    )

    if extensions:
        negotiated = response_headers['Sec-WebSocket-Extensions']
        # Finalize the client-facing connection first, then the server-facing one.
        for conn in (self.client_conn, self.server_conn):
            conn.extensions[0].finalize(conn, negotiated)

    handshake_bytes = self.server_conn.bytes_to_send()
    self.client_conn.receive_bytes(handshake_bytes)

    first_event = next(self.client_conn.events())
    assert isinstance(first_event, wsevents.ConnectionRequested)