Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_read_data_on_close():
    """Test that reading from a closed MultiplexerChannel raises.

    A freshly created channel must not be closing. After ``close()`` a
    ``read()`` must raise ``MultiplexerTransportClose`` and the channel
    must report itself as closing.
    """
    output = asyncio.Queue()
    channel = MultiplexerChannel(output, IP_ADDR)
    assert isinstance(channel.uuid, UUID)
    assert not channel.closing

    channel.close()
    with pytest.raises(MultiplexerTransportClose):
        # Only the raised exception matters; the read result is discarded.
        await channel.read()

    assert channel.closing
):
"""Test and init a connector with whitelist bad requests."""
assert not test_endpoint
connector = Connector("127.0.0.1", "8822", True)
multiplexer_client._new_connections = connector.handler
connector.whitelist.add(IP_ADDR)
assert IP_ADDR in connector.whitelist
assert BAD_ADDR not in connector.whitelist
channel = await multiplexer_server.create_channel(BAD_ADDR)
await asyncio.sleep(0.1)
assert not test_endpoint
with pytest.raises(MultiplexerTransportClose):
await channel.read()
async def test_write_data_after_close():
    """Verify that writing to an already closed MultiplexerChannel raises.

    The channel starts out open; once ``close()`` was called, ``write()``
    has to fail with ``MultiplexerTransportClose`` and the channel has to
    be flagged as closing.
    """
    # The outgoing queue is never inspected here, so build it inline.
    channel = MultiplexerChannel(asyncio.Queue(), IP_ADDR)
    assert isinstance(channel.uuid, UUID)
    assert not channel.closing

    channel.close()

    with pytest.raises(MultiplexerTransportClose):
        await channel.write(b"test")

    assert channel.closing
async def test_multiplexer_data_channel_abort_full(
    multiplexer_client, multiplexer_server
):
    """Test that flooding a channel aborts it on both multiplexers.

    The client writes repeatedly without the server reading. The write
    loop is expected to end with ``MultiplexerTransportClose``
    (presumably once the channel is full, as the test name suggests --
    confirm against the channel implementation). Draining the server side
    afterwards must also end with ``MultiplexerTransportClose``, and the
    channel must be removed from both multiplexers.
    """
    assert not multiplexer_client._channels
    assert not multiplexer_server._channels

    channel_client = await multiplexer_client.create_channel(IP_ADDR)
    # Give the server side time to register the new channel.
    await asyncio.sleep(0.1)

    channel_server = multiplexer_server._channels.get(channel_client.uuid)

    assert channel_client
    assert channel_server

    # Write without anyone reading until the transport gives up.
    with pytest.raises(MultiplexerTransportClose):
        for _ in range(1, 50000):
            await channel_client.write(b"test xxxx")

    # Drain what arrived; the read must eventually report the close.
    with pytest.raises(MultiplexerTransportClose):
        for _ in range(1, 50000):
            await channel_server.read()

    await asyncio.sleep(0.1)
    assert not multiplexer_client._channels
    assert not multiplexer_server._channels
assert not client_read.done()
assert not server_read.done()
multiplexer_client.shutdown()
await asyncio.sleep(0.1)
assert not multiplexer_client._channels
assert client_read.done()
with pytest.raises(MultiplexerTransportClose):
raise client_read.exception()
assert not multiplexer_server._channels
assert server_read.done()
with pytest.raises(MultiplexerTransportClose):
raise server_read.exception()
await channel.write(b"Hallo")
data = await test_connection.reader.read(1024)
assert data == b"Hallo"
test_connection.writer.write(b"Hiro")
await test_connection.writer.drain()
data = await channel.read()
assert data == b"Hiro"
test_connection.writer.close()
test_connection.close.set()
await asyncio.sleep(0.1)
with pytest.raises(MultiplexerTransportClose):
await channel.read()
from_peer = None
# Flush buffer
await writer.drain()
except (MultiplexerTransportError, OSError, RuntimeError):
_LOGGER.debug("Transport closed by Proxy for %s", channel.uuid)
with suppress(MultiplexerTransportError):
await multiplexer.delete_channel(channel)
except asyncio.TimeoutError:
_LOGGER.debug("Close TCP session after timeout for %s", channel.uuid)
with suppress(MultiplexerTransportError):
await multiplexer.delete_channel(channel)
except MultiplexerTransportClose:
_LOGGER.debug("Peer close connection for %s", channel.uuid)
finally:
# Cleanup peer reader
if from_peer:
if not from_peer.done():
from_peer.cancel()
else:
# Avoid exception was never retrieved
from_peer.exception()
# Cleanup proxy reader
if from_proxy and not from_proxy.done():
from_proxy.cancel()
# Flush buffer
await self._writer.drain()
# throttling
if not self._throttling:
continue
await asyncio.sleep(self._throttling)
except (asyncio.CancelledError, asyncio.TimeoutError):
_LOGGER.debug("Receive canceling")
with suppress(OSError):
self._writer.write_eof()
await self._writer.drain()
except (MultiplexerTransportClose, asyncio.IncompleteReadError, OSError):
_LOGGER.debug("Transport was closed")
finally:
# Cleanup peer writer
if to_peer and not to_peer.done():
to_peer.cancel()
# Cleanup peer reader
if from_peer:
if not from_peer.done():
from_peer.cancel()
else:
# Avoid exception was never retrieved
from_peer.exception()
# Cleanup transport
if from_peer.done():
if from_peer.exception():
raise from_peer.exception()
writer.write(from_peer.result())
from_peer = None
# Flush buffer
await writer.drain()
except (MultiplexerTransportError, OSError, RuntimeError):
_LOGGER.debug("Transport closed by endpoint for %s", channel.uuid)
with suppress(MultiplexerTransportError):
await multiplexer.delete_channel(channel)
except MultiplexerTransportClose:
_LOGGER.debug("Peer close connection for %s", channel.uuid)
finally:
# Cleanup peer reader
if from_peer:
if not from_peer.done():
from_peer.cancel()
else:
# Avoid exception was never retrieved
from_peer.exception()
# Cleanup endpoint reader
if from_endpoint and not from_endpoint.done():
from_endpoint.cancel()
# Close Transport
def _write_message(self, message: MultiplexerMessage) -> None:
    """Serialize ``message`` and hand it to the transport writer.

    The header is the channel id bytes, a 1-byte big-endian flow type, a
    4-byte big-endian payload length, and ``message.extra`` padded with
    random bytes to 11 bytes. Only the header is passed through
    ``self._crypto.encrypt``; the payload is appended as-is.

    Raises:
        MultiplexerTransportClose: If the writer raises ``RuntimeError``.
    """
    extra = message.extra
    header = b"".join(
        (
            message.channel_id.bytes,
            message.flow_type.to_bytes(1, byteorder="big"),
            len(message.data).to_bytes(4, byteorder="big"),
            extra,
            # Random filler so the extra field always occupies 11 bytes.
            os.urandom(11 - len(extra)),
        )
    )
    frame = self._crypto.encrypt(header) + message.data

    try:
        self._writer.write(frame)
    except RuntimeError:
        # Surface writer failure as a transport close, without chaining.
        raise MultiplexerTransportClose() from None