)
raise SniTunConnectionError()
# Send fernet token
writer.write(fernet_token)
await writer.drain()
# Challenge/Response
crypto = CryptoTransport(aes_key, aes_iv)
try:
challenge = await reader.readexactly(32)
answer = hashlib.sha256(crypto.decrypt(challenge)).digest()
writer.write(crypto.encrypt(answer))
await writer.drain()
except (MultiplexerTransportDecrypt, asyncio.IncompleteReadError, OSError):
_LOGGER.error("Challenge/Response error with SniTun server")
raise SniTunConnectionError()
# Run multiplexer
self._multiplexer = Multiplexer(
crypto,
reader,
writer,
new_connections=connector.handler,
throttling=throttling,
)
# Task a process for pings/cleanups
self._loop.create_task(self._handler())
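async def _read_message(self, header: bytes) -> None:
    """Read one message from the peer and pass it on for processing.

    The encrypted header is decrypted and split into a 16-byte channel id,
    a one-byte flow type, a 4-byte big-endian payload size and trailing
    extra bytes. The payload is then read from the stream and the
    assembled message is handed to ``self._process_message``.

    Raises:
        MultiplexerTransportClose: if ``header`` is empty.
    """
    if not header:
        raise MultiplexerTransportClose()

    try:
        header = self._crypto.decrypt(header)
        # Slicing never raises, so a header of 17-20 bytes would silently
        # produce a truncated size field. Reject anything too short to
        # hold the fixed fields before parsing them.
        if len(header) < 21:
            raise IndexError("message header too short")
        channel_id = header[:16]
        flow_type = header[16]
        data_size = int.from_bytes(header[17:21], byteorder="big")
        extra = header[21:]
    except (IndexError, MultiplexerTransportDecrypt):
        _LOGGER.warning("Wrong message header received")
        return

    # Read message data
    # NOTE(review): data_size comes straight from the peer-supplied header
    # (up to 2**32 - 1) and no upper bound is visible in this block;
    # confirm a maximum payload size is enforced before buffering it.
    if data_size:
        data = await self._reader.readexactly(data_size)
    else:
        data = b""

    message = MultiplexerMessage(
        uuid.UUID(bytes=channel_id), flow_type, data, extra
    )

    # Process message to queue
    await self._process_message(message)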
) -> None:
"""Initialize multiplexer."""
try:
token = hashlib.sha256(os.urandom(40)).digest()
writer.write(self._crypto.encrypt(token))
await writer.drain()
data = await reader.readexactly(32)
data = self._crypto.decrypt(data)
# Check Token
assert hashlib.sha256(token).digest() == data
except (
asyncio.IncompleteReadError,
MultiplexerTransportDecrypt,
AssertionError,
OSError,
):
_LOGGER.warning("Wrong challenge from peer")
raise SniTunChallengeError()
# Start Multiplexer
self._multiplexer = Multiplexer(
self._crypto, reader, writer, throttling=self._throttling
)
def decrypt(self, data: bytes) -> bytes:
    """Run ``data`` through the transport decryptor and return the result.

    An ``InvalidTag`` from the decryptor is re-raised as
    ``MultiplexerTransportDecrypt`` with the original context suppressed.
    """
    # NOTE(review): only update() is called here. Whether update() can raise
    # InvalidTag depends on the cipher context built elsewhere; integrity
    # failures in authenticated modes usually surface at finalize time, so
    # confirm that this handler is reachable and that ciphertext is
    # authenticated somewhere.
    try:
        plaintext = self._decryptor.update(data)
    except InvalidTag:
        raise MultiplexerTransportDecrypt() from None
    return plaintext