Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
await writer_peer.drain()
await asyncio.sleep(0.1)
assert server.peers.peer_available(hostname)
async def mock_new_channel(multiplexer, channel):
    """Record every message read from the channel together with its IP address.

    Runs until ``channel.read()`` raises or the task is cancelled; the
    ``multiplexer`` argument is accepted but not used.
    """
    while True:
        # Read first, then record: nothing is appended if the read fails.
        peer_messages.append(await channel.read())
        peer_address.append(channel.ip_address)
# Open a second TCP connection to the local listener on port 32000.
# NOTE(review): the port is passed as a string; an int would be more conventional — confirm.
_, writer_ssl = await asyncio.open_connection(host="127.0.0.1", port="32000")
# Wrap the peer streams in a Multiplexer; new channels are handed to mock_new_channel.
multiplexer = Multiplexer(crypto, reader_peer, writer_peer, mock_new_channel)
# Send the TLS_1_2 payload on the second connection and flush it.
writer_ssl.write(TLS_1_2)
await writer_ssl.drain()
# Yield to the event loop briefly (presumably so the payload can reach the channel handler).
await asyncio.sleep(0.1)
# The handler must have recorded the payload unchanged and the expected source address.
assert peer_messages
assert peer_messages[0] == TLS_1_2
assert peer_address
assert peer_address[0] == IP_ADDR
# The registered peer's multiplexer is expected to carry a throttling value of 0.002
# (presumably derived from the server's throttling setting — confirm against the fixture).
peer = server.peers.get_peer(hostname)
assert peer._multiplexer._throttling == 0.002
# Stop the multiplexer and wait for it to finish.
multiplexer.shutdown()
await multiplexer.wait()
await asyncio.sleep(0.1)
# Decrypt the received token, hash it with SHA-256, and send the digest back encrypted
# (presumably the client half of the challenge/response handshake — confirm).
token = hashlib.sha256(crypto.decrypt(token)).digest()
test_client_peer.writer.write(crypto.encrypt(token))
await test_client_peer.writer.drain()
# Yield to the event loop briefly before checking the result.
await asyncio.sleep(0.1)
# After the exchange the peer manager must report the hostname as available.
assert peer_manager.peer_available(hostname)
async def mock_new_channel(multiplexer, channel):
    """Record every message read from the channel together with its IP address.

    Runs until ``channel.read()`` raises or the task is cancelled; the
    ``multiplexer`` argument is accepted but not used.
    """
    while True:
        # Read first, then record: nothing is appended if the read fails.
        peer_messages.append(await channel.read())
        peer_address.append(channel.ip_address)
# Build a Multiplexer on the test peer client's streams; new channels go to mock_new_channel.
multiplexer = Multiplexer(
crypto, test_client_peer.reader, test_client_peer.writer, mock_new_channel
)
# Write the TLS_1_2 payload on the separate test_client_ssl connection and flush it.
test_client_ssl.writer.write(TLS_1_2)
await test_client_ssl.writer.drain()
# Yield to the event loop briefly (presumably so the payload can reach the channel handler).
await asyncio.sleep(0.1)
# The handler must have recorded the payload unchanged and the expected source address.
assert peer_messages
assert peer_messages[0] == TLS_1_2
assert peer_address
assert peer_address[0] == IP_ADDR
# Stop the multiplexer and wait for it to finish.
multiplexer.shutdown()
await multiplexer.wait()
await asyncio.sleep(0.1)
async def test_init_multiplexer_client_throttling(test_client, crypto_transport):
    """Check that a client-side Multiplexer records its throttling setting.

    A Multiplexer created with ``throttling=500`` is expected to be connected
    and to expose ``_throttling == 0.002``.
    """
    reader, writer = test_client.reader, test_client.writer
    multiplexer = Multiplexer(crypto_transport, reader, writer, throttling=500)

    assert multiplexer.is_connected
    assert multiplexer._throttling == 0.002

    multiplexer.shutdown()
async def test_init_multiplexer_server(test_server, test_client, crypto_transport):
    """Check that a Multiplexer built on a server-side connection has no throttling.

    Uses the first entry of ``test_server`` as the connection, then shuts the
    multiplexer down and sets that connection's ``close`` event.
    """
    # test_client is requested but not used directly here (presumably the
    # fixture opens the client side of the connection — confirm).
    server_conn = test_server[0]
    multiplexer = Multiplexer(
        crypto_transport, server_conn.reader, server_conn.writer
    )

    assert multiplexer.is_connected
    assert multiplexer._throttling is None

    multiplexer.shutdown()
    server_conn.close.set()
# Flush whatever was written to the server before this point.
await writer.drain()
# Challenge/Response
crypto = CryptoTransport(aes_key, aes_iv)
try:
# Read exactly 32 bytes of challenge, decrypt them, and reply with the
# encrypted SHA-256 digest of the decrypted challenge.
challenge = await reader.readexactly(32)
answer = hashlib.sha256(crypto.decrypt(challenge)).digest()
writer.write(crypto.encrypt(answer))
await writer.drain()
except (MultiplexerTransportDecrypt, asyncio.IncompleteReadError, OSError):
# A decrypt failure, short read, or socket error is logged and re-raised
# as SniTunConnectionError.
_LOGGER.error("Challenge/Response error with SniTun server")
raise SniTunConnectionError()
# Run multiplexer
# New incoming channels are dispatched to connector.handler; the throttling
# value is passed through unchanged.
self._multiplexer = Multiplexer(
crypto,
reader,
writer,
new_connections=connector.handler,
throttling=throttling,
)
# Task a process for pings/cleanups
# The task returned by create_task is not stored here.
self._loop.create_task(self._handler())
data = self._crypto.decrypt(data)
# Check Token
assert hashlib.sha256(token).digest() == data
except (
asyncio.IncompleteReadError,
MultiplexerTransportDecrypt,
AssertionError,
OSError,
):
_LOGGER.warning("Wrong challenge from peer")
raise SniTunChallengeError()
# Start Multiplexer
self._multiplexer = Multiplexer(
self._crypto, reader, writer, throttling=self._throttling
)