Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def state_watcher(state):
if state is KazooState.LOST:
self.assertTrue(getattr(self.holder, attribute))
state_watcher.disconnects += 1
def _listener(state):
if state == KazooState.CONNECTED:
connected.set()
else:
lost_connection.set()
# With a callback, the callback should get executed
callback_checker = mock.MagicMock()
callback_checker.test.return_value = True
# Mock the state to be True
self.ndsr._conn_state = True
# Add our callback checker mock above and validate that the callback
# was executed once with True.
self.ndsr.get_state(callback_checker.test)
self.assertTrue(
callback_checker.test in self.ndsr._conn_state_callbacks)
callback_checker.test.assert_called_with(True)
# Now fake a state change to LOST
self.ndsr._state_listener(KazooState.LOST)
# Now validate that the callback was executed once with False when
# we updated the state
callback_checker.test.assert_called_with(False)
# Now fake a state change to LOST
self.ndsr._state_listener(KazooState.SUSPENDED)
# Now validate that the callback was executed once with False when
# we updated the state
callback_checker.test.assert_called_with(False)
def _on_conn_change(self, state):
logger.debug('state changed: {state}'.format(state=state,))
with self.state_lock:
if state == KazooState.LOST or state == KazooState.SUSPENDED:
self.connected = False
def _zkchange(self, state):
if state == KazooState.CONNECTED:
logger.info("Now connected to Zookeeper")
self.urgent_event("connection recovered")
elif state == KazooState.LOST:
logger.warn("Connection to Zookeeper lost")
self.urgent_event("connection lost")
elif state == KazooState.SUSPENDED:
logger.warn("Connection to Zookeeper suspended")
logger.debug("Connection is considered as lost")
self.urgent_event("connection lost")
def __conn_listener(self, state):
"""
Connection event listener
:param state: The new connection state
"""
if state == KazooState.CONNECTED:
self.__online = True
if not self.__connected:
self.__connected = True
self._logger.info("Connected to ZooKeeper")
self._queue.enqueue(self.on_first_connection)
else:
self._logger.warning("Re-connected to ZooKeeper")
self._queue.enqueue(self.on_client_reconnection)
elif state == KazooState.SUSPENDED:
self._logger.warning("Connection suspended")
self.__online = False
elif state == KazooState.LOST:
self.__online = False
self.__connected = False
if self.__stop:
def state_listener(self, state):
'''
Restarts the session if we get anything besides CONNECTED
'''
if state == KazooState.SUSPENDED:
self.set_valid(False)
self.call_error(self.BAD_CONNECTION)
elif state == KazooState.LOST and not self.do_not_restart:
self.threaded_start()
elif state == KazooState.CONNECTED:
# This is going to throw a SUSPENDED kazoo error
# which will cause the sessions to be wiped and re established.
# Used b/c of massive connection pool issues
self.zoo_client.stop()
def specialized(self, msg):
assert 'request' in msg, 'bogus message received ?'
req = msg['request']
if req == 'state change':
#
# - we got a zk state change
# - we only use the switch to CONNECTED to go from wait_for_cnx() to spin()
# - ZK disconnects (LOST or SUSPENDED) are simply flagged when exceptions are raised
#
state = msg['state']
logger.debug('%s : zk state change -> %s (%s)' % (self.path, str(state), 'connected' if self.connected else 'disconnected'))
if self.connected and state != KazooState.CONNECTED:
logger.warning('%s : lost connection (%s) / forcing a reset' % (self.path, str(state)))
self.force_reset = 1
self.connected = 0
elif state == KazooState.CONNECTED:
self.connected = 1
elif req == 'reset':
#
# - we got a request to explicitly force a reset
# - this is typically invoked from the CLI
#
self.force_reset = 1
else:
def _zkchange(self, state):
if state == KazooState.CONNECTED:
logger.info("Now connected to Zookeeper")
self.urgent_event("connection recovered")
elif state == KazooState.LOST:
logger.warn("Connection to Zookeeper lost")
self.urgent_event("connection lost")
elif state == KazooState.SUSPENDED:
logger.warn("Connection to Zookeeper suspended")
logger.debug("Connection is considered as lost")
self.urgent_event("connection lost")
def _lock_listener(self, state):
"""
Listener to handle ZK disconnection/reconnection. Since I don't know of safe way to check if a lock is still in
ZK after a reconnect, we simply release the lock and try and re-acquire it.
Args:
state (kazoo.client.KazooState): The state of the ZK connection
Returns:
None
"""
if state in [KazooState.LOST, KazooState.SUSPENDED]:
self._logger.warn('Disconnected from Zookeeper, waiting to reconnect lock for {}'.format(str(self)))
self._locked = False
elif state == KazooState.CONNECTED:
self._logger.warn(
'Reconnected to Zookeeper, trying to release and re-acquire lock for {}'.format(str(self)))
self._context.zookeeper_client.handler.spawn(self._release_and_reacquire)
else:
self._logger.warn('Got unknown state "{}" from Zookeeper'.format(state))