Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Allows connections to be made that may or may not require ssl.
Somewhat surprisingly trio doesn't have an abstraction for this like
curio even though it's fairly trivial to write. Down the line hopefully.
Args:
host (str): Network location, either by domain or IP.
port (int): The requested port.
ssl (bool or SSLContext): If False or None, SSL is not required. If
True, the context returned by trio.ssl.create_default_context will
be used. Otherwise, this may be an SSLContext object.
kwargs: A catch all to soak up curio's additional kwargs and
ignore them.
'''
import trio
if not ssl:
sock = await trio.open_tcp_stream(host, port)
else:
if isinstance(ssl, bool):
ssl_context = None
else:
ssl_context = ssl
sock = await trio.open_ssl_over_tcp_stream(host, port, ssl_context=ssl_context)
await sock.do_handshake()
sock.close = sock.aclose
return sock
ssl_context = ssl.create_default_context()
else:
ssl_context = self._ssl
port = self._port
if port is None:
if self._ssl:
port = 5671
else:
port = 5672
if self._ssl:
stream = await trio.open_ssl_over_tcp_stream(self._host, port, ssl_context=ssl_context)
sock = stream.transport_stream
else:
sock = stream = await trio.open_tcp_stream(self._host, port)
sock.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
stream = trio.StapledStream(
send_stream=stream,
receive_stream=BufferedReceiveStream(stream, READ_BUF_SIZE),
)
self._stream = stream
self._sock = sock
# the writer loop needs to run since the beginning
await self._nursery.start(self._writer_loop)
try:
await self._stream.send_all(amqp_constants.PROTOCOL_HEADER)
async def connect(
    self, destaddr: Tuple[Any, ...] = None,
    **kwargs
) -> trio.SocketStream:
    """Open a TCP stream to the peer and attach a msgpack stream to it.

    Args:
        destaddr: Positional arguments for ``trio.open_tcp_stream`` given
            as a tuple. When omitted (or otherwise falsy) the value stored
            in ``self._destaddr`` is used instead.
        **kwargs: Passed through unchanged to ``trio.open_tcp_stream``.

    Returns:
        The stream returned by ``trio.open_tcp_stream``. The same stream is
        also wrapped in a ``MsgpackStream`` and stored on ``self.msgstream``.

    Raises:
        RuntimeError: If ``self.connected()`` reports an existing connection.
        AssertionError: If the resolved address is not a tuple.
    """
    # Refuse to replace a live connection.
    if self.connected():
        raise RuntimeError("channel is already connected?")

    # An explicit address wins; otherwise use the one held by the instance.
    target = destaddr if destaddr else self._destaddr
    assert isinstance(target, tuple)

    tcp_stream = await trio.open_tcp_stream(*target, **kwargs)
    self.msgstream = MsgpackStream(tcp_stream)
    return tcp_stream
async def connect(
    self, host, port, connect_timeout, source_address=None, socket_options=None
):
    """Open a TCP stream to ``host:port`` and return it wrapped in TrioSocket.

    Args:
        host: Host passed to ``trio.open_tcp_stream``.
        port: Port passed to ``trio.open_tcp_stream``.
        connect_timeout: Accepted for interface compatibility; this body
            does not reference it.
        source_address: Must be ``None``; any other value is rejected.
        socket_options: Optional iterable of ``(level, optname, value)``
            triples applied to the stream via ``setsockopt``.

    Returns:
        A ``TrioSocket`` wrapping the opened stream.

    Raises:
        NotImplementedError: If ``source_address`` is not ``None``.
    """
    if source_address is not None:
        # Binding a source address does not combine well with happy
        # eyeballs, so the option is rejected outright. (Open question
        # from the original author: drop source_address entirely, or
        # reduce it to a source IP with no port?)
        raise NotImplementedError(
            "trio backend doesn't support setting source_address"
        )

    tcp_stream = await trio.open_tcp_stream(host, port)

    # A missing or empty option list means there is nothing to apply.
    for level, optname, value in socket_options or ():
        tcp_stream.setsockopt(level, optname, value)

    return TrioSocket(tcp_stream)
async def connect(
    self, host, port, source_address=None, socket_options=None
):
    """Open a TCP stream to ``host:port`` and return it wrapped in TrioSocket.

    Args:
        host: Host passed to ``trio.open_tcp_stream``.
        port: Port passed to ``trio.open_tcp_stream``.
        source_address: Must be ``None``; any other value is rejected.
        socket_options: Optional iterable of ``(level, optname, value)``
            triples applied to the stream via ``setsockopt``. ``None`` (the
            default) or an empty iterable applies nothing.

    Returns:
        A ``TrioSocket`` wrapping the opened stream.

    Raises:
        NotImplementedError: If ``source_address`` is not ``None``.
    """
    if source_address is not None:
        # You can't really combine source_address= and happy eyeballs
        # (can we get rid of source_address? or at least make it a source
        # ip, no port?)
        raise NotImplementedError(
            "trio backend doesn't support setting source_address"
        )
    stream = await trio.open_tcp_stream(host, port)
    # socket_options defaults to None; iterating it unguarded would raise
    # TypeError after the stream is already open, so skip the loop when
    # there is nothing to apply.
    if socket_options:
        for (level, optname, value) in socket_options:
            stream.setsockopt(level, optname, value)
    return TrioSocket(stream)
async def trio_open_connection(host, port, *, ssl=False, **kwargs):
    '''
    Open a trio stream to ``host:port``, optionally secured with SSL.

    Trio has no single helper covering both the plain and the SSL case
    the way curio does, so this function picks the right opener itself.

    Args:
        host (str): Network location, either by domain or IP.
        port (int): The requested port.
        ssl (bool): Whether or not SSL is required.
        kwargs: A catch all to soak up curio's additional kwargs and
            ignore them.

    Returns:
        The stream from ``trio.open_tcp_stream`` when ``ssl`` is falsy;
        otherwise the stream from ``trio.open_ssl_over_tcp_stream`` after
        its handshake has been performed.
    '''
    import trio

    if ssl:
        secure_stream = await trio.open_ssl_over_tcp_stream(host, port)
        # Run the handshake now rather than leaving it to the first
        # send or receive on the stream.
        await secure_stream.do_handshake()
        return secure_stream

    return await trio.open_tcp_stream(host, port)