Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_init_client_peer_invalid_token(
peer_listener, peer_manager, test_endpoint
):
"""Test setup of ClientPeer."""
client = ClientPeer("127.0.0.1", "8893")
connector = Connector("127.0.0.1", "8822")
assert not peer_manager.peer_available("localhost")
valid = datetime.utcnow() + timedelta(days=-1)
aes_key = os.urandom(32)
aes_iv = os.urandom(16)
hostname = "localhost"
fernet_token = create_peer_config(valid.timestamp(), hostname, aes_key, aes_iv)
with pytest.raises(SniTunConnectionError):
await client.start(connector, fernet_token, aes_key, aes_iv)
await asyncio.sleep(0.1)
assert not peer_manager.peer_available("localhost")
async def test_fernet_token_date(peer_listener, peer_manager, test_endpoint):
"""Test fernet token created by server as invalid."""
client = ClientPeer("127.0.0.1", "8893")
connector = Connector("127.0.0.1", "8822")
assert not peer_manager.peer_available("localhost")
valid = timedelta(days=-1)
aes_key = os.urandom(32)
aes_iv = os.urandom(16)
hostname = "localhost"
fernet_token = server.generate_client_token(
FERNET_TOKENS, valid, hostname, aes_key, aes_iv
)
with pytest.raises(SniTunConnectionError):
await client.start(connector, fernet_token, aes_key, aes_iv)
await asyncio.sleep(0.1)
assert not peer_manager.peer_available("localhost")
# Check if we already connected
if self._snitun.is_connected:
return
try:
await self._refresh_snitun_token()
await self._snitun.connect(
self._token.fernet,
self._token.aes_key,
self._token.aes_iv,
throttling=self._token.throttling,
)
self.cloud.client.dispatcher_message(const.DISPATCH_REMOTE_CONNECT)
except SniTunConnectionError:
_LOGGER.error("Connection problem to snitun server")
except RemoteBackendError:
_LOGGER.error("Can't refresh the snitun token")
except SubscriptionExpired:
pass
except AttributeError:
pass # Ignore because HA shutdown on snitun token refresh
finally:
# start retry task
if self._snitun and not self._reconnect_task:
self._reconnect_task = self.cloud.run_task(self._reconnect_snitun())