Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_ping_pong_hidden_payload(tctx, ws_playbook):
    """Check PING/PONG handling when the relayed frames carry no payload.

    The server sends a PING with payload ``foobar``. The playbook then
    expects an info log for that PING, an empty-payload PING sent to the
    client, and a masked PONG with payload ``foobar`` sent to the server.
    Finally the client's masked, empty PONG is received and logged.
    """
    flow = tutils.Placeholder()

    # Frames are built in the same order as they are used below.
    ping_with_payload = bytes(Frame(fin=1, opcode=OPCODE.PING, payload=b'foobar'))
    ping_empty = bytes(Frame(fin=1, opcode=OPCODE.PING, payload=b''))
    pong_with_payload = bytes(Frame(fin=1, mask=1, opcode=OPCODE.PONG, payload=b'foobar'))
    pong_empty = bytes(Frame(fin=1, mask=1, opcode=OPCODE.PONG, payload=b''))

    # `<<` and `>>` share one precedence level and associate left to right,
    # so the chain is evaluated strictly in the order written.
    scenario = (
        ws_playbook
        << commands.Hook("websocket_start", flow)
        >> events.HookReply(-1)
        >> events.DataReceived(tctx.server, ping_with_payload)
        << commands.Log("info", "WebSocket PING received from server: foobar")
        << commands.SendData(tctx.client, ping_empty)
        << commands.SendData(tctx.server, pong_with_payload)
        >> events.DataReceived(tctx.client, pong_empty)
        << commands.Log("info", "WebSocket PONG received from client: ")
    )
    assert scenario
