Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def initiate(self, host, path, **kwargs):
    """Start a client handshake and return its parsed opening request.

    A client ``WSConnection`` is created and asked to send a ``Request``
    built from ``host``, ``path`` and any extra keyword arguments. The
    bytes queued by the connection are then split into the request line
    and the header block.

    Returns:
        A tuple ``(ws, method, path, version, headers)`` where ``ws`` is the
        connection, the next three items are the whitespace-separated fields
        of the request line, and ``headers`` is the result of
        ``parse_headers`` applied to everything after the first CRLF.
    """
    ws = WSConnection(CLIENT)
    ws.send(Request(host=host, target=path, **kwargs))
    # The first CRLF separates the request line from the header block.
    request_line, raw_headers = ws.bytes_to_send().split(b"\r\n", 1)
    method, request_target, version = request_line.strip().split()
    return ws, method, request_target, version, parse_headers(raw_headers)
def _make_handshake_rejection(
    status_code: int, body: Optional[bytes] = None
) -> List[Event]:
    """Drive a client through a rejected handshake and collect its events.

    A wsproto client sends an opening request to an h11 server connection,
    which answers with a plain HTTP response carrying ``status_code``. When
    ``body`` is given, a ``Content-Length`` header is added and the body is
    sent as a single ``h11.Data`` chunk before the end-of-message marker.

    Returns:
        Every event the client produced, as a list.
    """
    client = WSConnection(CLIENT)
    server = h11.Connection(h11.SERVER)
    server.receive_data(client.send(Request(host="localhost", target="/")))

    # Only advertise a length when there is a body to send.
    headers = [] if body is None else [("Content-Length", str(len(body)))]
    response = h11.Response(status_code=status_code, headers=headers)
    client.receive_data(server.send(response))

    if body is not None:
        client.receive_data(server.send(h11.Data(data=body)))
    client.receive_data(server.send(h11.EndOfMessage()))

    return list(client.events())
async def connect(self, path, host, port):
    """Open the TCP connection and complete the WebSocket opening handshake.

    Connects ``self.socket`` to ``(host, port)``, sends a ``Request`` for
    ``path`` through ``self.protocol`` and then reads from the socket until
    the protocol object yields its first event.

    Raises:
        Exception: if the peer closes the connection before a handshake
            event is produced, or if the first event is not an
            ``AcceptConnection``.
    """
    await self.socket.connect((host, port))
    request = Request(host=f'{host}:{port}', target=path)
    await self.socket.sendall(self.protocol.send(request))

    # TCP is a byte stream: one recv() may return only part of the HTTP
    # response. Keep feeding the protocol until it produces an event; a bare
    # next() on an empty event iterator would raise StopIteration, which
    # surfaces from a coroutine as RuntimeError.
    event = None
    while event is None:
        upgrade_response = await self.socket.recv(8096)
        if not upgrade_response:
            # Peer closed the connection before completing the handshake.
            raise Exception('Websocket handshake failed.')
        self.protocol.receive_data(upgrade_response)
        event = next(self.protocol.events(), None)

    if not isinstance(event, AcceptConnection):
        raise Exception('Websocket handshake failed.')
def _make_connection_request(request_headers: Headers, method: str = "GET") -> Request:
    """Feed a raw HTTP request to a server connection and return its event.

    An h11 client serialises a request for target ``/`` using ``method`` and
    ``request_headers``; the bytes are handed to a wsproto server
    ``WSConnection`` and the first event it yields is returned.

    The first event is asserted to be a ``Request``.
    """
    server = WSConnection(SERVER)
    raw_request = h11.Connection(h11.CLIENT).send(
        h11.Request(method=method, target="/", headers=request_headers)
    )
    server.receive_data(raw_request)

    event = next(server.events())
    assert isinstance(event, Request)
    return event
def test_rejected_handshake() -> None:
    """A rejected opening handshake leaves both peers in the CLOSED state."""
    client = H11Handshake(CLIENT)
    server = H11Handshake(SERVER)

    # Client -> server: opening request.
    opening = client.send(Request(host="localhost", target="/"))
    server.receive_data(opening)
    assert isinstance(next(server.events()), Request)

    # Server -> client: rejection.
    rejection = server.send(RejectConnection())
    client.receive_data(rejection)
    assert isinstance(next(client.events()), RejectConnection)

    assert client.state is ConnectionState.CLOSED
    assert server.state is ConnectionState.CLOSED
global count
print("test_server.py received connection {}".format(count))
count += 1
ws = WSConnection(SERVER)
closed = False
while not closed:
try:
data: Optional[bytes] = sock.recv(65535)
except socket.error:
data = None
ws.receive_data(data or None)
outgoing_data = b""
for event in ws.events():
if isinstance(event, Request):
outgoing_data += ws.send(
AcceptConnection(extensions=[PerMessageDeflate()])
)
elif isinstance(event, Message):
outgoing_data += ws.send(
Message(data=event.data, message_finished=event.message_finished)
)
elif isinstance(event, Ping):
outgoing_data += ws.send(event.response())
elif isinstance(event, CloseConnection):
closed = True
if ws.state is not ConnectionState.CLOSED:
outgoing_data += ws.send(event.response())
if not data:
closed = True
def test_connection_request_additional_headers() -> None:
    """Extra headers on a ``Request`` show up in the request the helper returns."""
    extra = [(b"X-Foo", b"Bar"), (b"X-Bar", b"Foo")]
    outgoing = Request(host="localhost", target="/", extra_headers=extra)

    request = _make_connection_request(outgoing)

    # Look the headers up by their lower-cased names.
    headers = normed_header_dict(request.headers)
    assert headers[b"x-foo"] == b"Bar"
    assert headers[b"x-bar"] == b"Foo"
def __init__(self, path: str, *, framework: ASGIFramework = echo_framework) -> None:
    """Wire a ``WebsocketServer`` to an in-memory stream and open a handshake.

    One end of a trio memory stream pair is kept as ``self.client_stream``;
    the other is given a ``MockSocket`` as its ``socket`` attribute and
    passed to the server together with ``framework`` and a default
    ``Config``. A client ``WSConnection`` then produces an opening request
    for ``path`` (host ``"hypercorn"``), which is fed directly into the
    server's connection object.
    """
    client_end, server_end = trio.testing.memory_stream_pair()
    self.client_stream = client_end
    server_end.socket = MockSocket()

    self.server = WebsocketServer(framework, Config(), server_end)
    self.connection = WSConnection(ConnectionType.CLIENT)

    # Deliver the opening request straight to the server's protocol object.
    handshake = self.connection.send(Request(target=path, host="hypercorn"))
    self.server.connection.receive_data(handshake)
2) Send a message and display response
3) Send ping and display pong
4) Negotiate WebSocket closing handshake
"""
# 0) Open TCP connection
print("Connecting to {}:{}".format(host, port))
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.connect((host, port))
# 1) Negotiate WebSocket opening handshake
print("Opening WebSocket")
ws = WSConnection(ConnectionType.CLIENT)
# Because this is a client WebSocket, we need to initiate the connection
# handshake by sending a Request event.
net_send(ws.send(Request(host=host, target="server")), conn)
net_recv(ws, conn)
handle_events(ws)
# 2) Send a message and display response
message = "wsproto is great"
print("Sending message: {}".format(message))
net_send(ws.send(Message(data=message)), conn)
net_recv(ws, conn)
handle_events(ws)
# 3) Send ping and display pong
payload = b"table tennis"
print("Sending ping: {!r}".format(payload))
net_send(ws.send(Ping(payload=payload)), conn)
net_recv(ws, conn)
handle_events(ws)