Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_origin_dst(ancdata):
    """Return the original destination carried in socket ancillary data.

    Args:
        ancdata: iterable of ``(cmsg_level, cmsg_type, cmsg_data)`` tuples.

    Returns:
        ``(ip, port)`` taken from the first entry whose level is
        ``socket.SOL_IP`` and whose type is ``IP_RECVORIGDSTADDR``, with the
        IP as a dotted-quad string; ``None`` when no entry matches.

    Raises:
        struct.error: if a matching entry's payload is not exactly 16 bytes.
    """
    for cmsg_level, cmsg_type, cmsg_data in ancdata:
        if cmsg_level == socket.SOL_IP and cmsg_type == IP_RECVORIGDSTADDR:
            # 16-byte payload (presumably a sockaddr_in): the 2-byte family is
            # skipped because it is never used, then a big-endian port, the
            # 4-byte IPv4 address and 8 bytes of padding.
            port, ip = struct.unpack("!2xH4s8x", cmsg_data)
            return (socket.inet_ntoa(ip), port)
    return None
def unpack_addr(data, start=0):
    """Parse an address record (type byte, host, 2-byte port) from ``data``.

    The record begins at ``data[start]`` with a one-byte address type:
    ``1`` = 4-byte IPv4 address, ``4`` = 16-byte IPv6 address, ``3`` = a
    one-byte length followed by an ASCII hostname. A big-endian 16-bit port
    follows the host. This is the same layout SOCKS5 uses for addresses.

    Args:
        data: bytes-like buffer containing the record.
        start: offset of the address-type byte within ``data``.

    Returns:
        ``((host, port), rest)`` where ``rest`` is everything after the port.

    Raises:
        Exception: if the address type is not 1, 3 or 4.

    Note:
        Truncated input is not detected here; missing port bytes decode as 0.
    """
    atyp = data[start]
    if atyp == 1:  # IPV4
        end = start + 5
        ipv4 = data[start + 1:end]
        host = socket.inet_ntoa(ipv4)
    elif atyp == 4:  # IPV6
        end = start + 17
        # Skip the type byte: inet_ntop needs exactly the 16 address bytes.
        ipv6 = data[start + 1:end]
        host = socket.inet_ntop(socket.AF_INET6, ipv6)
    elif atyp == 3:  # hostname
        length = data[start + 1]
        end = start + 2 + length
        host = data[start + 2:end].decode('ascii')
    else:
        raise Exception(f'unknow atyp: {atyp}') from None
    port = int.from_bytes(data[end:end + 2], 'big')
    return (host, port), data[end + 2:]
def unpack_addr(data, start=0):
    """Decode ``(host, port)`` from an address record inside ``data``.

    Layout, starting at ``data[start]``: one address-type byte, the host
    (4 raw bytes for type 1 / IPv4, 16 raw bytes for type 4 / IPv6, or a
    length byte plus ASCII name for type 3), then a big-endian 2-byte port.

    Args:
        data: bytes-like buffer holding the record.
        start: index of the address-type byte.

    Returns:
        A pair ``((host, port), remainder)``; ``remainder`` is the slice of
        ``data`` that follows the port.

    Raises:
        Exception: when the address type is none of 1, 3, 4.
    """
    atyp = data[start]
    if atyp == 1:  # IPV4
        end = start + 5
        ipv4 = data[start + 1 : end]
        host = socket.inet_ntoa(ipv4)
    elif atyp == 4:  # IPV6
        end = start + 17
        # The address begins after the type byte; slicing from ``start``
        # yields 17 bytes, which inet_ntop rejects.
        ipv6 = data[start + 1 : end]
        host = socket.inet_ntop(socket.AF_INET6, ipv6)
    elif atyp == 3:  # hostname
        length = data[start + 1]
        end = start + 2 + length
        host = data[start + 2 : end].decode("ascii")
    else:
        raise Exception(f"unknow atyp: {atyp}")
    # An absent port (truncated input) silently decodes as 0.
    port = int.from_bytes(data[end : end + 2], "big")
    return (host, port), data[end + 2 :]