to establish TLS with the client.
"""
tssl_server = SSLTest(server_side=True, alpn=["quux"])
playbook, client_layer, tssl_client = _test_tls_client_server(tctx, alpn=["quux"])
# We should now get instructed to open a server connection.
data = tutils.Placeholder()
tls_clienthello = tutils.Placeholder()
def require_server_conn(hook: commands.Hook) -> None:
    """Hook side effect: flag the client hello so server TLS is established first.

    Asserts that the hook carries ``tls.ClientHelloHookData`` and then sets
    ``establish_server_tls_first`` on it.
    """
    client_hello = hook.data
    assert isinstance(client_hello, tls.ClientHelloHookData)
    client_hello.establish_server_tls_first = True
assert (
playbook
>> events.DataReceived(tctx.client, tssl_client.out.read())
<< commands.Hook("tls_clienthello", tls_clienthello)
>> tutils.reply(side_effect=require_server_conn)
<< commands.OpenConnection(tctx.server)
>> tutils.reply(None)
<< commands.Hook("tls_start", tutils.Placeholder())
>> reply_tls_start(alpn=b"quux")
<< commands.SendData(tctx.server, data)
)
# Establish TLS with the server...
tssl_server.inc.write(data())
with pytest.raises(ssl.SSLWantReadError):
tssl_server.obj.do_handshake()
data = tutils.Placeholder()
assert (
def test_https_proxy(strategy, tctx):
"""Test a CONNECT request, followed by a HTTP GET /"""
server = Placeholder()
flow = Placeholder()
playbook = Playbook(http.HTTPLayer(tctx, HTTPMode.regular))
tctx.options.connection_strategy = strategy
(playbook
>> DataReceived(tctx.client, b"CONNECT example.proxy:80 HTTP/1.1\r\n\r\n")
<< http.HttpConnectHook(Placeholder())
>> reply())
if strategy == "eager":
(playbook
<< OpenConnection(server)
>> reply(None))
(playbook
<< SendData(tctx.client, b'HTTP/1.1 200 Connection established\r\n\r\n')
>> DataReceived(tctx.client, b"GET /foo?hello=1 HTTP/1.1\r\nHost: example.com\r\n\r\n")
<< layer.NextLayerHook(Placeholder())
>> reply_next_layer(lambda ctx: http.HTTPLayer(ctx, HTTPMode.transparent))
<< http.HttpRequestHeadersHook(flow)
>> reply()
<< http.HttpRequestHook(flow)
>> reply())
if strategy == "lazy":
server = Placeholder()
flow = Placeholder()
playbook = Playbook(http.HTTPLayer(tctx, HTTPMode.regular))
tctx.options.connection_strategy = strategy
(playbook
>> DataReceived(tctx.client, b"CONNECT example.proxy:80 HTTP/1.1\r\n\r\n")
<< http.HttpConnectHook(Placeholder())
>> reply())
if strategy == "eager":
(playbook
<< OpenConnection(server)
>> reply(None))
(playbook
<< SendData(tctx.client, b'HTTP/1.1 200 Connection established\r\n\r\n')
>> DataReceived(tctx.client, b"GET /foo?hello=1 HTTP/1.1\r\nHost: example.com\r\n\r\n")
<< layer.NextLayerHook(Placeholder())
>> reply_next_layer(lambda ctx: http.HTTPLayer(ctx, HTTPMode.transparent))
<< http.HttpRequestHeadersHook(flow)
>> reply()
<< http.HttpRequestHook(flow)
>> reply())
if strategy == "lazy":
(playbook
<< OpenConnection(server)
>> reply(None))
(playbook
<< SendData(server, b"GET /foo?hello=1 HTTP/1.1\r\nHost: example.com\r\n\r\n")
>> DataReceived(server, b"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World!")
<< http.HttpResponseHeadersHook(flow)
>> reply()
<< http.HttpResponseHook(flow)
def _handle_event(self, event: events.Event) -> commands.TCommandGenerator:
    """Handle one event, treating ``b"establish-server-tls"`` as a trigger.

    When the event is a ``DataReceived`` whose data is exactly
    ``b"establish-server-tls"``, an ``EstablishServerTLS`` command is yielded
    for ``self.context.server``. If that command's reply is truthy, it is
    reported back on the connection the event arrived on as
    ``server-tls-failed: <reply>``. Every other event is delegated to the
    parent class implementation.
    """
    is_tls_trigger = (
        isinstance(event, events.DataReceived)
        and event.data == b"establish-server-tls"
    )
    if not is_tls_trigger:
        # Anything else is handled by the parent class.
        yield from super()._handle_event(event)
        return

    # noinspection PyTypeChecker
    failure = yield tls.EstablishServerTLS(self.context.server)
    if failure:
        yield commands.SendData(event.connection, f"server-tls-failed: {failure}".encode())
nl = layer.NextLayer(tctx)
playbook = tutils.Playbook(nl)
assert (
playbook
>> events.DataReceived(tctx.client, b"foo")
<< layer.NextLayerHook(nl)
)
nl.layer = tutils.EchoLayer(tctx)
handle = nl.handle_event
assert (
playbook
>> tutils.reply()
<< commands.SendData(tctx.client, b"foo")
)
sd, = handle(events.DataReceived(tctx.client, b"bar"))
assert isinstance(sd, commands.SendData)
(playbook
<< SendData(tctx.client, b'HTTP/1.1 200 Connection established\r\n\r\n')
>> DataReceived(tctx.client, b"GET /foo?hello=1 HTTP/1.1\r\nHost: example.com\r\n\r\n")
<< layer.NextLayerHook(Placeholder())
>> reply_next_layer(lambda ctx: http.HTTPLayer(ctx, HTTPMode.transparent))
<< http.HttpRequestHeadersHook(flow)
>> reply()
<< http.HttpRequestHook(flow)
>> reply())
if strategy == "lazy":
(playbook
<< OpenConnection(server)
>> reply(None))
(playbook
<< SendData(server, b"GET /foo?hello=1 HTTP/1.1\r\nHost: example.com\r\n\r\n")
>> DataReceived(server, b"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World!")
<< http.HttpResponseHeadersHook(flow)
>> reply()
<< http.HttpResponseHook(flow)
>> reply()
<< SendData(tctx.client, b"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World!"))
assert playbook
if https_server:
flow.request.url = "https://redirected.site/"
else:
flow.request.url = "http://redirected.site/"
if https_client:
p >> DataReceived(tctx.client, b"CONNECT example.com:80 HTTP/1.1\r\n\r\n")
if strategy == "eager":
p << OpenConnection(Placeholder())
p >> reply(None)
p << SendData(tctx.client, b'HTTP/1.1 200 Connection established\r\n\r\n')
p >> DataReceived(tctx.client, b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
p << layer.NextLayerHook(Placeholder())
p >> reply_next_layer(lambda ctx: http.HTTPLayer(ctx, HTTPMode.transparent))
else:
p >> DataReceived(tctx.client, b"GET http://example.com/ HTTP/1.1\r\nHost: example.com\r\n\r\n")
p << http.HttpRequestHook(flow)
p >> reply(side_effect=redirect)
p << OpenConnection(server)
p >> reply(None)
if https_server:
p << tls.EstablishServerTLS(server)
p >> reply_establish_server_tls()
p << SendData(server, b"GET / HTTP/1.1\r\nHost: redirected.site\r\n\r\n")
p >> DataReceived(server, b"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World!")
p << SendData(tctx.client, b"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World!")
assert p
if https_server:
assert server().address == ("redirected.site", 443)
else:
assert server().address == ("redirected.site", 80)
def start_read_request_body(self) -> commands.TCommandGenerator:
    """Switch to the request-body state and run it once immediately.

    Sets ``self.state`` to ``read_request_body`` and then delegates to it
    with an empty ``DataReceived`` event for the client connection.
    ``read_request_body`` reads from ``self.client_buf`` rather than from the
    event's data, so the empty event only triggers a pass over the buffer.
    """
    self.state = self.read_request_body
    # Synthetic, empty event: no new bytes arrived, we just want a first pass.
    kickoff = events.DataReceived(self.context.client, b"")
    yield from self.read_request_body(kickoff)
def read_request_body(self, event: events.Event) -> commands.TCommandGenerator:
if isinstance(event, events.DataReceived) and event.connection == self.context.client:
try:
event = self.body_reader(self.client_buf)
except h11.ProtocolError as e:
raise # FIXME
elif isinstance(event, events.ConnectionClosed):
try:
event = self.body_reader.read_eof()
except h11.ProtocolError as e:
raise # FIXME
else:
return warn(f"ClientHTTP1Layer.read_request_body: unimplemented {event}")
if event is None:
return
elif isinstance(event, h11.Data):
self.body_buf += event.data