Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_response_until_eof(tctx):
"""Test scenario where the server response body is terminated by EOF."""
server = Placeholder()
assert (
Playbook(http.HTTPLayer(tctx, HTTPMode.regular), hooks=False)
>> DataReceived(tctx.client, b"GET http://example.com/ HTTP/1.1\r\nHost: example.com\r\n\r\n")
<< OpenConnection(server)
>> reply(None)
<< SendData(server, b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
>> DataReceived(server, b"HTTP/1.1 200 OK\r\n\r\nfoo")
>> ConnectionClosed(server)
<< SendData(tctx.client, b"HTTP/1.1 200 OK\r\n\r\nfoo")
<< CloseConnection(tctx.client)
)
def test_http_reply_from_proxy(tctx):
"""Test a response served by mitmproxy itself."""
def reply_from_proxy(flow: HTTPFlow):
flow.response = HTTPResponse.make(418)
assert (
Playbook(http.HTTPLayer(tctx, HTTPMode.regular), hooks=False)
>> DataReceived(tctx.client, b"GET http://example.com/ HTTP/1.1\r\nHost: example.com\r\n\r\n")
<< http.HttpRequestHook(Placeholder())
>> reply(side_effect=reply_from_proxy)
<< SendData(tctx.client, b"HTTP/1.1 418 I'm a teapot\r\ncontent-length: 0\r\n\r\n")
)
Playbook(http.HTTPLayer(tctx, HTTPMode.regular), hooks=False)
>> DataReceived(tctx.client, b"CONNECT example.com:80 HTTP/1.1\r\n\r\n")
<< http.HttpConnectHook(Placeholder())
>> reply()
<< OpenConnection(server1)
>> reply(None)
<< SendData(tctx.client, b'HTTP/1.1 200 Connection established\r\n\r\n')
>> DataReceived(tctx.client, b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
<< layer.NextLayerHook(Placeholder())
>> reply_next_layer(lambda ctx: http.HTTPLayer(ctx, HTTPMode.transparent))
<< http.HttpRequestHook(flow)
>> ConnectionClosed(server1)
>> reply(to=-2)
<< OpenConnection(server2)
>> reply(None)
<< SendData(server2, b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
>> DataReceived(server2, b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
<< SendData(tctx.client, b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
)
assert server1() != server2()
assert flow().server_conn == server2()
def enable_streaming(flow: HTTPFlow):
flow.response.stream = lambda x: x.upper()
assert (
Playbook(http.HTTPLayer(tctx, HTTPMode.regular), hooks=False)
>> DataReceived(tctx.client, b"GET http://example.com/largefile HTTP/1.1\r\nHost: example.com\r\n\r\n")
<< OpenConnection(server)
>> reply(None)
<< SendData(server, b"GET /largefile HTTP/1.1\r\nHost: example.com\r\n\r\n")
>> DataReceived(server, b"HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nabc")
<< http.HttpResponseHeadersHook(flow)
>> reply(side_effect=enable_streaming)
<< SendData(tctx.client, b"HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nABC")
>> DataReceived(server, b"def")
<< SendData(tctx.client, b"DEF")
)
elif response == "early response":
# We may receive a response before we have finished sending our request.
# We continue sending unless the server closes the connection.
# https://tools.ietf.org/html/rfc7231#section-6.5.11
assert (
playbook
>> DataReceived(server, b"HTTP/1.1 413 Request Entity Too Large\r\nContent-Length: 0\r\n\r\n")
<< SendData(tctx.client, b"HTTP/1.1 413 Request Entity Too Large\r\nContent-Length: 0\r\n\r\n")
>> DataReceived(tctx.client, b"def")
<< SendData(server, b"DEF")
)
elif response == "early close":
assert (
playbook
>> DataReceived(server, b"HTTP/1.1 413 Request Entity Too Large\r\nContent-Length: 0\r\n\r\n")
<< SendData(tctx.client, b"HTTP/1.1 413 Request Entity Too Large\r\nContent-Length: 0\r\n\r\n")
>> ConnectionClosed(server)
<< CloseConnection(server)
<< CloseConnection(tctx.client)
)
elif response == "early kill":
err = Placeholder()
assert (
playbook
>> ConnectionClosed(server)
<< CloseConnection(server)
<< SendData(tctx.client, err)
<< CloseConnection(tctx.client)
)
assert b"502 Bad Gateway" in err()
else: # pragma: no cover
assert False
# but keep a queue of still-to-send requests.
self.send_queue.append(event)
return
if isinstance(event, RequestHeaders):
raw = http1.assemble_request_head(event.request)
yield commands.SendData(self.conn, raw)
elif isinstance(event, RequestData):
if "chunked" in self.request.headers.get("transfer-encoding", "").lower():
raw = b"%x\r\n%s\r\n" % (len(event.data), event.data)
else:
raw = event.data
yield commands.SendData(self.conn, raw)
elif isinstance(event, RequestEndOfMessage):
if "chunked" in self.request.headers.get("transfer-encoding", "").lower():
yield commands.SendData(self.conn, b"0\r\n\r\n")
elif http1.expected_http_body_size(self.request) == -1:
assert not self.send_queue
yield commands.CloseConnection(self.conn)
yield from self.mark_done(request=True)
elif isinstance(event, RequestProtocolError):
yield commands.CloseConnection(self.conn)
return
else:
raise NotImplementedError(f"{event}")
event = yield from self.client_conn.next_event()
if event is h11.NEED_DATA:
return
elif isinstance(event, h11.Data):
self.flow_events[0].append(event)
elif isinstance(event, h11.EndOfMessage):
self.flow_events[0].append(event)
yield commands.Log(f"request {self.flow_events}")
if self.flow.request.first_line_format == FirstLineFormat.authority.value:
if self.mode == HTTPMode.regular:
yield commands.Hook("http_connect", self.flow)
self.context.server = context.Server(
(self.flow.request.host, self.flow.request.port)
)
yield commands.SendData(
self.context.client,
b'%s 200 Connection established\r\n\r\n' % self.flow.request.data.http_version
)
child_layer = NextLayer(self.context)
self._handle_event = child_layer.handle_event
yield from child_layer.handle_event(events.Start())
return
if self.mode == HTTPMode.upstream:
raise NotImplementedError()
elif self.flow.request.first_line_format == FirstLineFormat.absolute.value:
if self.mode == HTTPMode.regular:
self.context.server.address = (self.flow.request.host, self.flow.request.port)
else:
raise NotImplementedError()
def relay_messages(self, event: events.Event) -> commands.TCommandGenerator:
if isinstance(event, events.DataReceived):
if event.data:
self.tls[event.connection].bio_write(event.data)
yield from self.tls_interact(event.connection)
while True:
try:
plaintext = self.tls[event.connection].recv(4096)
except (SSL.WantReadError, SSL.ZeroReturnError):
return
event_for_child = events.DataReceived(self.context.server, plaintext)
for event_from_child in self.child_layer.handle_event(event_for_child):
if isinstance(event_from_child, commands.SendData):
self.tls[event_from_child.connection].sendall(event_from_child.data)
yield from self.tls_interact(event_from_child.connection)
else:
yield event_from_child
elif isinstance(event, events.ConnectionClosed):
warn("unimplemented: tls.relay_messages:close")
pos += s
else:
# just re-chunk everything into 4kB frames
# header len = 4 bytes without masking key and 8 bytes with masking key
chunk_size = 4088 if from_client else 4092
chunks = range(0, len(payload), chunk_size)
for i in chunks:
yield (payload[i:i + chunk_size], True if i + chunk_size >= len(payload) else False)
for chunk, final in get_chunk(websocket_message.content):
other.send_data(chunk, final)
yield commands.SendData(send_to, other.bytes_to_send())
if self.flow.stream:
other.send_data(ws_event.data, ws_event.message_finished)
yield commands.SendData(send_to, other.bytes_to_send())
def tls_interact(self, conn: Connection):
while True:
try:
data = self.tls[conn].bio_read(4096)
except SSL.WantReadError:
# Okay, nothing more waiting to be sent.
return
else:
yield commands.SendData(conn, data)