Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_write_data():
    """Check that write() puts a data message with the payload on the output queue."""
    queue = asyncio.Queue()
    channel = MultiplexerChannel(queue, IP_ADDR)
    assert isinstance(channel.uuid, UUID)

    payload = b"test"
    await channel.write(payload)

    # The write must have produced something on the outgoing queue.
    assert not queue.empty()
    sent = queue.get_nowait()

    # The queued message is addressed to this channel and carries the payload.
    assert sent.channel_id == channel.uuid
    assert sent.flow_type == CHANNEL_FLOW_DATA
    assert sent.data == b"test"
async def test_message_transport_never_lock():
    """Flood message_transport() without awaiting and expect the channel to flag an error."""
    queue = asyncio.Queue(1)
    channel = MultiplexerChannel(queue, IP_ADDR)
    assert isinstance(channel.uuid, UUID)

    # 9999 synchronous calls; each one gets a freshly built close message.
    flood_count = 9999
    for _ in range(flood_count):
        channel.message_transport(channel.init_close())

    assert channel.error
async def test_write_data_after_close():
    """Check that write() raises MultiplexerTransportClose once the channel is closed."""
    queue = asyncio.Queue()
    channel = MultiplexerChannel(queue, IP_ADDR)
    assert isinstance(channel.uuid, UUID)

    # A fresh channel is not closing yet.
    assert not channel.closing
    channel.close()

    with pytest.raises(MultiplexerTransportClose):
        await channel.write(b"test")

    # The channel still reports closing after the rejected write.
    assert channel.closing
async def test_initial_channel_msg():
    """Check the fields of the message returned by init_new()."""
    queue = asyncio.Queue()
    channel = MultiplexerChannel(queue, IP_ADDR)
    assert isinstance(channel.uuid, UUID)

    new_msg = channel.init_new()
    assert new_msg.channel_id == channel.uuid
    assert new_msg.flow_type == CHANNEL_FLOW_NEW
    assert new_msg.data == b""

    # The extra field is expected to be b"4" followed by the encoded address.
    assert new_msg.extra == b"4" + ip_address_to_bytes(IP_ADDR)
async def test_close_channel_msg():
    """Check the fields of the message returned by init_close()."""
    queue = asyncio.Queue()
    channel = MultiplexerChannel(queue, IP_ADDR)
    assert isinstance(channel.uuid, UUID)

    close_msg = channel.init_close()

    # A close message targets this channel and has an empty payload.
    assert close_msg.channel_id == channel.uuid
    assert close_msg.flow_type == CHANNEL_FLOW_CLOSE
    assert close_msg.data == b""
async def test_read_data():
    """Check that read() returns the payload of a message fed via message_transport()."""
    queue = asyncio.Queue()
    channel = MultiplexerChannel(queue, IP_ADDR)
    assert isinstance(channel.uuid, UUID)

    # Hand an incoming data message for this channel to the channel itself.
    incoming = MultiplexerMessage(channel.uuid, CHANNEL_FLOW_DATA, b"test")
    channel.message_transport(incoming)

    received = await channel.read()
    assert received == b"test"
async def test_read_data_on_close():
    """Test that reading from a closed MultiplexerChannel raises.

    After close() the channel must report closing, and read() must raise
    MultiplexerTransportClose.
    """
    output = asyncio.Queue()
    channel = MultiplexerChannel(output, IP_ADDR)
    assert isinstance(channel.uuid, UUID)
    assert not channel.closing

    channel.close()

    # read() is expected to raise, so its result is deliberately not bound.
    with pytest.raises(MultiplexerTransportClose):
        await channel.read()

    assert channel.closing
async def test_closing():
    """Check that the closing flag flips from falsy to truthy after close()."""
    queue = asyncio.Queue()
    channel = MultiplexerChannel(queue, IP_ADDR)
    assert isinstance(channel.uuid, UUID)

    # Before close(): not closing.
    assert not channel.closing

    channel.close()

    # After close(): closing.
    assert channel.closing
async def create_channel(
    self, ip_address: ipaddress.IPv4Address
) -> MultiplexerChannel:
    """Build a channel for ``ip_address``, queue its init message and register it.

    The message returned by the channel's ``init_new()`` is put on
    ``self._queue`` with a 5 second timeout. Only when that succeeds is the
    channel stored in ``self._channels`` under its uuid and returned.

    Raises:
        MultiplexerTransportError: the message could not be queued within
            the timeout.
    """
    new_channel = MultiplexerChannel(
        self._queue, ip_address, throttling=self._throttling
    )
    init_message = new_channel.init_new()

    try:
        async with async_timeout.timeout(5):
            await self._queue.put(init_message)
    except asyncio.TimeoutError:
        # Drop the timeout context; callers only see the transport error.
        raise MultiplexerTransportError() from None

    # Reached only when the init message was queued in time.
    self._channels[new_channel.uuid] = new_channel
    return new_channel
elif channel.healthy:
_LOGGER.warning("Abort connection, channel is not healthy")
channel.close()
self._loop.create_task(self.delete_channel(channel))
else:
channel.message_transport(message)
# New
elif message.flow_type == CHANNEL_FLOW_NEW:
# Check if we would handle new connection
if not self._new_connections:
_LOGGER.warning("Request new Channel is not allow")
return
ip_address = bytes_to_ip_address(message.extra[1:5])
channel = MultiplexerChannel(
self._queue,
ip_address,
channel_id=message.channel_id,
throttling=self._throttling,
)
self._channels[channel.uuid] = channel
self._loop.create_task(self._new_connections(self, channel))
# Close
elif message.flow_type == CHANNEL_FLOW_CLOSE:
# check if message exists
if message.channel_id not in self._channels:
_LOGGER.debug("Receive close from unknown channel")
return
channel = self._channels.pop(message.channel_id)
channel.close()