Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_setup_crypto_transport():
    """Round-trip random payloads through a single CryptoTransport.

    A transport is built from a random 32-byte key and a random 16-byte IV.
    Nine random 32-byte payloads are then encrypted and decrypted with that
    same instance, and each one must come back unchanged.
    """
    transport = CryptoTransport(os.urandom(32), os.urandom(16))

    remaining = 9
    while remaining:
        payload = os.urandom(32)
        ciphertext = transport.encrypt(payload)
        assert transport.decrypt(ciphertext) == payload
        remaining -= 1
# Excerpt of a test body; the enclosing `def` line is not part of this snippet.
# Start a SniTunServerSingle on 127.0.0.1:32000 with throttling=500.
server = SniTunServerSingle(
FERNET_TOKENS, host="127.0.0.1", port="32000", throttling=500
)
await server.start()
# Open a client connection to that same address to act as the peer.
reader_peer, writer_peer = await asyncio.open_connection(
host="127.0.0.1", port="32000"
)
# Peer config: expiry one day ahead, random 32-byte key and 16-byte IV.
# NOTE(review): utcnow() returns a naive datetime and .timestamp() reads naive
# values as local time, so the expiry is shifted by the host's UTC offset.
valid = datetime.utcnow() + timedelta(days=1)
aes_key = os.urandom(32)
aes_iv = os.urandom(16)
hostname = "localhost"
fernet_token = create_peer_config(valid.timestamp(), hostname, aes_key, aes_iv)
crypto = CryptoTransport(aes_key, aes_iv)
# The peer config is the first thing written to the connection.
writer_peer.write(fernet_token)
await writer_peer.drain()
# Challenge/response: read 32 bytes, decrypt them, SHA-256 the plaintext and
# send the digest back encrypted.
token = await reader_peer.readexactly(32)
token = hashlib.sha256(crypto.decrypt(token)).digest()
writer_peer.write(crypto.encrypt(token))
await writer_peer.drain()
# Short pause before checking the peer registry (presumably to let the server
# process the answer — TODO confirm).
await asyncio.sleep(0.1)
assert server.peers.peer_available(hostname)
async def mock_new_channel(multiplexer, channel):
"""Mock new channel."""
# NOTE(review): this excerpt is cut off right after the loop header, so what
# the mock does on each iteration cannot be determined from here.
while True:
async def test_peer_listener_invalid(peer_manager, peer_listener, test_client_peer):
    """Send a peer config that expired a day ago and expect the read to fail.

    After the config is written, the test tries to read the 32-byte challenge
    and passes only if that read raises ``asyncio.IncompleteReadError``, i.e.
    the stream ends before 32 bytes arrive.

    ``peer_manager`` and ``peer_listener`` are requested as fixtures but are
    not referenced directly in the body.
    """
    # Imported locally so this test does not depend on the module-level
    # imports exposing ``timezone``.
    from datetime import timezone

    # Use an aware UTC datetime: ``.timestamp()`` treats a naive value (such
    # as the result of ``datetime.utcnow()``) as local time, which would shift
    # the expiry by the host's UTC offset.
    valid = datetime.now(timezone.utc) - timedelta(days=1)
    aes_key = os.urandom(32)
    aes_iv = os.urandom(16)
    hostname = "localhost"
    fernet_token = create_peer_config(valid.timestamp(), hostname, aes_key, aes_iv)

    test_client_peer.writer.write(fernet_token)
    await test_client_peer.writer.drain()

    # No challenge is expected for an expired config, so the result of the
    # read is never used and is not bound to a name.
    with pytest.raises(asyncio.IncompleteReadError):
        await test_client_peer.reader.readexactly(32)
