Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_ping_pong(client_sends: bool) -> None:
    """Round-trip a ping and its answer between a client and a server.

    ``client_sends`` selects which side originates the ping; the opposite
    side receives it and replies with the ping's own response event.
    """
    client_side = Connection(CLIENT)
    server_side = Connection(SERVER)
    if client_sends:
        sender, receiver = client_side, server_side
    else:
        sender, receiver = server_side, client_side

    ping_payload = b"x" * 23

    # Sender serializes the ping; receiver must parse it back unchanged.
    ping_bytes = sender.send(Ping(payload=ping_payload))
    receiver.receive_data(ping_bytes)
    received_ping = next(receiver.events())
    assert isinstance(received_ping, Ping)
    assert received_ping.payload == ping_payload

    # Receiver replies; sender must see a Pong carrying the same payload.
    pong_bytes = receiver.send(received_ping.response())
    sender.receive_data(pong_bytes)
    received_pong = next(sender.events())
    assert isinstance(received_pong, Pong)
    assert received_pong.payload == ping_payload
def test_send_invalid_event() -> None:
    """A Ping sent through a fresh client handshake raises LocalProtocolError."""
    handshake = H11Handshake(CLIENT)
    expected_failure = pytest.raises(LocalProtocolError)
    with expected_failure:
        handshake.send(Ping())
)
# Echo loop: read from the socket, feed the bytes to `connection`, answer
# every event it yields, and write the batched replies back to the socket.
# Runs until a close event is handled or a socket operation fails.
closed = False
while not closed:
try:
data: Optional[bytes] = sock.recv(65535)
except CONNECTION_EXCEPTIONS:
# A failed read is forwarded as None, the same value an empty read
# becomes on the next line.
data = None
# `data or None` converts an empty read (b"") into None.
connection.receive_data(data or None)
# Replies for this pass are accumulated and sent in a single sendall().
out_data = b""
for event in connection.events():
if isinstance(event, Message):
# Echo the message back with the same data and message_finished flag.
out_data += connection.send(
Message(data=event.data, message_finished=event.message_finished)
)
elif isinstance(event, Ping):
out_data += connection.send(event.response())
elif isinstance(event, CloseConnection):
# Reply to the close; `closed` ends the outer `while not closed` loop.
closed = True
out_data += connection.send(event.response())
# Other event types are ignored (debug print left disabled):
# else:
# print("??", event)
# NOTE(review): out_data starts as b"" and is only extended with `+=`, so
# this `is None` test looks unreachable. Confirm whether `not out_data`
# was intended before changing it; that would alter when the loop exits.
if out_data is None:
break
try:
sock.sendall(out_data)
except CONNECTION_EXCEPTIONS:
# A failed write ends the loop.
closed = True
break
# NOTE(review): this excerpt starts inside an if/elif chain over `event`; the
# first branch's condition and the enclosing loop are not visible, and the
# original indentation is lost, so nesting below must be confirmed upstream.
self.task.add_done_callback(self.maybe_close)
elif isinstance(event, Message):
try:
self.buffer.extend(event)
except FrameTooLarge:
# The buffer refused the frame: send a MESSAGE_TOO_BIG close, queue a
# disconnect for the app, close, and stop processing events.
self.write(
self.connection.send(CloseConnection(code=CloseReason.MESSAGE_TOO_BIG))
)
self.app_queue.put_nowait({"type": "websocket.disconnect"})
self.close()
break
# Once the event reports message_finished, the buffered content is queued
# for the app via to_message() and the buffer is cleared.
if event.message_finished:
self.app_queue.put_nowait(self.buffer.to_message())
self.buffer.clear()
elif isinstance(event, Ping):
# Write the serialized response to the ping straight away.
self.write(self.connection.send(event.response()))
elif isinstance(event, CloseConnection):
# The close response is written when the connection state is
# REMOTE_CLOSING. NOTE(review): whether the disconnect/close/break lines
# below are also guarded by this `if` cannot be told from this excerpt.
if self.connection.state == ConnectionState.REMOTE_CLOSING:
self.write(self.connection.send(event.response()))
self.app_queue.put_nowait({"type": "websocket.disconnect"})
self.close()
break
async def _reader_task(self):
# Performs the opening handshake, then loops while `self._reader_running`
# is true, looking up a handler for each event from `self._wsproto.events()`.
# Maps each event class to the method that handles it.
handlers = {
AcceptConnection: self._handle_accept_connection_event,
BytesMessage: self._handle_message_event,
CloseConnection: self._handle_close_connection_event,
Ping: self._handle_ping_event,
Pong: self._handle_pong_event,
RejectConnection: self._handle_reject_connection_event,
RejectData: self._handle_reject_data_event,
# Request: lambda event: None, # We won't handle server-side events.
# Text and bytes messages share one handler.
TextMessage: self._handle_message_event,
}
# We need to initiate the opening handshake.
await self._do_handshake()
while self._reader_running:
for event in self._wsproto.events():
# Lookup is by exact class (type(event)), not isinstance.
event_type = type(event)
try:
handler = handlers[event_type]
logger.debug('%s received event: %s', self, event_type)
# NOTE(review): the excerpt ends inside this `try`; the handler call and the
# matching `except` clause are not visible here.
# 1) Open the WebSocket: start the handshake by sending a Request event.
# Every step below follows the same pattern: serialize an event with
# ws.send(), pass the bytes to net_send(), then call net_recv() and
# handle_events() (helpers defined outside this excerpt).
net_send(ws.send(Request(host=host, target="server")), conn)
net_recv(ws, conn)
handle_events(ws)
# 2) Send a message and display response
message = "wsproto is great"
print("Sending message: {}".format(message))
net_send(ws.send(Message(data=message)), conn)
net_recv(ws, conn)
handle_events(ws)
# 3) Send ping and display pong
payload = b"table tennis"
print("Sending ping: {!r}".format(payload))
net_send(ws.send(Ping(payload=payload)), conn)
net_recv(ws, conn)
handle_events(ws)
# 4) Negotiate WebSocket closing handshake
print("Closing WebSocket")
net_send(ws.send(CloseConnection(code=1000, reason="sample reason")), conn)
# After sending the closing frame, we won't get any more events. The server
# should send a reply and then close the connection, so we need to receive
# twice:
net_recv(ws, conn)
# Shut down only the sending side of the socket; the final net_recv() below
# can still read from it.
conn.shutdown(socket.SHUT_WR)
net_recv(ws, conn)
# NOTE(review): this excerpt starts mid-function; the condition guarding the
# first `return` and the definition of `events` are not visible here.
# NOTE(review): `await self.closing.set()` requires set() to return an
# awaitable — confirm against the event type used for `self.closing`.
return await self.closing.set()
self.protocol.receive_data(data)
# Only one event is taken per call; any further pending events are left in
# the `events` iterator.
try:
event = next(events)
except StopIteration:
# Connection dropped unexpectedly
return await self.closing.set()
if isinstance(event, CloseConnection):
# Record the close event, queue its response for sending, and flag closing.
self.closure = event
await self.outgoing.put(event.response())
await self.closing.set()
elif isinstance(event, Message):
# Only the message data is queued for consumers.
await self.incoming.put(event.data)
elif isinstance(event, Ping):
# Queue the ping's response for sending.
await self.outgoing.put(event.response())
def handle_events(self):
    """Route every pending event from ``self.conn`` to its handler method.

    Each event is tested against the known event types in a fixed order and
    passed to the first matching handler; events of any other type are
    ignored.
    """
    for incoming in self.conn.events():
        if isinstance(incoming, events.Request):
            self.handle_connect(incoming)
            continue
        if isinstance(incoming, events.TextMessage):
            self.handle_text(incoming)
            continue
        if isinstance(incoming, events.BytesMessage):
            self.handle_bytes(incoming)
            continue
        # Both rejection event types share one handler.
        if isinstance(incoming, (events.RejectConnection, events.RejectData)):
            self.handle_no_connect(incoming)
            continue
        if isinstance(incoming, events.CloseConnection):
            self.handle_close(incoming)
            continue
        if isinstance(incoming, events.Ping):
            self.handle_ping(incoming)
# NOTE(review): this excerpt starts inside an if/elif chain over `event`; the
# first branch's condition and the enclosing loops are not visible here.
# Each branch appends the bytes returned by ws.send() to out_data.
print("Accepting WebSocket upgrade")
out_data += ws.send(AcceptConnection())
elif isinstance(event, CloseConnection):
# Log the close code and reason, queue the close response, and clear the
# `running` flag (presumably the enclosing loop's condition — confirm).
print(
"Connection closed: code={} reason={}".format(
event.code, event.reason
)
)
out_data += ws.send(event.response())
running = False
elif isinstance(event, TextMessage):
# Reverse the received text and queue it as the reply message.
print("Received request and sending response")
out_data += ws.send(Message(data=event.data[::-1]))
elif isinstance(event, Ping):
# The ping is answered explicitly here: the event's own response is
# serialized with ws.send() and added to the outgoing data.
print("Received ping and sending pong")
out_data += ws.send(event.response())
else:
print("Unknown event: {!r}".format(event))
# 4) Send data from wsproto to network
print("Sending {} bytes".format(len(out_data)))
stream.send(out_data)