Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def uri_compile(uri, is_server):
    """Parse a proxy URI and collect the keyword options it implies.

    The scheme selects a protocol class from ``server_protos`` (when
    *is_server* is true) or from ``client_protos``.  The remaining URI
    parts populate the ``kw`` dict:

    * ``tunneludp://...#host:port`` -> ``target_addr`` as ``(host, int(port))``
    * ``https://...#keyfile,certfile`` -> ``ssl_context`` with that chain loaded
    * ``name:secret@`` userinfo -> ``cipher_cls`` / ``password`` for schemes
      starting with ``ss``; ``auth`` for ``http`` / ``https`` / ``socks``
    * ``host:port`` location -> ``host`` and ``port`` (1080 if no port given)

    Raises:
        argparse.ArgumentTypeError: the fragment required by ``tunneludp``
            or ``https`` is missing.

    NOTE(review): the visible source stops once ``kw`` is filled in;
    ``proto`` and ``kw`` are neither returned nor used within this excerpt.
    """
    url = urllib.parse.urlparse(uri)
    scheme = url.scheme
    kw = {}

    if scheme == "tunneludp":
        if not url.fragment:
            raise argparse.ArgumentTypeError(
                "destitation must be assign in tunnel udp mode, "
                "example tunneludp://:53#8.8.8.8:53"
            )
        target_host, _, target_port = url.fragment.partition(":")
        kw["target_addr"] = (target_host, int(target_port))

    if scheme == "https":
        if not url.fragment:
            raise argparse.ArgumentTypeError("#keyfile,certfile is needed")
        key_path, _, cert_path = url.fragment.partition(",")
        tls = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        tls.load_cert_chain(certfile=cert_path, keyfile=key_path)
        kw["ssl_context"] = tls

    # Pick the registry first, then look the scheme up in it.
    registry = server_protos if is_server else client_protos
    proto = registry[scheme]

    # Everything before the last "@" is "name:secret"; the rest is host:port.
    userinfo, _, loc = url.netloc.rpartition("@")
    if userinfo:
        name, _, secret = userinfo.partition(":")
        if scheme.startswith("ss"):
            kw["cipher_cls"] = ciphers[name]
            kw["password"] = secret.encode()
        elif scheme in ("http", "https", "socks"):
            kw["auth"] = (name.encode(), secret.encode())

    if loc:
        host, _, port_text = loc.partition(":")
        kw["host"] = host
        kw["port"] = int(port_text) if port_text else 1080
async def handle(client, addr):
    """Echo every chunk received on *client* back to the peer.

    ``TCP_NODELAY`` is set on the connection before the echo loop starts.
    The loop ends as soon as ``recv`` yields an empty result; cleanup of
    the connection is left to the ``async with`` block.
    """
    print('Connection from', addr)
    client.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
    async with client:
        # Priming read, then read again after each echo.
        chunk = await client.recv(100000)
        while chunk:
            await client.sendall(chunk)
            chunk = await client.recv(100000)
    print('Connection closed')
if __name__ == '__main__':
    # Server-side TLS context that presents the CERTFILE/KEYFILE pair.
    ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ssl_context.load_cert_chain(keyfile=KEYFILE, certfile=CERTFILE)

    # Serve `handle` over TLS with host '' and port 25000.
    curio.run(network.tcp_server, '', 25000, handle, ssl=ssl_context)
if not line:
break
print(line)
await s.write(
b'''HTTP/1.0 200 OK\r
Content-type: text/plain\r
\r
If you're seeing this, it probably worked. Yay!
''')
await s.write(time.asctime().encode('ascii'))
await client.close()
if __name__ == '__main__':
    # Server-side TLS context that presents the CERTFILE/KEYFILE pair.
    ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ssl_context.load_cert_chain(keyfile=KEYFILE, certfile=CERTFILE)

    print('Connect to https://localhost:10000 to see if it works')
    try:
        curio.run(curio.tcp_server('', 10000, handler, ssl=ssl_context))
    except KeyboardInterrupt:
        # Ctrl-C is swallowed so the script ends without a traceback.
        pass
# Certificate (self-signed)
# Path given to ssl_context.load_cert_chain(certfile=...) by the __main__
# blocks in this file.
# NOTE(review): those blocks also read KEYFILE, which is not defined in the
# visible part of this file -- confirm it is declared elsewhere.
CERTFILE = 'ssl_test.crt'
async def handle(client, addr):
    """Echo everything received on *client* back to the peer.

    Args:
        client: connected socket-like object supporting ``async with``,
            ``recv`` and ``sendall``.
        addr: peer address, used only for the log line.

    The loop reads up to 1000 bytes at a time and stops when ``recv``
    yields an empty result.
    """
    print('Connection from', addr)
    async with client:
        while True:
            data = await client.recv(1000)
            if not data:
                break
            # sendall rather than send: a single send may transmit only part
            # of the buffer, which would silently truncate the echo.
            await client.sendall(data)
    print('Connection closed')
