Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_write_data_peer_error(raise_timeout):
    """Check that writing fails when the peer never drains the queue.

    The channel is backed by a queue that holds a single item. That slot is
    filled before the write, so ``channel.write`` has nowhere to put its
    message and is expected to raise ``MultiplexerTransportError``.

    ``raise_timeout`` is a fixture defined outside this block; presumably it
    forces the timeout path -- verify against the fixture definition.
    """
    output = asyncio.Queue(1)
    channel = MultiplexerChannel(output, IP_ADDR)
    assert isinstance(channel.uuid, UUID)

    # Occupy the only slot of the peer queue so nothing more can be queued.
    output.put_nowait(None)

    with pytest.raises(MultiplexerTransportError):
        await channel.write(b"test")
multi_core.PEER_TCP_TIMEOUT = 0.2
client = test_server[0]
ping_task = loop.create_task(multiplexer_client.ping())
await asyncio.sleep(0.3)
data = await client.reader.read(60)
data = multiplexer_client._crypto.decrypt(data)
assert data[16] == CHANNEL_FLOW_PING
assert int.from_bytes(data[17:21], "big") == 0
assert data[21:25] == b"ping"
assert ping_task.done()
with pytest.raises(MultiplexerTransportError):
raise ping_task.exception()
multi_core.PEER_TCP_TIMEOUT = 90
async def test_write_data_empty():
    """Check that writing an empty payload to a channel is rejected.

    The output queue is unbounded here, so the expected
    ``MultiplexerTransportError`` comes from the empty ``b""`` payload and
    not from a full queue.
    """
    output = asyncio.Queue()
    channel = MultiplexerChannel(output, IP_ADDR)
    assert isinstance(channel.uuid, UUID)

    with pytest.raises(MultiplexerTransportError):
        await channel.write(b"")
async def test_multiplexer_close_channel_full(multiplexer_client):
    """Check that a channel is removed even when deleting it raises.

    ``async_timeout.timeout`` is patched to raise ``asyncio.TimeoutError``,
    and ``delete_channel`` is then expected to fail with
    ``MultiplexerTransportError``. Despite that error, the client must end
    up with no registered channels.
    """
    assert not multiplexer_client._channels

    channel = await multiplexer_client.create_channel(IP_ADDR)
    # Yield to the event loop before inspecting the client's channel table.
    await asyncio.sleep(0.1)
    assert multiplexer_client._channels

    with patch("async_timeout.timeout", side_effect=asyncio.TimeoutError()):
        with pytest.raises(MultiplexerTransportError):
            # The call is expected to raise, so its result is not captured.
            await multiplexer_client.delete_channel(channel)
        # NOTE(review): the source's indentation was lost; this sleep is
        # placed inside the patch block but outside pytest.raises, where it
        # is reachable -- confirm against the upstream test.
        await asyncio.sleep(0.1)

    assert not multiplexer_client._channels
async def _handler(self) -> None:
    """Wait until the multiplexer connection is closed.

    While the multiplexer reports ``is_connected``, wait on it in 50-second
    windows. If a window expires before ``wait()`` returns, send a ping and
    start the next window. A ``MultiplexerTransportError`` ends the loop
    quietly. In every case, including unexpected exceptions,
    ``self._multiplexer`` is reset to ``None`` on exit.
    """
    try:
        while self._multiplexer.is_connected:
            try:
                async with async_timeout.timeout(50):
                    await self._multiplexer.wait()
            except asyncio.TimeoutError:
                # wait() did not finish within the window: ping the peer,
                # then re-check the connection state on the next iteration.
                await self._multiplexer.ping()

    except MultiplexerTransportError:
        # Ignored on purpose: a transport error ends the wait loop and the
        # finally clause below still clears the multiplexer reference.
        pass

    finally:
        self._multiplexer = None
async def _proxy_peer(
self,
multiplexer: Multiplexer,
client_hello: bytes,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
):
"""Proxy data between end points."""
transport = writer.transport
ip_address = ipaddress.ip_address(writer.get_extra_info("peername")[0])
# Open multiplexer channel
try:
channel = await multiplexer.create_channel(ip_address)
except MultiplexerTransportError:
_LOGGER.error("New transport channel to peer fails")
return
from_proxy = None
from_peer = None
try:
await channel.write(client_hello)
# Process stream into multiplexer
while not transport.is_closing():
if not from_proxy:
from_proxy = self._loop.create_task(reader.read(4096))
if not from_peer:
from_peer = self._loop.create_task(channel.read())
# Wait until data need to be processed
from_endpoint = None
# From peer
if from_peer.done():
if from_peer.exception():
raise from_peer.exception()
writer.write(from_peer.result())
from_peer = None
# Flush buffer
await writer.drain()
except (MultiplexerTransportError, OSError, RuntimeError):
_LOGGER.debug("Transport closed by endpoint for %s", channel.uuid)
with suppress(MultiplexerTransportError):
await multiplexer.delete_channel(channel)
except MultiplexerTransportClose:
_LOGGER.debug("Peer close connection for %s", channel.uuid)
finally:
# Cleanup peer reader
if from_peer:
if not from_peer.done():
from_peer.cancel()
else:
# Avoid exception was never retrieved
from_peer.exception()
# Cleanup endpoint reader
if from_endpoint and not from_endpoint.done():
await channel.write(from_endpoint.result())
from_endpoint = None
# From peer
if from_peer.done():
if from_peer.exception():
raise from_peer.exception()
writer.write(from_peer.result())
from_peer = None
# Flush buffer
await writer.drain()
except (MultiplexerTransportError, OSError, RuntimeError):
_LOGGER.debug("Transport closed by endpoint for %s", channel.uuid)
with suppress(MultiplexerTransportError):
await multiplexer.delete_channel(channel)
except MultiplexerTransportClose:
_LOGGER.debug("Peer close connection for %s", channel.uuid)
finally:
# Cleanup peer reader
if from_peer:
if not from_peer.done():
from_peer.cancel()
else:
# Avoid exception was never retrieved
from_peer.exception()
from_proxy = None
# From peer
if from_peer.done():
if from_peer.exception():
raise from_peer.exception()
writer.write(from_peer.result())
from_peer = None
# Flush buffer
await writer.drain()
except (MultiplexerTransportError, OSError, RuntimeError):
_LOGGER.debug("Transport closed by Proxy for %s", channel.uuid)
with suppress(MultiplexerTransportError):
await multiplexer.delete_channel(channel)
except asyncio.TimeoutError:
_LOGGER.debug("Close TCP session after timeout for %s", channel.uuid)
with suppress(MultiplexerTransportError):
await multiplexer.delete_channel(channel)
except MultiplexerTransportClose:
_LOGGER.debug("Peer close connection for %s", channel.uuid)
finally:
# Cleanup peer reader
if from_peer:
if not from_peer.done():
from_peer.cancel()
else: