Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_get_header_tokens():
    """Check how get_header_tokens splits the values of the "foo" header.

    Covers a missing header, a single value, a comma-separated value and
    several values set at once via set_all.
    """
    hdrs = Headers()

    def tokens():
        # Always query the same header on the same (mutated) object.
        return get_header_tokens(hdrs, "foo")

    # Header not present: nothing to tokenize.
    assert tokens() == []

    hdrs["foo"] = "bar"
    assert tokens() == ["bar"]

    # A comma-separated value is split into separate tokens.
    hdrs["foo"] = "bar, voing"
    assert tokens() == ["bar", "voing"]

    # Multiple values for the header contribute their tokens in order.
    hdrs.set_all("foo", ["bar, voing", "oink"])
    assert tokens() == ["bar", "voing", "oink"]
def tflow():
    """Return a wsgi.Flow wrapping a GET "/" request with one test header.

    The flow is created with the address tuple ("127.0.0.1", 8888).
    """
    request = wsgi.Request(
        "http",
        "GET",
        "/",
        "HTTP/1.1",
        Headers(test=b"value"),
        "",
    )
    address = ("127.0.0.1", 8888)
    return wsgi.Flow(address, request)
# NOTE(review): this is a fragment of a larger test -- the enclosing `def` and
# the setup that defines `cv` and `view_obj` are outside this excerpt.
# A view must be retrievable under the name "noop".
assert cv.get("noop")
# Render with the "noop" view requested explicitly; the first element of the
# result is expected to contain "noop".
r = cv.get_content_view(
cv.get("noop"),
"[1, 2, 3]",
headers=Headers(
content_type="text/plain"
)
)
assert "noop" in r[0]
# now try content-type matching: request the "Auto" view with a
# "text/none" content type and expect "noop" to show up in the result
r = cv.get_content_view(
cv.get("Auto"),
"[1, 2, 3]",
headers=Headers(
content_type="text/none"
)
)
assert "noop" in r[0]
# now try removing the custom view: the same "Auto" lookup must no longer
# report "noop"
cv.remove(view_obj)
r = cv.get_content_view(
cv.get("Auto"),
# NOTE(review): this call passes bytes while the two calls above pass str --
# confirm which type get_content_view expects.
b"[1, 2, 3]",
headers=Headers(
content_type="text/none"
)
)
assert "noop" not in r[0]
def test_get_server_accept(self, input, expected):
    """Check websockets.get_server_accept against an expected value.

    `input` is passed to http.Headers to build the header set under test.
    """
    accept = websockets.get_server_accept(http.Headers(input))
    assert accept == expected
# NOTE(review): fragment -- the enclosing `def` is outside this excerpt. The
# first statement uses `request` before the assignment below, so it presumably
# belongs to a different snippet; confirm against the full file.
response = http.http1.read_response(self.client.rfile, request)
# Switch the client connection to TLS when the test is configured with ssl.
if self.ssl:
self.client.convert_to_tls()
assert self.client.tls_established
# Build the upgrade request for "/ws" on 127.0.0.1 at the test server's port.
request = http.Request(
"relative",
"GET",
"http",
"127.0.0.1",
self.server.server.address[1],
"/ws",
"HTTP/1.1",
headers=http.Headers(
connection="upgrade",
upgrade="websocket",
sec_websocket_version="13",
sec_websocket_key="1234",
# The extensions header is sent with an empty value when `extension` is falsy.
sec_websocket_extensions="permessage-deflate" if extension else ""
),
content=b'')
# Send the assembled request and flush so it goes out before reading the reply.
self.client.wfile.write(http.http1.assemble_request(request))
self.client.wfile.flush()
# Read the reply and require that its headers pass the handshake check.
response = http.http1.read_response(self.client.rfile, request)
assert websockets.check_handshake(response.headers)
def test_get_client_key(self, input, expected):
    """Check websockets.get_client_key against an expected value.

    `input` is passed to http.Headers to build the header set under test.
    """
    client_key = websockets.get_client_key(http.Headers(input))
    assert client_key == expected