if __name__ == '__main__':
    # Server-side TLS context that presents the CERTFILE/KEYFILE pair.
    ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ssl_context.load_cert_chain(keyfile=KEYFILE, certfile=CERTFILE)

    try:
        curio.run(network.tcp_server('', 10000, handle, ssl=ssl_context))
    except KeyboardInterrupt:
        # Ctrl-C is swallowed so the script ends without a traceback.
        pass
async def main(host, port):
    """Send 1000 numbered messages over a TLS connection and check the echo.

    Args:
        host: server host to connect to.
        port: server port to connect to.

    Raises:
        AssertionError: a reply differs from the message that was sent.
    """
    # No local SSLContext is built here: the call passes ssl=True with
    # server_hostname=None, so a locally configured context would never be
    # handed to anything.
    sock = await network.open_connection(
        host, port, ssl=True, server_hostname=None)
    try:
        for i in range(1000):
            msg = ('Message %d' % i).encode('ascii')
            print(msg)
            await sock.sendall(msg)
            resp = await sock.recv(1000)
            assert msg == resp
    finally:
        # Close on the failure path as well so the connection is not leaked
        # when the echo check or an I/O call raises.
        await sock.close()
def create_ssl_context(self):
    """
    Create an SSL context that has reasonable settings and supports alpn for
    h2 and http/1.1 in that order.

    The context is created for the server side (``Purpose.CLIENT_AUTH``),
    refuses protocol versions older than TLS 1.2, disables TLS compression,
    is restricted to ``self.tls_ciphers`` and presents the chain loaded from
    ``self.certfile`` / ``self.keyfile``.

    Returns the configured context.  Errors raised by ``set_ciphers`` or
    ``load_cert_chain`` (e.g. a missing file) propagate unchanged.
    """
    ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    # minimum_version is the documented replacement for the deprecated
    # OP_NO_TLSv1 / OP_NO_TLSv1_1 option flags, which emit a
    # DeprecationWarning on Python 3.10+.
    ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
    ssl_context.options |= ssl.OP_NO_COMPRESSION
    ssl_context.set_ciphers(self.tls_ciphers)
    ssl_context.load_cert_chain(certfile=self.certfile, keyfile=self.keyfile)
    ssl_context.set_alpn_protocols(["h2", "http/1.1"])
    return ssl_context
def get_ssl(url):
    """Build the server-side TLS context described by an ``https`` URL.

    Args:
        url: parsed URL (e.g. from ``urllib.parse.urlparse``); only its
            ``scheme`` and ``fragment`` are read.  For ``https`` the
            fragment must have the form ``keyfile,certfile``.

    Returns:
        An SSL context with the certificate chain loaded when the scheme is
        ``https``; ``None`` for every other scheme.

    Raises:
        argparse.ArgumentTypeError: the scheme is ``https`` and the fragment
            is missing or does not name both a key file and a cert file.
    """
    if url.scheme != "https":
        return None

    keyfile, _, certfile = url.fragment.partition(",")
    # An empty fragment, "#keyfile" or "#,certfile" would otherwise reach
    # load_cert_chain with an empty path and fail with an unrelated error.
    if not (keyfile and certfile):
        raise argparse.ArgumentTypeError("#keyfile,certfile is needed")

    ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ssl_context.load_cert_chain(certfile=certfile, keyfile=keyfile)
    return ssl_context
async def ssl_wrap_socket(
    sock, ssl_context,
    do_handshake_on_connect=True,
    server_hostname=None,
    alpn_protocols=None,
):
    """Wrap *sock* using *ssl_context* and return the wrapped socket.

    When no *server_hostname* is given, the context is switched to
    unverified mode (hostname checking off, ``CERT_NONE``) before wrapping.
    This mutates the context that was passed in.  A truthy *alpn_protocols*
    is registered on the context as well.
    """
    skip_verification = not server_hostname
    if skip_verification:
        # ssl.SSLContext refuses CERT_NONE while check_hostname is still on,
        # so hostname checking is cleared first.
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE

    if alpn_protocols:
        ssl_context.set_alpn_protocols(alpn_protocols)

    wrapped = await ssl_context.wrap_socket(
        sock,
        server_hostname=server_hostname,
        do_handshake_on_connect=do_handshake_on_connect,
    )
    return wrapped
async def create_listening_ssl_socket(address, certfile, keyfile):
    """
    Create and return a listening TLS socket on a given address.

    The TLS context is built for the server side, has the TLS 1.0 / 1.1 and
    compression option flags set, is limited to the "ECDHE+AESGCM" cipher
    string, presents *certfile* / *keyfile* and advertises "h2" via ALPN.
    The socket is wrapped first, then bound to *address* and put into
    listening mode.
    """
    tls = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    hardening = ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_COMPRESSION
    tls.options |= hardening
    tls.set_ciphers("ECDHE+AESGCM")
    tls.load_cert_chain(keyfile=keyfile, certfile=certfile)
    tls.set_alpn_protocols(["h2"])

    plain = socket.socket()
    plain.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    listener = await tls.wrap_socket(plain)
    listener.bind(address)
    listener.listen()
    return listener