Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_inbound_uncompressed_control_frame(self) -> None:
payload = b"x" * 23
ext = wpext.PerMessageDeflate()
ext._enabled = True
proto = fp.FrameProtocol(client=True, extensions=[ext])
result = ext.frame_inbound_header(
proto, fp.Opcode.PING, fp.RsvBits(False, False, False), len(payload)
)
assert isinstance(result, fp.RsvBits)
assert result.rsv1
data = ext.frame_inbound_payload_data(proto, payload)
assert data == payload
assert ext.frame_inbound_complete(proto, True) is None
def frame_inbound_header(
self,
proto: Union[fp.FrameDecoder, fp.FrameProtocol],
opcode: fp.Opcode,
rsv: fp.RsvBits,
payload_length: int,
) -> Union[fp.CloseReason, fp.RsvBits]:
self._inbound_header_called = True
if opcode is fp.Opcode.PONG:
return fp.CloseReason.MANDATORY_EXT
self._inbound_rsv_bit_set = rsv.rsv3
return fp.RsvBits(False, False, True)
def test_final_text_frame(self) -> None:
text_payload = "fñör∂"
binary_payload = text_payload.encode("utf8")
decoder = fp.MessageDecoder()
decoder.opcode = fp.Opcode.TEXT
decoder.decoder = getincrementaldecoder("utf-8")()
assert decoder.decoder.decode(binary_payload[:-2]) == text_payload[:-1]
binary_payload = binary_payload[-2:]
text_payload = text_payload[-1:]
frame = fp.Frame(
opcode=fp.Opcode.CONTINUATION,
payload=binary_payload,
frame_finished=True,
message_finished=True,
)
frame = decoder.process_frame(frame)
assert frame.opcode is fp.Opcode.TEXT
assert frame.message_finished is True
payload = b""
if code:
payload += struct.pack("!H", code)
if reason:
payload += reason.encode("utf8")
elif reason_bytes:
payload += reason_bytes
frame_bytes = b"\x88" + bytearray([len(payload)]) + payload
protocol = fp.FrameProtocol(client=True, extensions=[])
protocol.receive_bytes(frame_bytes)
frames = list(protocol.received_frames())
assert len(frames) == 1
frame = frames[0]
assert frame.opcode == fp.Opcode.CLOSE
assert frame.payload[0] == code or fp.CloseReason.NO_STATUS_RCVD
if reason:
assert frame.payload[1] == reason
else:
assert not frame.payload[1]
frame_bytes = frame_bytes[header_len:]
payload_sent = 0
expected_opcode = fp.Opcode.TEXT
for offset in range(0, len(frame_bytes), chunk_size):
chunk = frame_bytes[offset : offset + chunk_size]
decoder.receive_bytes(chunk)
frame = decoder.process_buffer()
payload_sent += chunk_size
all_payload_sent = payload_sent == len(payload)
assert frame is not None
assert frame.opcode is expected_opcode
assert frame.frame_finished is all_payload_sent
assert frame.message_finished is True
assert frame.payload == payload[offset : offset + chunk_size]
expected_opcode = fp.Opcode.CONTINUATION
def _handle_data_received(self, ws_event, source, other, send_to, from_client, fb):
fb.append(ws_event.data)
if ws_event.message_finished:
original_chunk_sizes = [len(f) for f in fb]
if isinstance(ws_event, wsevents.TextReceived):
message_type = wsproto.frame_protocol.Opcode.TEXT
payload = ''.join(fb)
else:
message_type = wsproto.frame_protocol.Opcode.BINARY
payload = b''.join(fb)
fb.clear()
websocket_message = websocket.WebSocketMessage(message_type, from_client, payload)
length = len(websocket_message.content)
self.flow.messages.append(websocket_message)
yield commands.Hook("websocket_message", self.flow)
if not self.flow.stream and not websocket_message.killed:
def get_chunk(payload):
if len(payload) == length:
# message has the same length, we can reuse the same sizes
def _handle_data_received(self, ws_event, source, other, send_to, from_client, fb):
fb.append(ws_event.data)
if ws_event.message_finished:
original_chunk_sizes = [len(f) for f in fb]
if isinstance(ws_event, wsevents.TextReceived):
message_type = wsproto.frame_protocol.Opcode.TEXT
payload = ''.join(fb)
else:
message_type = wsproto.frame_protocol.Opcode.BINARY
payload = b''.join(fb)
fb.clear()
websocket_message = websocket.WebSocketMessage(message_type, from_client, payload)
length = len(websocket_message.content)
self.flow.messages.append(websocket_message)
yield commands.Hook("websocket_message", self.flow)
if not self.flow.stream and not websocket_message.killed:
def get_chunk(payload):
if len(payload) == length:
# message has the same length, we can reuse the same sizes
pos = 0
for s in original_chunk_sizes:
yield (payload[pos:pos + s], True if pos + s == length else False)
def __init__(
self, type: int, from_client: bool, content: bytes, timestamp: Optional[int]=None, killed: bool=False
) -> None:
self.type = Opcode(type) # type: ignore
"""indicates either TEXT or BINARY (from wsproto.frame_protocol.Opcode)."""
self.from_client = from_client
"""True if this messages was sent by the client."""
self.content = content
"""A byte-string representing the content of this message."""
self.timestamp: int = timestamp or int(time.time())
"""Timestamp of when this message was received or created."""
self.killed = killed
"""True if this messages was killed and should not be sent to the other endpoint."""
def __repr__(self):
if self.type == Opcode.TEXT:
return "text message: {}".format(repr(self.content))
else:
return "binary message: {}".format(strutils.bytes_to_escaped_str(self.content))