Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _make_connection_request(request_headers: Headers, method: str = "GET") -> Request:
client = h11.Connection(h11.CLIENT)
server = WSConnection(SERVER)
server.receive_data(
client.send(h11.Request(method=method, target="/", headers=request_headers))
)
event = next(server.events())
assert isinstance(event, Request)
return event
def _make_handshake_rejection(
status_code: int, body: Optional[bytes] = None
) -> List[Event]:
client = WSConnection(CLIENT)
server = h11.Connection(h11.SERVER)
server.receive_data(client.send(Request(host="localhost", target="/")))
headers = []
if body is not None:
headers.append(("Content-Length", str(len(body))))
client.receive_data(
server.send(h11.Response(status_code=status_code, headers=headers))
)
if body is not None:
client.receive_data(server.send(h11.Data(data=body)))
client.receive_data(server.send(h11.EndOfMessage()))
return list(client.events())
def new_conn(sock: socket.socket) -> None:
global count
print("test_server.py received connection {}".format(count))
count += 1
ws = WSConnection(SERVER)
closed = False
while not closed:
try:
data: Optional[bytes] = sock.recv(65535)
except socket.error:
data = None
ws.receive_data(data or None)
outgoing_data = b""
for event in ws.events():
if isinstance(event, Request):
outgoing_data += ws.send(
AcceptConnection(extensions=[PerMessageDeflate()])
)
elif isinstance(event, Message):
def _make_handshake(
request_headers: Headers,
accept_headers: Optional[Headers] = None,
subprotocol: Optional[str] = None,
extensions: Optional[List[Extension]] = None,
) -> Tuple[h11.InformationalResponse, bytes]:
client = h11.Connection(h11.CLIENT)
server = WSConnection(SERVER)
nonce = generate_nonce()
server.receive_data(
client.send(
h11.Request(
method="GET",
target="/",
headers=[
(b"Host", b"localhost"),
(b"Connection", b"Keep-Alive, Upgrade"),
(b"Upgrade", b"WebSocket"),
(b"Sec-WebSocket-Version", b"13"),
(b"Sec-WebSocket-Key", nonce),
]
+ request_headers,
)
)
def get_case_count(server: str) -> int:
uri = urlparse(server + "/getCaseCount")
connection = WSConnection(CLIENT)
sock = socket.socket()
sock.connect((uri.hostname, uri.port or 80))
sock.sendall(connection.send(Request(host=uri.netloc, target=uri.path)))
case_count: Optional[int] = None
while case_count is None:
in_data = sock.recv(65535)
connection.receive_data(in_data)
data = ""
out_data = b""
for event in connection.events():
if isinstance(event, TextMessage):
data += event.data
if event.message_finished:
case_count = json.loads(data)
self.connections = server_state.connections
self.tasks = server_state.tasks
# Connection state
self.transport = None
self.server = None
self.client = None
self.scheme = None
# WebSocket state
self.connect_event = None
self.queue = asyncio.Queue()
self.handshake_complete = False
self.close_sent = False
self.conn = wsproto.WSConnection(connection_type=ConnectionType.SERVER)
self.read_paused = False
self.writable = asyncio.Event()
self.writable.set()
# Buffers
self.bytes = b""
self.text = ""
0) Open TCP connection
1) Negotiate WebSocket opening handshake
2) Send a message and display response
3) Send ping and display pong
4) Negotiate WebSocket closing handshake
"""
# 0) Open TCP connection
print("Connecting to {}:{}".format(host, port))
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.connect((host, port))
# 1) Negotiate WebSocket opening handshake
print("Opening WebSocket")
ws = WSConnection(ConnectionType.CLIENT)
# Because this is a client WebSocket, we need to initiate the connection
# handshake by sending a Request event.
net_send(ws.send(Request(host=host, target="server")), conn)
net_recv(ws, conn)
handle_events(ws)
# 2) Send a message and display response
message = "wsproto is great"
print("Sending message: {}".format(message))
net_send(ws.send(Message(data=message)), conn)
net_recv(ws, conn)
handle_events(ws)
# 3) Send ping and display pong
payload = b"table tennis"
print("Sending ping: {!r}".format(payload))