Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_server_close(tctx, ws_playbook):
    """Playbook for a CLOSE frame (status code 1000) arriving from the server.

    After the ``websocket_start`` hook is answered, the unmasked CLOSE frame
    is fed in as data from ``tctx.server``. The playbook then lists, in
    order: that same frame sent to the client, a masked CLOSE frame sent to
    the server, the ``websocket_end`` hook, and the client connection being
    closed.
    """
    flow = tutils.Placeholder()

    # Both frames carry the same two-byte big-endian status code.
    close_payload = struct.pack('>H', 1000)
    unmasked_close = bytes(Frame(fin=1, opcode=OPCODE.CLOSE, payload=close_payload))
    masked_close = bytes(Frame(fin=1, mask=1, opcode=OPCODE.CLOSE, payload=close_payload))

    scenario = (
        ws_playbook
        << commands.Hook("websocket_start", flow)
        >> events.HookReply(-1)
        >> events.DataReceived(tctx.server, unmasked_close)
        << commands.SendData(tctx.client, unmasked_close)
        << commands.SendData(tctx.server, masked_close)
        << commands.Hook("websocket_end", flow)
        >> events.HookReply(-1)
        << commands.CloseConnection(tctx.client)
    )
    assert scenario
def test_connection_failed(tctx, ws_playbook):
    """Playbook for bytes from the client that are not a valid WebSocket frame.

    After ``websocket_start`` is answered, ``b'Not a valid frame'`` is fed in
    as data from ``tctx.client``. The playbook then lists a masked CLOSE
    frame (status 1002, reason ``Invalid opcode 0xe``) sent to the server,
    the unmasked equivalent sent to the client, and the ``websocket_error``
    and ``websocket_end`` hooks, each answered with a ``HookReply``.
    """
    flow = tutils.Placeholder()

    garbage = b'Not a valid frame'
    # Status code 1002 followed by the textual close reason.
    close_payload = struct.pack('>H', 1002) + b'Invalid opcode 0xe'
    masked_close = bytes(Frame(fin=1, mask=1, opcode=OPCODE.CLOSE, payload=close_payload))
    unmasked_close = bytes(Frame(fin=1, opcode=OPCODE.CLOSE, payload=close_payload))

    scenario = (
        ws_playbook
        << commands.Hook("websocket_start", flow)
        >> events.HookReply(-1)
        >> events.DataReceived(tctx.client, garbage)
        << commands.SendData(tctx.server, masked_close)
        << commands.SendData(tctx.client, unmasked_close)
        << commands.Hook("websocket_error", flow)
        >> events.HookReply(-1)
        << commands.Hook("websocket_end", flow)
        >> events.HookReply(-1)
    )
    assert scenario
def test_connection_closed(tctx, ws_playbook):
    """Playbook for the server connection closing right after ``websocket_start``.

    The playbook lists an error log entry ("Connection closed abnormally"),
    the client connection being closed, and the ``websocket_error`` and
    ``websocket_end`` hooks. Afterwards the object behind the placeholder
    must have a truthy ``error`` attribute.
    """
    flow = tutils.Placeholder()

    scenario = (
        ws_playbook
        << commands.Hook("websocket_start", flow)
        >> events.HookReply(-1)
        >> events.ConnectionClosed(tctx.server)
        << commands.Log("error", "Connection closed abnormally")
        << commands.CloseConnection(tctx.client)
        << commands.Hook("websocket_error", flow)
        >> events.HookReply(-1)
        << commands.Hook("websocket_end", flow)
        >> events.HookReply(-1)
    )
    assert scenario

    assert flow().error
