Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_init_peer_invalid():
    """Verify that a peer whose validity date lies in the past is not valid."""
    # One day in the past, so the validity window has already closed.
    expired_at = datetime.utcnow() - timedelta(days=1)
    key = os.urandom(32)
    init_vector = os.urandom(16)

    expired_peer = Peer("localhost", expired_at, key, init_vector)

    assert not expired_peer.is_valid
    assert expired_peer.hostname == "localhost"
    # No multiplexer has been initialised on a freshly built peer.
    assert expired_peer.multiplexer is None
def test_init_peer():
    """Verify the basic attributes of a freshly constructed, still-valid peer."""
    # One day in the future, so the peer is still inside its validity window.
    expires_at = datetime.utcnow() + timedelta(days=1)
    key = os.urandom(32)
    init_vector = os.urandom(16)

    fresh_peer = Peer("localhost", expires_at, key, init_vector)

    assert fresh_peer.is_valid
    assert fresh_peer.hostname == "localhost"
    # No multiplexer has been initialised on a freshly built peer.
    assert fresh_peer.multiplexer is None
async def test_init_peer_multiplexer_crypto(loop, test_client, test_server):
    """Start the multiplexer challenge and decode the token the peer emits.

    The peer and a separate ``CryptoTransport`` are built from the same AES
    key and IV, so the test can decrypt the 32-byte challenge read from the
    other end of the connection.
    """
    conn = test_server[0]
    key = os.urandom(32)
    init_vector = os.urandom(16)
    expires_at = datetime.utcnow() + timedelta(days=1)
    peer = Peer("localhost", expires_at, key, init_vector)
    transport = CryptoTransport(key, init_vector)

    # Waiting for a disconnect before the challenge has started must fail.
    with pytest.raises(RuntimeError):
        await peer.wait_disconnect()

    challenge_task = loop.create_task(
        peer.init_multiplexer_challenge(test_client.reader, test_client.writer)
    )
    # Give the scheduled task a moment to run up to its first wait point.
    await asyncio.sleep(0.1)

    # The challenge is still pending because nothing has answered it yet.
    assert not challenge_task.done()
    assert not peer.is_ready
    assert not peer.is_connected

    encrypted_challenge = await conn.reader.readexactly(32)
    challenge = transport.decrypt(encrypted_challenge)
    # NOTE(review): the digest is not used after this point in the visible
    # excerpt; presumably the full test sends it back - confirm upstream.
    token = hashlib.sha256(challenge).digest()
async def test_init_peer_wrong_challenge(loop, test_client, test_server):
    """Answer the multiplexer challenge with a deliberately wrong response.

    Instead of decrypting and hashing the challenge, the raw bytes that were
    received are encrypted once more and written back.
    """
    conn = test_server[0]
    key = os.urandom(32)
    init_vector = os.urandom(16)
    expires_at = datetime.utcnow() + timedelta(days=1)
    peer = Peer("localhost", expires_at, key, init_vector)
    transport = CryptoTransport(key, init_vector)

    # Waiting for a disconnect before the challenge has started must fail.
    with pytest.raises(RuntimeError):
        await peer.wait_disconnect()

    challenge_task = loop.create_task(
        peer.init_multiplexer_challenge(test_client.reader, test_client.writer)
    )
    # Give the scheduled task a moment to run up to its first wait point.
    await asyncio.sleep(0.1)
    assert not challenge_task.done()

    received = await conn.reader.readexactly(32)
    # Wrong on purpose: the received bytes are re-encrypted, not hashed.
    bogus_reply = transport.encrypt(received)
    conn.writer.write(bogus_reply)
    await conn.writer.drain()

    # NOTE(review): the visible excerpt ends after this pause without checking
    # the task outcome; presumably the full test asserts a failure - confirm.
    await asyncio.sleep(0.1)
async def test_init_peer_multiplexer_throttling(loop, test_client, test_server):
    """Start the multiplexer challenge on a peer created with ``throttling=500``.

    Mirrors the plain challenge test; the only difference in setup is the
    ``throttling`` keyword passed to ``Peer``.
    """
    conn = test_server[0]
    key = os.urandom(32)
    init_vector = os.urandom(16)
    expires_at = datetime.utcnow() + timedelta(days=1)
    peer = Peer("localhost", expires_at, key, init_vector, throttling=500)
    transport = CryptoTransport(key, init_vector)

    # Waiting for a disconnect before the challenge has started must fail.
    with pytest.raises(RuntimeError):
        await peer.wait_disconnect()

    challenge_task = loop.create_task(
        peer.init_multiplexer_challenge(test_client.reader, test_client.writer)
    )
    # Give the scheduled task a moment to run up to its first wait point.
    await asyncio.sleep(0.1)

    # The challenge is still pending because nothing has answered it yet.
    assert not challenge_task.done()
    assert not peer.is_ready
    assert not peer.is_connected

    encrypted_challenge = await conn.reader.readexactly(32)
    challenge = transport.decrypt(encrypted_challenge)
    # NOTE(review): the digest is not used after this point in the visible
    # excerpt; presumably the full test sends it back - confirm upstream.
    token = hashlib.sha256(challenge).digest()
except (InvalidToken, json.JSONDecodeError):
_LOGGER.warning("Invalid fernet token")
raise SniTunInvalidPeer()
# Check if token is valid
valid = datetime.utcfromtimestamp(config["valid"])
if valid < datetime.utcnow():
_LOGGER.warning("Token was expired")
raise SniTunInvalidPeer()
# Extract configuration
hostname = config["hostname"]
aes_key = bytes.fromhex(config["aes_key"])
aes_iv = bytes.fromhex(config["aes_iv"])
return Peer(hostname, valid, aes_key, aes_iv, throttling=self._throttling)