Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_split_unicode_message(self) -> None:
    """Decode a TEXT message whose frame boundary cuts a multi-byte character.

    The payload is 64 copies of "∂", which encodes to three bytes per
    character in UTF-8. Cutting the byte string at offset 64 leaves one
    dangling byte at the end of the first frame, so the decoder may only
    emit the whole characters from that frame and must emit everything
    else once the continuation frame arrives.
    """
    text_payload = "∂" * 64
    payload = text_payload.encode("utf-8")
    split = 64
    # Number of complete three-byte characters that fit in the first frame.
    whole_chars = split // 3

    decoder = fp.MessageDecoder()

    frame = fp.Frame(
        opcode=fp.Opcode.TEXT,
        payload=payload[:split],
        frame_finished=False,
        message_finished=True,
    )
    frame = decoder.process_frame(frame)
    assert frame.opcode is fp.Opcode.TEXT
    assert frame.message_finished is False
    assert frame.payload == text_payload[:whole_chars]

    frame = fp.Frame(
        opcode=fp.Opcode.CONTINUATION,
        payload=payload[split:],
        frame_finished=True,
        message_finished=True,
    )
    frame = decoder.process_frame(frame)
    # Previously the result of the second frame was never checked, leaving
    # the reassembly of the split character untested.
    assert frame.opcode is fp.Opcode.TEXT
    assert frame.message_finished is True
    assert frame.payload == text_payload[whole_chars:]
def test_frame_outbound(self) -> None:
    """Check that the base ``Extension`` returns outbound rsv bits and data as given."""
    base_ext = wpext.Extension()
    all_bits = fp.RsvBits(True, True, True)
    empty = b""
    # None stands in for the arguments this test does not exercise.
    outcome = base_ext.frame_outbound(None, None, all_bits, empty, None)  # type: ignore
    assert outcome == (all_bits, empty)
def frame_inbound_header(
    self,
    proto: Union[fp.FrameDecoder, fp.FrameProtocol],
    opcode: fp.Opcode,
    rsv: fp.RsvBits,
    payload_length: int,
) -> Union[fp.CloseReason, fp.RsvBits]:
    """Record that the inbound-header hook ran and report the result.

    ``proto`` and ``payload_length`` are accepted but not used.

    Returns ``fp.CloseReason.MANDATORY_EXT`` for a PONG frame. For any
    other opcode, stores the incoming ``rsv3`` flag on the instance and
    returns ``RsvBits`` with only the third bit set.
    """
    self._inbound_header_called = True
    if opcode is not fp.Opcode.PONG:
        # Remember whether the peer set rsv3 so it can be inspected later.
        self._inbound_rsv_bit_set = rsv.rsv3
        return fp.RsvBits(False, False, True)
    return fp.CloseReason.MANDATORY_EXT
def test_inbound_uncompressed_control_frame(self) -> None:
    """Feed an uncompressed PING through ``PerMessageDeflate`` with ``_enabled`` set.

    Checks that the header hook returns ``RsvBits`` with ``rsv1`` set, that
    the payload comes back unchanged, and that completing the frame
    returns ``None``.
    """
    body = b"x" * 23

    deflate = wpext.PerMessageDeflate()
    # Force the private enabled flag on directly.
    deflate._enabled = True
    protocol = fp.FrameProtocol(client=True, extensions=[deflate])

    clear_bits = fp.RsvBits(False, False, False)
    header_result = deflate.frame_inbound_header(
        protocol, fp.Opcode.PING, clear_bits, len(body)
    )
    assert isinstance(header_result, fp.RsvBits)
    assert header_result.rsv1

    returned = deflate.frame_inbound_payload_data(protocol, body)
    assert returned == body

    completion = deflate.frame_inbound_complete(protocol, True)
    assert completion is None
def test_incomplete_unicode(self) -> None:
    """A finished TEXT message ending mid-character must raise ``ParseFailed``.

    The first four bytes of the encoded string stop after the lead byte of
    the two-byte "ö", so the message cannot be decoded as complete UTF-8.
    """
    truncated = "fñör∂".encode("utf8")[:4]
    final_frame = fp.Frame(
        opcode=fp.Opcode.TEXT,
        payload=truncated,
        frame_finished=True,
        message_finished=True,
    )
    message_decoder = fp.MessageDecoder()

    with pytest.raises(fp.ParseFailed) as raised:
        message_decoder.process_frame(final_frame)

    failure = raised.value
    assert failure.code is fp.CloseReason.INVALID_FRAME_PAYLOAD_DATA
def test_client_side_masking_short_frame(self) -> None:
    """Check the wire bytes of a 125-byte frame sent by a client-side protocol.

    Per RFC 6455, the first byte 0x82 is FIN plus the binary opcode, and the
    0x80 bit of the second byte is the MASK flag; the remaining seven bits
    hold the payload length.
    """
    protocol = fp.FrameProtocol(client=True, extensions=[])
    body = b"x" * 125
    wire = protocol.send_data(body, fin=True)

    assert wire[0] == 0x82
    (length_byte,) = struct.unpack("!B", wire[1:2])
    assert length_byte == len(body) | 0x80

    # Bytes 2..5 are the masking key; the payload is XORed with it cyclically.
    key = wire[2:6]
    keystream = itertools.cycle(key)
    expected = bytearray(plain ^ next(keystream) for plain in bytearray(body))
    assert wire[6:] == expected
def test_frame_inbound_header(self) -> None:
    """Check that the base ``Extension`` header hook returns an all-False ``RsvBits``."""
    base_ext = wpext.Extension()
    # None stands in for every argument; this test only checks the return value.
    claimed = base_ext.frame_inbound_header(None, None, None, None)  # type: ignore
    no_bits = fp.RsvBits(False, False, False)
    assert claimed == no_bits
def _handle_data_received(self, event, source_conn, other_conn, is_server):
fb = self.server_frame_buffer if is_server else self.client_frame_buffer
fb.append(event.data)
if event.message_finished:
original_chunk_sizes = [len(f) for f in fb]
if isinstance(event, events.TextReceived):
message_type = wsproto.frame_protocol.Opcode.TEXT
payload = ''.join(fb)
else:
message_type = wsproto.frame_protocol.Opcode.BINARY
payload = b''.join(fb)
fb.clear()
websocket_message = WebSocketMessage(message_type, not is_server, payload)
length = len(websocket_message.content)
self.flow.messages.append(websocket_message)
self.channel.ask("websocket_message", self.flow)
if not self.flow.stream and not websocket_message.killed:
def get_chunk(payload):
if len(payload) == length:
# message has the same length, we can reuse the same sizes
pos = 0
for s in original_chunk_sizes:
yield (payload[pos:pos + s], True if pos + s == length else False)