# NOTE(review): this playbook indexes `frames[0]` .. `frames[4]`, but no `frames`
# list with five entries is defined in the visible code, and there is no `def`
# header for this span — it looks like the tail of a different test whose
# beginning is missing from this excerpt. Confirm against the full file.
assert (
ws_playbook
<< commands.Hook("websocket_start", f)
>> events.HookReply(-1)
>> events.DataReceived(tctx.client, frames[0])
<< commands.Hook("websocket_message", f)
>> events.HookReply(-1)
<< commands.SendData(tctx.server, frames[0])
>> events.DataReceived(tctx.server, frames[1])
<< commands.Hook("websocket_message", f)
>> events.HookReply(-1)
<< commands.SendData(tctx.client, frames[1])
>> events.DataReceived(tctx.client, frames[2])
<< commands.SendData(tctx.server, frames[2])
<< commands.SendData(tctx.client, frames[3])
<< commands.Hook("websocket_end", f)
>> events.HookReply(-1)
>> events.DataReceived(tctx.server, frames[4])
<< None
)
# Two "websocket_message" hooks appear in the playbook above; the flow behind
# the placeholder is expected to hold exactly two messages.
assert len(f().messages) == 2
def test_connection_closed(tctx, ws_playbook):
    """Playbook for the server connection closing right after ``websocket_start``.

    The playbook lists an error log entry ("Connection closed abnormally"),
    the client connection being closed, and the ``websocket_error`` and
    ``websocket_end`` hooks. Afterwards the object behind the placeholder
    must have a truthy ``error`` attribute.
    """
    flow = tutils.Placeholder()

    scenario = (
        ws_playbook
        << commands.Hook("websocket_start", flow)
        >> events.HookReply(-1)
        >> events.ConnectionClosed(tctx.server)
        << commands.Log("error", "Connection closed abnormally")
        << commands.CloseConnection(tctx.client)
        << commands.Hook("websocket_error", flow)
        >> events.HookReply(-1)
        << commands.Hook("websocket_end", flow)
        >> events.HookReply(-1)
    )
    assert scenario

    assert flow().error
from mitmproxy.proxy2 import commands, events
from mitmproxy.proxy2.commands import Hook
from mitmproxy.proxy2.context import Context
from mitmproxy.proxy2.layer import Layer
from mitmproxy.proxy2.utils import expect
class TcpStartHook(Hook):
    """Hook carrying a ``tcp.TCPFlow``; by its name, tied to the start of a TCP flow."""

    flow: tcp.TCPFlow
class TcpMessageHook(Hook):
    """Hook carrying a ``tcp.TCPFlow``; by its name, tied to a message on a TCP flow."""

    flow: tcp.TCPFlow
class TcpEndHook(Hook):
    """Hook carrying a ``tcp.TCPFlow``; by its name, tied to the end of a TCP flow."""

    flow: tcp.TCPFlow
class TcpErrorHook(Hook):
    """Hook carrying a ``tcp.TCPFlow``; by its name, tied to an error on a TCP flow."""

    flow: tcp.TCPFlow
class TCPLayer(Layer):
"""
Simple TCP layer that just relays messages right now.
"""
# Declared attribute; presumably populated by Layer.__init__ from the
# `context` argument passed below — confirm in the Layer base class.
context: Context
# TCP flow for this layer, or None. Not assigned in the visible part of __init__.
flow: Optional[tcp.TCPFlow]
def __init__(self, context: Context, ignore: bool = False):
# NOTE(review): `ignore` is accepted but never used in the lines visible
# here; the constructor appears to be cut short in this excerpt.
super().__init__(context)
# NOTE(review): from here on the code reads `self.flow.request` and `self.mode`
# and yields the "request" / "responseheaders" / "error" hooks. No enclosing
# `def` for this body is visible in the excerpt — confirm which method it
# belongs to before relying on it.
self.flow.request.host_header = self.context.server.address[0]
# Determine .scheme, .host and .port attributes for inline scripts. For
# absolute-form requests, they are directly given in the request. For
# authority-form requests, we only need to determine the request
# scheme. For relative-form requests, we need to determine host and
# port as well.
if self.mode is HTTPMode.transparent:
# Setting request.host also updates the host header, which we want
# to preserve
host_header = self.flow.request.host_header
self.flow.request.host = self.context.server.address[0]
self.flow.request.port = self.context.server.address[1]
self.flow.request.host_header = host_header # set again as .host overwrites this.
self.flow.request.scheme = "https" if self.context.server.tls else "http"
yield commands.Hook("request", self.flow)
if self.flow.response:
# response was set by an inline script.
# we now need to emulate the responseheaders hook.
yield commands.Hook("responseheaders", self.flow)
yield from self.handle_response()
else:
# Ask for a connection to (host, port); the second argument is True when
# the request scheme is "https". The yielded command evaluates to a
# (connection, err) pair.
connection, err = yield GetHttpConnection(
(self.flow.request.host, self.flow.request.port),
self.flow.request.scheme == "https"
)
if err:
# On failure: emit a 502 error response, record the error on the flow,
# fire the "error" hook, and stop.
yield from self.send_error_response(502, err)
self.flow.error = flow.Error(err)
yield commands.Hook("error", self.flow)
return
def response_received(self):
    """Fire the ``response`` hook, then send the assembled response head.

    Yields a ``Hook("response", ...)`` command for ``self.flow`` followed by
    a ``SendData`` command carrying the bytes produced by
    ``http1.assemble_response_head`` for ``self.flow.response``.
    """
    yield commands.Hook("response", self.flow)

    head_bytes = http1.assemble_response_head(self.flow.response)
    # NOTE(review): the head is addressed to `self.context.server`; confirm
    # that this is the intended peer for a response.
    yield commands.SendData(self.context.server, head_bytes)
