Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_good_client_hello(test_package: bytes):
    """Check that a well-formed ClientHello yields the SNI name ``localhost``.

    ``test_package`` holds the raw bytes handed to ``sni.parse_tls_sni``.
    """
    parsed_hostname = sni.parse_tls_sni(test_package)
    assert parsed_hostname == "localhost"
def test_bad_client_hello(test_package: bytes):
    """Check that a malformed ClientHello makes the parser raise ParseSNIError.

    ``test_package`` holds the raw bytes handed to ``sni.parse_tls_sni``.
    """
    expect_parse_failure = pytest.raises(ParseSNIError)
    with expect_parse_failure:
        sni.parse_tls_sni(test_package)
return
except OSError:
return
else:
client_hello = data
# Connection closed before data received
if not client_hello:
with suppress(OSError):
writer.close()
return
try:
# Read Hostname
try:
hostname = parse_tls_sni(client_hello)
except ParseSNIError:
_LOGGER.warning("Receive invalid ClientHello on public Interface")
return
# Peer available?
if not self._peer_manager.peer_available(hostname):
_LOGGER.debug("Hostname %s not connected", hostname)
return
peer = self._peer_manager.get_peer(hostname)
# Proxy data over multiplexer to client
_LOGGER.debug("Processing for hostname % started", hostname)
await self._proxy_peer(peer.multiplexer, client_hello, reader, writer)
finally:
if not writer.transport.is_closing():