def get_origin_dst(ancdata):
    """Extract the original destination address from ancillary messages.

    Args:
        ancdata: iterable of ``(cmsg_level, cmsg_type, cmsg_data)`` tuples.

    Returns:
        ``(ip, port)`` from the first ``socket.SOL_IP`` /
        ``IP_RECVORIGDSTADDR`` message, or ``None`` if there is none.

    Raises:
        struct.error: if the matching payload is not 16 bytes long.
    """
    for cmsg_level, cmsg_type, cmsg_data in ancdata:
        if cmsg_level == socket.SOL_IP and cmsg_type == IP_RECVORIGDSTADDR:
            # The leading 2-byte family field is never used, so it is skipped
            # with pad bytes; then port (big-endian), IPv4 address, padding.
            port, ip = struct.unpack('!2xH4s8x', cmsg_data)
            return (socket.inet_ntoa(ip), port)
    return None
async def _run(self):
    """Look up the redirected connection's original destination and relay to it.

    Reads ``SO_ORIGINAL_DST`` from the client's underlying socket and stores
    the result in ``self.target_addr`` as ``(ip, port)``. Any failure in that
    lookup is logged and execution continues with whatever
    ``self.target_addr`` already holds.
    """
    # NOTE(review): the original indentation was lost; the connect/relay steps
    # are assumed to run after the try/except, not inside the handler.
    try:
        raw_dst = self.client._socket.getsockopt(
            socket.SOL_IP, SO_ORIGINAL_DST, 16
        )
        # 16-byte buffer: 2 skipped bytes, big-endian port, IPv4, 8 pad bytes.
        dst_port, packed_ip = struct.unpack("!2xH4s8x", raw_dst)
        self.target_addr = (socket.inet_ntoa(packed_ip), dst_port)
    except Exception:
        gvars.logger.exception(f"{self} isn't a redirect proxy")
    upstream = await self.connect_server(self.target_addr)
    async with upstream:
        await self.relay(upstream)
def get_origin_dst(ancdata):
    """Find the original destination ``(ip, port)`` in ``ancdata``.

    Args:
        ancdata: iterable of ``(cmsg_level, cmsg_type, cmsg_data)`` tuples.

    Returns:
        ``(ip, port)`` decoded from the first message with level
        ``socket.SOL_IP`` and type ``IP_RECVORIGDSTADDR``; ``None`` if no
        message matches.

    Raises:
        struct.error: if the matching payload is not 16 bytes long.
    """
    for cmsg_level, cmsg_type, cmsg_data in ancdata:
        if cmsg_level == socket.SOL_IP and cmsg_type == IP_RECVORIGDSTADDR:
            # Family (first 2 bytes) is unused, so it is skipped rather than
            # unpacked; the rest is big-endian port, IPv4 bytes, 8 pad bytes.
            port, ip = struct.unpack("!2xH4s8x", cmsg_data)
            return (socket.inet_ntoa(ip), port)
    return None
async def read_addr(self):
    """Read one address record from ``self._stream``.

    The record is a one-byte address type (``b"\\x01"`` IPv4, ``b"\\x04"``
    IPv6, ``b"\\x03"`` length-prefixed ASCII hostname), the host bytes, and a
    big-endian 2-byte port.

    Returns:
        ``((host, port), raw)`` where ``raw`` is every byte consumed, in order.

    Raises:
        Exception: if the address-type byte is not one of the three above.
    """
    read = self._stream.read_exactly
    atyp = await read(1)
    if atyp == b"\x03":  # hostname
        length_byte = await read(1)
        name = await read(length_byte[0])
        data = length_byte + name
        host = data[1:].decode("ascii")
    elif atyp == b"\x01":  # IPV4
        data = await read(4)
        host = socket.inet_ntoa(data)
    elif atyp == b"\x04":  # IPV6
        data = await read(16)
        host = socket.inet_ntop(socket.AF_INET6, data)
    else:
        raise Exception(f"unknow atyp: {atyp}")
    port_bytes = await read(2)
    port = int.from_bytes(port_bytes, byteorder="big")
    raw = atyp + data + port_bytes
    return (host, port), raw