# NOTE(review): `f` is not defined anywhere in the visible code, and the
# `self.read_response_body(` call below is never closed within this excerpt.
# This span looks like a truncated piece of a different function — confirm
# against the full file.
if not f.response.stream:
# no streaming:
# we already received the full response from the server and can
# send it to the client straight away.
self.send_response(f.response)
else:
# streaming:
# First send the headers and then transfer the response incrementally
self.send_response_headers(f.response)
chunks = self.read_response_body(
f.request,
f.response
# scheme. For relative-form requests, we need to determine host and
# port as well.
# NOTE(review): this span starts in the middle of a comment and has no
# enclosing `def` in view. It mirrors the transparent-mode handling seen
# earlier in the excerpt, but here a preset response is handed to
# `self.response_received()` and the no-response case raises
# NotImplementedError. Confirm which method this belongs to.
if self.mode is HTTPMode.transparent:
# Setting request.host also updates the host header, which we want
# to preserve
host_header = self.flow.request.host_header
self.flow.request.host = self.context.server.address[0]
self.flow.request.port = self.context.server.address[1]
self.flow.request.host_header = host_header # set again as .host overwrites this.
self.flow.request.scheme = "https" if self.context.server.tls else "http"
yield commands.Hook("request", self.flow)
if self.flow.response:
# response was set by an inline script.
# we now need to emulate the responseheaders hook.
yield commands.Hook("responseheaders", self.flow)
yield from self.response_received()
else:
raise NotImplementedError("Get your responses elsewhere.")
def read_request_headers(self, event: RequestHeaders) -> commands.TCommandGenerator:
    """Create the HTTP flow for a received request head and choose the next step.

    Stores the event's stream id, builds a new ``http.HTTPFlow`` from the
    client and server in ``self.context`` and attaches ``event.request`` to
    it. Authority-form requests are delegated to ``self.handle_connect()``
    and nothing else happens. Otherwise the ``requestheaders`` hook is
    yielded, after which ``self._handle_event`` is switched to
    ``self.read_request_body``.

    Raises:
        NotImplementedError: if the request has an ``Expect: 100-continue``
            header (compared case-insensitively), or if ``request.stream``
            is truthy.
    """
    self.stream_id = event.stream_id

    new_flow = http.HTTPFlow(self.context.client, self.context.server)
    new_flow.request = event.request
    self.flow = new_flow

    if event.request.first_line_format == "authority":
        yield from self.handle_connect()
        return

    yield commands.Hook("requestheaders", self.flow)

    # Look the request up again after the hook instead of reusing
    # `event.request`, as the original code does.
    request = self.flow.request

    expect_value = request.headers.get("expect", "").lower()
    if expect_value == "100-continue":
        # Not supported yet. The intended handling was to send
        # http.expect_continue_response and pop the "expect" header.
        raise NotImplementedError("expect nothing")

    if request.stream:
        raise NotImplementedError  # FIXME

    self._handle_event = self.read_request_body