Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self._retries = retries
self._timeout = timeout
self._server_stats_callback = server_stats_callback
# the format of the peer tuple is different for v4 and v6
self._family = socket.AF_INET6
if isinstance(
ipaddress.ip_address(self._address), ipaddress.IPv4Address
):
self._family = socket.AF_INET
self._listener = socket.socket(self._family, socket.SOCK_DGRAM)
self._listener.setblocking(0) # non-blocking
self._listener.bind((address, port))
self._epoll = select.epoll()
self._epoll.register(self._listener.fileno(), select.EPOLLIN)
self._should_stop = False
self._server_stats = ServerStats(address, stats_interval_seconds)
self._metrics_timer = None
self._address = address
self._port = port
self._retries = retries
self._timeout = timeout
self._server_stats_callback = server_stats_callback
# the format of the peer tuple is different for v4 and v6
self._family = socket.AF_INET6
if isinstance(ipaddress.ip_address(self._address), ipaddress.IPv4Address):
self._family = socket.AF_INET
self._listener = socket.socket(self._family, socket.SOCK_DGRAM)
self._listener.setblocking(0) # non-blocking
self._listener.bind((address, port))
self._epoll = select.epoll()
self._epoll.register(self._listener.fileno(), select.EPOLLIN)
self._should_stop = False
self._server_stats = ServerStats(address, stats_interval_seconds)
self._metrics_timer = None