Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_close_channel_msg():
"""Test close MultiplexerChannel."""
output = asyncio.Queue()
channel = MultiplexerChannel(output, IP_ADDR)
assert isinstance(channel.uuid, UUID)
message = channel.init_close()
assert message.channel_id == channel.uuid
assert message.flow_type == CHANNEL_FLOW_CLOSE
assert message.data == b""
def init_close(self) -> MultiplexerMessage:
"""Init close message for transport."""
_LOGGER.debug("Close channel %s", self._id)
return MultiplexerMessage(self._id, CHANNEL_FLOW_CLOSE)
if not self._new_connections:
_LOGGER.warning("Request new Channel is not allow")
return
ip_address = bytes_to_ip_address(message.extra[1:5])
channel = MultiplexerChannel(
self._queue,
ip_address,
channel_id=message.channel_id,
throttling=self._throttling,
)
self._channels[channel.uuid] = channel
self._loop.create_task(self._new_connections(self, channel))
# Close
elif message.flow_type == CHANNEL_FLOW_CLOSE:
# check if message exists
if message.channel_id not in self._channels:
_LOGGER.debug("Receive close from unknown channel")
return
channel = self._channels.pop(message.channel_id)
channel.close()
# Ping
elif message.flow_type == CHANNEL_FLOW_PING:
if message.extra.startswith(b"pong"):
_LOGGER.debug("Receive pong from peer / reset healthy")
self._healthy.set()
else:
_LOGGER.debug("Receive ping from peer / send pong")
self._write_message(
MultiplexerMessage(