Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_initial_channel_msg():
    """Check the first message produced by a freshly created channel.

    A channel built without an explicit id must expose a ``UUID``, and
    ``init_new()`` must return a message that carries that id, the
    ``CHANNEL_FLOW_NEW`` flow type, an empty payload, and an ``extra`` field
    made of ``b"4"`` followed by the byte form of the peer address.
    """
    out_queue = asyncio.Queue()
    new_channel = MultiplexerChannel(out_queue, IP_ADDR)

    # The channel generates its own identifier when none is supplied.
    assert isinstance(new_channel.uuid, UUID)

    first_msg = new_channel.init_new()

    assert first_msg.channel_id == new_channel.uuid
    assert first_msg.flow_type == CHANNEL_FLOW_NEW
    assert first_msg.data == b""
    assert first_msg.extra == b"4" + ip_address_to_bytes(IP_ADDR)
def init_new(self) -> MultiplexerMessage:
    """Create the message that announces this channel as a new session.

    Returns:
        A ``MultiplexerMessage`` built from this channel's id, the
        ``CHANNEL_FLOW_NEW`` flow type, an empty data payload, and an
        ``extra`` field consisting of ``b"4"`` followed by
        ``ip_address_to_bytes(self.ip_address)``.
    """
    _LOGGER.debug("New channel %s", self._id)

    # NOTE(review): the leading b"4" is presumably an IP-version marker;
    # confirm against the protocol definition.
    address_header = b"4" + ip_address_to_bytes(self.ip_address)

    return MultiplexerMessage(
        self._id,
        CHANNEL_FLOW_NEW,
        b"",
        address_header,
    )
if message.channel_id not in self._channels:
_LOGGER.debug("Receive data from unknown channel")
return
channel = self._channels[message.channel_id]
if channel.closing:
pass
elif channel.healthy:
_LOGGER.warning("Abort connection, channel is not healthy")
channel.close()
self._loop.create_task(self.delete_channel(channel))
else:
channel.message_transport(message)
# New
elif message.flow_type == CHANNEL_FLOW_NEW:
# Check if we would handle new connection
if not self._new_connections:
_LOGGER.warning("Request new Channel is not allow")
return
ip_address = bytes_to_ip_address(message.extra[1:5])
channel = MultiplexerChannel(
self._queue,
ip_address,
channel_id=message.channel_id,
throttling=self._throttling,
)
self._channels[channel.uuid] = channel
self._loop.create_task(self._new_connections(self, channel))
# Close