Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# check if message exists
if message.channel_id not in self._channels:
_LOGGER.debug("Receive close from unknown channel")
return
channel = self._channels.pop(message.channel_id)
channel.close()
# Ping
elif message.flow_type == CHANNEL_FLOW_PING:
if message.extra.startswith(b"pong"):
_LOGGER.debug("Receive pong from peer / reset healthy")
self._healthy.set()
else:
_LOGGER.debug("Receive ping from peer / send pong")
self._write_message(
MultiplexerMessage(
message.channel_id, CHANNEL_FLOW_PING, b"", b"pong"
)
)
else:
_LOGGER.warning("Receive unknown message type")
header = self._crypto.decrypt(header)
channel_id = header[:16]
flow_type = header[16]
data_size = int.from_bytes(header[17:21], byteorder="big")
extra = header[21:]
except (IndexError, MultiplexerTransportDecrypt):
_LOGGER.warning("Wrong message header received")
return
# Read message data
if data_size:
data = await self._reader.readexactly(data_size)
else:
data = b""
message = MultiplexerMessage(
uuid.UUID(bytes=channel_id), flow_type, data, extra
)
# Process message to queue
await self._process_message(message)