Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_no_original(self):
    # With an empty header mapping and no original address supplied,
    # both the address and the scheme are expected to be None.
    empty_headers = {}
    result = parse_x_forwarded_for(empty_headers)
    self.assertEqual(result, (None, None))
def test_address_only(self):
    # Only X-Forwarded-For is given (as a Headers object); the expected
    # result carries port 0 and no scheme.
    forwarded = Headers({b"X-Forwarded-For": [b"10.1.2.3"]})
    result = parse_x_forwarded_for(forwarded)
    self.assertEqual(result, (["10.1.2.3", 0], None))
def test_multiple_proxys(self):
    # The header lists two comma-separated addresses; the expected result
    # uses the first one, with port 0 and no scheme.
    forwarded = Headers({b"X-Forwarded-For": [b"10.1.2.3, 10.1.2.4"]})
    result = parse_x_forwarded_for(forwarded)
    self.assertEqual(result, (["10.1.2.3", 0], None))
def test_original(self):
    # No forwarding headers are present, so the address passed as
    # original_addr is expected back, with no scheme.
    no_headers = Headers({})
    fallback_addr = ["127.0.0.1", 80]
    result = parse_x_forwarded_for(no_headers, original_addr=fallback_addr)
    self.assertEqual(result, (["127.0.0.1", 80], None))
def test_basic(self):
    # Address, port and proto headers are all supplied in a plain dict of
    # bytes; the expected result combines them into ([host, port], scheme).
    forwarded = {
        b"X-Forwarded-For": b"10.1.2.3",
        b"X-Forwarded-Port": b"1234",
        b"X-Forwarded-Proto": b"https",
    }
    result = parse_x_forwarded_for(forwarded)
    self.assertEqual(result, (["10.1.2.3", 1234], "https"))
def test_multiple_proxies(self):
    # Plain-dict variant with two comma-separated addresses; the expected
    # result uses the first one, with port 0 and no scheme.
    forwarded = {b"X-Forwarded-For": b"10.1.2.3, 10.1.2.4"}
    result = parse_x_forwarded_for(forwarded)
    self.assertEqual(result, (["10.1.2.3", 0], None))
def test_v6_address(self):
    # The first listed address is IPv6; it is expected back verbatim as a
    # string, with port 0 and no scheme.
    forwarded = {b"X-Forwarded-For": [b"1043::a321:0001, 10.0.5.6"]}
    result = parse_x_forwarded_for(forwarded)
    self.assertEqual(result, (["1043::a321:0001", 0], None))
def test_address_only(self):
    # Plain-dict variant with only X-Forwarded-For; the expected result
    # carries port 0 and no scheme.
    forwarded = {b"X-Forwarded-For": b"10.1.2.3"}
    result = parse_x_forwarded_for(forwarded)
    self.assertEqual(result, (["10.1.2.3", 0], None))
def test_basic(self):
    # Headers-object variant with address, port and proto all supplied.
    header_values = {
        b"X-Forwarded-For": [b"10.1.2.3"],
        b"X-Forwarded-Port": [b"1234"],
        b"X-Forwarded-Proto": [b"https"],
    }
    forwarded = Headers(header_values)
    result = parse_x_forwarded_for(forwarded)
    self.assertEqual(result, (["10.1.2.3", 1234], "https"))
    # Besides comparing equal, the host and the scheme must be str instances.
    address, scheme = result
    self.assertIsInstance(address[0], str)
    self.assertIsInstance(scheme, str)
# Prevent CVE-2015-0219
if b"_" in name:
continue
self.clean_headers.append((name.lower(), value.encode("latin1")))
# Get client address if possible
peer = self.transport.getPeer()
host = self.transport.getHost()
if hasattr(peer, "host") and hasattr(peer, "port"):
self.client_addr = [str(peer.host), peer.port]
self.server_addr = [str(host.host), host.port]
else:
self.client_addr = None
self.server_addr = None
if self.server.proxy_forwarded_address_header:
self.client_addr, self.client_scheme = parse_x_forwarded_for(
dict(self.clean_headers),
self.server.proxy_forwarded_address_header,
self.server.proxy_forwarded_port_header,
self.server.proxy_forwarded_proto_header,
self.client_addr,
)
# Decode websocket subprotocol options
subprotocols = []
for header, value in self.clean_headers:
if header == b"sec-websocket-protocol":
subprotocols = [
x.strip() for x in unquote(value.decode("ascii")).split(",")
]
# Make new application instance with scope
self.path = request.path.encode("ascii")
self.application_deferred = defer.maybeDeferred(