async def test_server_full(
peer_manager, peer_listener, test_client_peer, sni_proxy, test_client_ssl
):
"""Run a full flow with a peer.

Only the opening of the test is visible in this excerpt: the peer handshake
followed by an assertion that the peer manager reports the hostname as
available.
"""
# NOTE(review): these two lists are not used in the visible part of the test;
# presumably later code that is not shown here fills them.
peer_messages = []
peer_address = []
# Peer config: expiry one day ahead, random 32-byte key and 16-byte IV.
# NOTE(review): utcnow() returns a naive datetime and .timestamp() reads naive
# values as local time, so the expiry is shifted by the host's UTC offset.
valid = datetime.utcnow() + timedelta(days=1)
aes_key = os.urandom(32)
aes_iv = os.urandom(16)
hostname = "localhost"
fernet_token = create_peer_config(valid.timestamp(), hostname, aes_key, aes_iv)
crypto = CryptoTransport(aes_key, aes_iv)
# The peer config is the first thing written to the connection.
test_client_peer.writer.write(fernet_token)
await test_client_peer.writer.drain()
# Challenge/response: read 32 bytes, decrypt them, SHA-256 the plaintext and
# send the digest back encrypted.
token = await test_client_peer.reader.readexactly(32)
token = hashlib.sha256(crypto.decrypt(token)).digest()
test_client_peer.writer.write(crypto.encrypt(token))
await test_client_peer.writer.drain()
# Short pause before checking the peer registry (presumably to let the
# listener process the answer — TODO confirm).
await asyncio.sleep(0.1)
assert peer_manager.peer_available(hostname)
async def mock_new_channel(multiplexer, channel):
"""Mock new channel."""
# NOTE(review): this excerpt is cut off right after the loop header, so what
# the mock does on each iteration cannot be determined from here.
while True:
async def test_init_peer_wrong_challenge(loop, test_client, test_server):
    """Start the peer challenge handshake and send back an un-hashed reply.

    ``Peer.init_multiplexer_challenge`` is scheduled on ``test_client``'s
    reader/writer, and the 32-byte challenge is read from ``test_server[0]``.
    The reply is the received bytes passed straight through ``encrypt``; no
    decrypt or SHA-256 step is applied to them.
    """
    secret_key = os.urandom(32)
    secret_iv = os.urandom(16)
    expires = datetime.utcnow() + timedelta(days=1)
    remote = test_server[0]

    peer = Peer("localhost", expires, secret_key, secret_iv)
    transport = CryptoTransport(secret_key, secret_iv)

    # Before the handshake has started, wait_disconnect() must raise.
    with pytest.raises(RuntimeError):
        await peer.wait_disconnect()

    handshake = peer.init_multiplexer_challenge(
        test_client.reader, test_client.writer
    )
    init_task = loop.create_task(handshake)
    await asyncio.sleep(0.1)

    # No answer has been sent yet, so the handshake task must still be pending.
    assert not init_task.done()

    challenge = await remote.reader.readexactly(32)
    reply = transport.encrypt(challenge)
    remote.writer.write(reply)
    await remote.writer.drain()
    await asyncio.sleep(0.1)
# Excerpt of a test body; the enclosing `def` line is not part of this snippet.
# NOTE(review): peer_address is not used anywhere in the visible lines.
peer_address = []
# Start a SniTunServerSingle on 127.0.0.1:32000 (no throttling argument here).
server = SniTunServerSingle(FERNET_TOKENS, host="127.0.0.1", port="32000")
await server.start()
# Open a client connection to that same address to act as the peer.
reader_peer, writer_peer = await asyncio.open_connection(
host="127.0.0.1", port="32000"
)
# Peer config: expiry one day ahead, random 32-byte key and 16-byte IV.
# NOTE(review): utcnow() returns a naive datetime and .timestamp() reads naive
# values as local time, so the expiry is shifted by the host's UTC offset.
valid = datetime.utcnow() + timedelta(days=1)
aes_key = os.urandom(32)
aes_iv = os.urandom(16)
hostname = "localhost"
fernet_token = create_peer_config(valid.timestamp(), hostname, aes_key, aes_iv)
crypto = CryptoTransport(aes_key, aes_iv)
# The peer config is the first thing written to the connection.
writer_peer.write(fernet_token)
await writer_peer.drain()
# A ConnectionResetError is expected somewhere in the challenge/response steps
# below. NOTE(review): indentation was lost in this excerpt, so exactly which
# of the following lines belong to the `with` body cannot be told from here.
with pytest.raises(ConnectionResetError):
token = await reader_peer.readexactly(32)
token = hashlib.sha256(crypto.decrypt(token)).digest()
writer_peer.write(crypto.encrypt(token))
await writer_peer.drain()
await asyncio.sleep(0.1)
# The hostname must not be registered as an available peer afterwards.
assert not server.peers.peer_available(hostname)
await server.stop()
async def test_peer_listener_disconnect(peer_manager, peer_listener, test_client_peer):
    """Register a peer through the handshake, then close its connection.

    The test sends a peer config valid for one more day, answers the 32-byte
    challenge (decrypt, SHA-256, encrypt), and asserts that ``peer_manager``
    reports the hostname as available. It then closes the client writer and
    waits briefly.

    ``peer_listener`` is requested as a fixture but is not referenced directly
    in the body.
    """
    # Imported locally so this test does not depend on the module-level
    # imports exposing ``timezone``.
    from datetime import timezone

    # Use an aware UTC datetime: ``.timestamp()`` treats a naive value (such
    # as the result of ``datetime.utcnow()``) as local time, which would shift
    # the expiry by the host's UTC offset.
    valid = datetime.now(timezone.utc) + timedelta(days=1)
    aes_key = os.urandom(32)
    aes_iv = os.urandom(16)
    hostname = "localhost"
    fernet_token = create_peer_config(valid.timestamp(), hostname, aes_key, aes_iv)
    crypto = CryptoTransport(aes_key, aes_iv)

    test_client_peer.writer.write(fernet_token)
    await test_client_peer.writer.drain()

    # Challenge/response: the answer is the SHA-256 digest of the decrypted
    # challenge, sent back encrypted.
    token = await test_client_peer.reader.readexactly(32)
    token = hashlib.sha256(crypto.decrypt(token)).digest()
    test_client_peer.writer.write(crypto.encrypt(token))
    await test_client_peer.writer.drain()
    await asyncio.sleep(0.1)

    assert peer_manager.peer_available(hostname)

    # NOTE(review): nothing is asserted after the disconnect in the code shown
    # here; confirm whether a check on peer_manager is meant to follow.
    test_client_peer.writer.close()
    await asyncio.sleep(0.1)
