Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_write_data():
"""Test send data over MultiplexerChannel."""
output = asyncio.Queue()
channel = MultiplexerChannel(output, IP_ADDR)
assert isinstance(channel.uuid, UUID)
await channel.write(b"test")
assert not output.empty()
message = output.get_nowait()
assert message.channel_id == channel.uuid
assert message.flow_type == CHANNEL_FLOW_DATA
assert message.data == b"test"
async def test_read_data():
"""Test send data over MultiplexerChannel."""
output = asyncio.Queue()
channel = MultiplexerChannel(output, IP_ADDR)
assert isinstance(channel.uuid, UUID)
message = MultiplexerMessage(channel.uuid, CHANNEL_FLOW_DATA, b"test")
channel.message_transport(message)
data = await channel.read()
assert data == b"test"
async def write(self, data: bytes) -> None:
"""Send data to peer."""
if not data:
raise MultiplexerTransportError()
if self._closing:
raise MultiplexerTransportClose()
# Create message
message = MultiplexerMessage(self._id, CHANNEL_FLOW_DATA, data)
try:
async with async_timeout.timeout(5):
await self._output.put(message)
except asyncio.TimeoutError:
_LOGGER.debug("Can't write to peer transport")
raise MultiplexerTransportError() from None
async def _process_message(self, message: MultiplexerMessage) -> None:
"""Process received message."""
# DATA
if message.flow_type == CHANNEL_FLOW_DATA:
# check if message exists
if message.channel_id not in self._channels:
_LOGGER.debug("Receive data from unknown channel")
return
channel = self._channels[message.channel_id]
if channel.closing:
pass
elif channel.healthy:
_LOGGER.warning("Abort connection, channel is not healthy")
channel.close()
self._loop.create_task(self.delete_channel(channel))
else:
channel.message_transport(message)
# New