Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_init_new_peer_invalid_fernet():
    """Expect SniTunInvalidPeer when create_peer is fed 100 random bytes."""
    peer_manager = PeerManager(FERNET_TOKENS)
    garbage_token = os.urandom(100)

    with pytest.raises(SniTunInvalidPeer):
        peer_manager.create_peer(garbage_token)
def test_init_new_peer_not_valid_time():
    """Expect SniTunInvalidPeer for a token whose validity ended a day ago."""
    peer_manager = PeerManager(FERNET_TOKENS)

    # NOTE(review): .timestamp() on a naive datetime is interpreted as local
    # time; the one-day margin is larger than any UTC offset, so the token is
    # expired regardless of the machine's timezone.
    expired_at = datetime.utcnow() - timedelta(days=1)
    key = os.urandom(32)
    init_vector = os.urandom(16)

    expired_token = create_peer_config(
        expired_at.timestamp(), "localhost", key, init_vector
    )

    with pytest.raises(SniTunInvalidPeer):
        peer_manager.create_peer(expired_token)
def create_peer(self, fernet_data: bytes) -> Peer:
    """Create a new peer from an encrypted (fernet) configuration token.

    The token is decrypted with ``self._fernet`` and parsed as a JSON object
    with the keys ``valid`` (POSIX timestamp), ``hostname``, ``aes_key`` and
    ``aes_iv`` (both hex-encoded).

    Raises:
        SniTunInvalidPeer: if the token cannot be decrypted or decoded, if
            the configuration is missing a field or holds a malformed value,
            or if the validity timestamp lies in the past.
    """
    try:
        data = self._fernet.decrypt(fernet_data).decode()
        config = json.loads(data)
    except (InvalidToken, UnicodeDecodeError, json.JSONDecodeError) as err:
        _LOGGER.warning("Invalid fernet token")
        raise SniTunInvalidPeer() from err

    # Extract configuration. A token can decrypt fine and still carry an
    # unusable payload (missing key, non-dict JSON, non-numeric or
    # out-of-range timestamp, non-hex key material). Report all of those as
    # an invalid peer so callers only have to handle SniTunInvalidPeer.
    try:
        # Kept as a naive UTC datetime so it stays comparable with
        # datetime.utcnow() below and the value handed to Peer is unchanged.
        valid = datetime.utcfromtimestamp(config["valid"])
        hostname = config["hostname"]
        aes_key = bytes.fromhex(config["aes_key"])
        aes_iv = bytes.fromhex(config["aes_iv"])
    except (KeyError, TypeError, ValueError, OverflowError, OSError) as err:
        _LOGGER.warning("Invalid peer configuration")
        raise SniTunInvalidPeer() from err

    # Check if token is valid
    if valid < datetime.utcnow():
        _LOGGER.warning("Token was expired")
        raise SniTunInvalidPeer()

    return Peer(hostname, valid, aes_key, aes_iv, throttling=self._throttling)
return
peer = self._peer_manager.create_peer(fernet_data)
# Start multiplexer
await peer.init_multiplexer_challenge(reader, writer)
self._peer_manager.add_peer(peer)
while peer.is_connected:
try:
async with async_timeout.timeout(CHECK_VALID_EXPIRE):
await peer.wait_disconnect()
except asyncio.TimeoutError:
if not peer.is_valid:
break
except SniTunInvalidPeer:
_LOGGER.debug("Close because invalid fernet data")
except SniTunChallengeError:
_LOGGER.debug("Close because challenge was wrong")
finally:
if peer:
self._peer_manager.remove_peer(peer)
# Cleanup transport
if not writer.transport.is_closing():
with suppress(OSError):
writer.close()