def client_handshake_headers(version=None, key=None, protocol=None, extensions=None):
    """
        Build the header set for an HTTP upgrade (WebSocket handshake) request.

        A missing version falls back to VERSION. A missing key is generated
        from 16 random bytes (base64-encoded) and can be read back from
        sec-websocket-key in the returned headers. The protocol and
        extensions headers are only added when a value is supplied.

        Returns an instance of http.Headers
    """
    version = VERSION if version is None else version
    key = base64.b64encode(os.urandom(16)).decode('ascii') if key is None else key

    handshake = http.Headers(
        connection="upgrade",
        upgrade="websocket",
        sec_websocket_version=version,
        sec_websocket_key=key,
    )

    # Optional headers, added in this order and only when explicitly given.
    optional_headers = (
        ('sec-websocket-protocol', protocol),
        ('sec-websocket-extensions', extensions),
    )
    for header_name, header_value in optional_headers:
        if header_value is not None:
            handshake[header_name] = header_value
    return handshake
def read_response_headers(self):
    """Consume the next server event while waiting for response headers.

    Generator: yields commands and may delegate to other generators.

    - h11.NEED_DATA: nothing to do yet.
    - h11.Response: log it, record it in self.flow_events[1], switch the
      state to read_response_body and continue reading right away.
    - h11.InformationalResponse: copy its headers onto the flow's response;
      on a 101 that passes the websocket handshake check, start a
      WebsocketLayer and route further events to it.

    Raises TypeError for any other event.
    """
    event = yield from self.server_conn.next_event()

    if event is h11.NEED_DATA:
        return

    if isinstance(event, h11.Response):
        yield commands.Log(f"responseheaders {event}")
        self.flow_events[1].append(event)
        self.state = self.read_response_body
        # there may already be further events.
        yield from self.read_response_body()
        return

    if not isinstance(event, h11.InformationalResponse):
        raise TypeError(f"Unexpected event: {event}")

    self.flow.response.headers = net_http.Headers(event.headers)
    switching_protocols = event.status_code == 101
    if switching_protocols and websockets.check_handshake(self.flow.response.headers):
        ws_layer = websocket.WebsocketLayer(self.context, self.flow)
        yield from ws_layer.handle_event(events.Start())
        # From here on the child layer handles every event.
        self._handle_event = ws_layer.handle_event
def make_connect_response(http_version):
    """Return a 200 "Connection established" HTTPResponse for `http_version`.

    The response has an empty header set and an empty body.
    """
    # Do not send any response headers as it breaks proxying non-80 ports on
    # Android emulators using the -http-proxy option.
    no_headers = http.Headers()
    return HTTPResponse(http_version, 200, b"Connection established", no_headers, b"")
# Prebuilt HTTP/1.1 interim response: status 100 "Continue" with an empty
# header set and an empty body.
expect_continue_response = HTTPResponse(
    b"HTTP/1.1",
    100,
    b"Continue",
    http.Headers(),
    b"",
)
def _handle_pushed_stream_received(self, event):
    """Relay a pushed stream to the client and start a stream layer for it.

    Looks up the client-side id of the parent stream, announces the push on
    the client connection (under that connection's lock), then registers and
    starts an Http2SingleStreamLayer for the pushed stream with its request
    already marked as arrived and finished.

    Returns True.
    """
    # pushed stream ids should be unique and not dependent on race conditions
    # only the parent stream id must be looked up first
    parent_eid = self.server_to_client_stream_ids[event.parent_stream_id]
    pushed_id = event.pushed_stream_id

    client_h2 = self.connections[self.client_conn]
    with client_h2.lock:
        client_h2.push_stream(parent_eid, pushed_id, event.headers)
        self.client_conn.send(client_h2.data_to_send())

    headers = mitmproxy.net.http.Headers([[name, value] for name, value in event.headers])
    stream = Http2SingleStreamLayer(self, client_h2, pushed_id, headers)
    self.streams[pushed_id] = stream

    stream.timestamp_start = time.time()
    stream.pushed = True
    stream.parent_stream_id = parent_eid
    stream.timestamp_end = time.time()
    # Both request events are set before the layer is started.
    stream.request_arrived.set()
    stream.request_data_finished.set()
    stream.start()
    return True