Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def compile(cls, uri, relay=None):
scheme, _, uri = uri.partition('://')
url = urllib.parse.urlparse('s://'+uri)
rawprotos = scheme.split('+')
err_str, protos = proto.get_protos(rawprotos)
if err_str:
raise argparse.ArgumentTypeError(err_str)
if 'ssl' in rawprotos or 'secure' in rawprotos:
import ssl
sslserver = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
sslclient = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
if 'ssl' in rawprotos:
sslclient.check_hostname = False
sslclient.verify_mode = ssl.CERT_NONE
else:
sslserver = sslclient = None
protonames = [i.name for i in protos]
if 'pack' in protonames and relay and relay != cls.DIRECT:
raise argparse.ArgumentTypeError('pack protocol cannot relay to other proxy')
urlpath, _, plugins = url.path.partition(',')
urlpath, _, lbind = urlpath.partition('@')
async def datagram_handler(writer, data, addr, protos, urserver, block, cipher, salgorithm, verbose=DUMMY, **kwargs):
try:
remote_ip, remote_port, *_ = addr
remote_text = f'{remote_ip}:{remote_port}'
data = cipher.datagram.decrypt(data) if cipher else data
lproto, host_name, port, data = proto.udp_parse(protos, data, sock=writer.get_extra_info('socket'), **kwargs)
if host_name == 'echo':
writer.sendto(data, addr)
elif host_name == 'empty':
pass
elif block and block(host_name):
raise Exception('BLOCK ' + host_name)
else:
roption = schedule(urserver, salgorithm, host_name) or ProxyURI.DIRECT
verbose(f'UDP {lproto.name} {remote_text}{roption.logtext(host_name, port)}')
data = roption.prepare_udp_connection(host_name, port, data)
def reply(rdata):
rdata = lproto.udp_client2(host_name, port, rdata)
writer.sendto(cipher.datagram.encrypt(rdata) if cipher else rdata, addr)
await roption.open_udp_connection(host_name, port, data, addr, reply)
except Exception as ex:
if not str(ex).startswith('Connection closed'):
async def stream_handler(reader, writer, unix, lbind, protos, rserver, cipher, authtime=86400*30, block=None, salgorithm='fa', verbose=DUMMY, modstat=lambda r,h:lambda i:DUMMY, **kwargs):
try:
if unix:
remote_ip, server_ip, remote_text = 'local', None, 'unix_local'
else:
remote_ip, remote_port, *_ = writer.get_extra_info('peername')
server_ip = writer.get_extra_info('sockname')[0]
remote_text = f'{remote_ip}:{remote_port}'
local_addr = None if server_ip in ('127.0.0.1', '::1', None) else (server_ip, 0)
reader_cipher, _ = await prepare_ciphers(cipher, reader, writer, server_side=False)
lproto, host_name, port, lbuf, rbuf = await proto.parse(protos, reader=reader, writer=writer, authtable=AuthTable(remote_ip, authtime), reader_cipher=reader_cipher, sock=writer.get_extra_info('socket'), **kwargs)
if host_name == 'echo':
asyncio.ensure_future(lproto.channel(reader, writer, DUMMY, DUMMY))
elif host_name == 'empty':
asyncio.ensure_future(lproto.channel(reader, writer, None, DUMMY))
elif block and block(host_name):
raise Exception('BLOCK ' + host_name)
else:
roption = schedule(rserver, salgorithm, host_name) or ProxyURI.DIRECT
verbose(f'{lproto.name} {remote_text}{roption.logtext(host_name, port)}')
try:
reader_remote, writer_remote = await roption.open_connection(host_name, port, local_addr, lbind)
except asyncio.TimeoutError:
raise Exception(f'Connection timeout {roption.bind}')
try:
reader_remote, writer_remote = await roption.prepare_connection(reader_remote, writer_remote, host_name, port)
writer.write(lbuf)