Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _split_frame_test(
self,
client: bool,
frame_bytes: bytes,
opcode: fp.Opcode,
payload: bytes,
frame_finished: bool,
message_finished: bool,
split: int,
) -> None:
decoder = fp.FrameDecoder(client=client)
decoder.receive_bytes(frame_bytes[:split])
assert decoder.process_buffer() is None
decoder.receive_bytes(frame_bytes[split:])
frame = decoder.process_buffer()
assert frame is not None
assert frame.opcode is opcode
assert frame.payload == payload
assert frame.frame_finished is frame_finished
assert frame.message_finished is message_finished
def _single_frame_test(
self,
client: bool,
frame_bytes: bytes,
opcode: fp.Opcode,
payload: bytes,
frame_finished: bool,
message_finished: bool,
) -> None:
decoder = fp.FrameDecoder(client=client)
decoder.receive_bytes(frame_bytes)
frame = decoder.process_buffer()
assert frame is not None
assert frame.opcode is opcode
assert frame.payload == payload
assert frame.frame_finished is frame_finished
assert frame.message_finished is message_finished
def _parse_failure_test(
self, client: bool, frame_bytes: bytes, close_reason: fp.CloseReason
) -> None:
decoder = fp.FrameDecoder(client=client)
with pytest.raises(fp.ParseFailed) as excinfo:
decoder.receive_bytes(frame_bytes)
decoder.process_buffer()
assert excinfo.value.code is close_reason
def test_no_frame_completion_when_not_wanted(self) -> None:
ext = self.FakeExtension()
decoder = fp.FrameDecoder(client=True, extensions=[ext])
payload = "fñör∂"
expected_payload = payload.encode("utf-8")
bytes_payload = payload.encode("utf-8")
frame_bytes = b"\x81" + bytearray([len(bytes_payload)]) + bytes_payload
decoder.receive_bytes(frame_bytes)
frame = decoder.process_buffer()
assert frame is not None
assert ext._inbound_header_called
assert not ext._inbound_rsv_bit_set
assert ext._inbound_payload_data_called
assert ext._inbound_complete_called
assert frame.payload == expected_payload
def test_partial_control_frame(self) -> None:
chunk_size = 11
payload = b"x" * 64
frame_bytes = b"\x89" + bytearray([len(payload)]) + payload
decoder = fp.FrameDecoder(client=True)
for offset in range(0, len(frame_bytes) - chunk_size, chunk_size):
chunk = frame_bytes[offset : offset + chunk_size]
decoder.receive_bytes(chunk)
assert decoder.process_buffer() is None
decoder.receive_bytes(frame_bytes[-chunk_size:])
frame = decoder.process_buffer()
assert frame is not None
assert frame.opcode is fp.Opcode.PING
assert frame.frame_finished is True
assert frame.message_finished is True
assert frame.payload == payload
def test_partial_message_frames(self) -> None:
chunk_size = 1024
payload = b"x" * (128 * chunk_size)
payload_len = struct.pack("!Q", len(payload))
frame_bytes = b"\x81\x7f" + payload_len + payload
header_len = len(frame_bytes) - len(payload)
decoder = fp.FrameDecoder(client=True)
decoder.receive_bytes(frame_bytes[:header_len])
assert decoder.process_buffer() is None
frame_bytes = frame_bytes[header_len:]
payload_sent = 0
expected_opcode = fp.Opcode.TEXT
for offset in range(0, len(frame_bytes), chunk_size):
chunk = frame_bytes[offset : offset + chunk_size]
decoder.receive_bytes(chunk)
frame = decoder.process_buffer()
payload_sent += chunk_size
all_payload_sent = payload_sent == len(payload)
assert frame is not None
assert frame.opcode is expected_opcode
assert frame.frame_finished is all_payload_sent
assert frame.message_finished is True
assert frame.payload == payload[offset : offset + chunk_size]
def test_header_error_handling(self) -> None:
ext = self.FakeExtension()
decoder = fp.FrameDecoder(client=True, extensions=[ext])
frame_bytes = b"\x9a\x00"
decoder.receive_bytes(frame_bytes)
with pytest.raises(fp.ParseFailed) as excinfo:
decoder.receive_bytes(frame_bytes)
decoder.process_buffer()
assert excinfo.value.code is fp.CloseReason.MANDATORY_EXT