async def test_init_peer_multiplexer_crypto(loop, test_client, test_server):
    """Start the peer challenge handshake and write the hashed answer.

    ``Peer.init_multiplexer_challenge`` is scheduled on ``test_client``'s
    reader/writer. While it is pending the peer must be neither ready nor
    connected. The 32-byte challenge read from ``test_server[0]`` is then
    decrypted, hashed with SHA-256, re-encrypted and written back.
    """
    secret_key = os.urandom(32)
    secret_iv = os.urandom(16)
    expires = datetime.utcnow() + timedelta(days=1)
    remote = test_server[0]

    peer = Peer("localhost", expires, secret_key, secret_iv)
    transport = CryptoTransport(secret_key, secret_iv)

    # Before the handshake has started, wait_disconnect() must raise.
    with pytest.raises(RuntimeError):
        await peer.wait_disconnect()

    handshake = peer.init_multiplexer_challenge(
        test_client.reader, test_client.writer
    )
    init_task = loop.create_task(handshake)
    await asyncio.sleep(0.1)

    # No answer has been sent yet: the task is pending and the peer is idle.
    assert not init_task.done()
    assert not peer.is_ready
    assert not peer.is_connected

    challenge = await remote.reader.readexactly(32)
    plaintext = transport.decrypt(challenge)
    answer = hashlib.sha256(plaintext).digest()
    remote.writer.write(transport.encrypt(answer))
host=self._snitun_host, port=self._snitun_port
)
# An OSError from the connection attempt is logged and re-raised as
# SniTunConnectionError.
# NOTE(review): the original OSError is not chained explicitly (`from err`).
except OSError:
_LOGGER.error(
"Can't connect to SniTun server %s:%s",
self._snitun_host,
self._snitun_port,
)
raise SniTunConnectionError()
# Send fernet token
writer.write(fernet_token)
await writer.drain()
# Challenge/Response: read a 32-byte challenge, decrypt it, SHA-256 the
# plaintext and send the digest back encrypted.
crypto = CryptoTransport(aes_key, aes_iv)
try:
challenge = await reader.readexactly(32)
answer = hashlib.sha256(crypto.decrypt(challenge)).digest()
writer.write(crypto.encrypt(answer))
await writer.drain()
# A decrypt failure, a short read or a socket error is logged and reported to
# the caller as SniTunConnectionError.
except (MultiplexerTransportDecrypt, asyncio.IncompleteReadError, OSError):
_LOGGER.error("Challenge/Response error with SniTun server")
raise SniTunConnectionError()
# Run multiplexer on the same reader/writer, reusing the handshake's crypto
# transport; connector.handler is passed as the new_connections argument.
# NOTE(review): the Multiplexer(...) call is cut off at the end of this excerpt.
self._multiplexer = Multiplexer(
crypto,
reader,
writer,
new_connections=connector.handler,
def __init__(
self,
hostname: str,
valid: datetime,
aes_key: bytes,
aes_iv: bytes,
throttling: Optional[int] = None,
) -> None:
"""Initialize a Peer.

Stores ``hostname``, ``valid`` and ``throttling`` on the instance, builds a
``CryptoTransport`` from ``aes_key`` and ``aes_iv``, and starts with no
multiplexer attached.
"""
self._hostname = hostname
self._valid = valid
self._throttling = throttling
# No multiplexer exists yet at construction time.
self._multiplexer = None
# The key and IV are not stored on their own; only the transport built from
# them is kept.
self._crypto = CryptoTransport(aes_key, aes_iv)