Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def listen(state):
if state == KazooState.CONNECTED:
started.set()
return True
tzk.add_listener(listen)
def expire_session(self, event_factory):
"""Force ZK to expire a client session"""
self.__break_connection(_SESSION_EXPIRED, KazooState.LOST,
event_factory)
def listener(state):
if state == KazooState.LOST:
session_expired.set()
def _session_watcher(self, state):
if state in (KazooState.LOST, KazooState.SUSPENDED):
self._watch_established = False
elif state == KazooState.CONNECTED and \
not self._watch_established and not self._stopped:
self._client.handler.spawn(self._get_children)
if self._last_states:
LOG.debug("Kazoo client has changed to"
" state '%s' from prior states '%s'", state,
self._last_states)
else:
LOG.debug("Kazoo client has changed to state '%s' (from"
" its initial/uninitialized state)", state)
self._last_states.appendleft(state)
if state == k_states.KazooState.LOST:
self._connected = False
# When the client is itself closing itself down this will be
# triggered, but in that case we expect it, so we don't need
# to emit a warning message.
if not self._closing:
LOG.warning("Connection to zookeeper has been lost")
elif state == k_states.KazooState.SUSPENDED:
LOG.warning("Connection to zookeeper has been suspended")
self._suspended = True
else:
# Must be CONNECTED then (as there are only 3 enums)
if self._suspended:
self._suspended = False
def close(self, close_handler=True):
if self._connected:
with self._open_close_lock:
if self._connected:
self._connected = False
with self._watches_lock:
self._child_watchers.clear()
self._data_watchers.clear()
self.storage.purge(self)
self._fire_state_change(k_states.KazooState.LOST)
if self._own_handler and close_handler:
self.handler.stop()
self._partial_client.session_id = None
self.handler = handler if handler else SequentialThreadingHandler()
if inspect.isclass(self.handler):
raise ConfigurationError("Handler must be an instance of a class, "
"not the class: %s" % self.handler)
self.auth_data = auth_data if auth_data else set([])
self.default_acl = default_acl
self.randomize_hosts = randomize_hosts
self.hosts = None
self.chroot = None
self.set_hosts(hosts)
# Curator like simplified state tracking, and listeners for
# state transitions
self._state = KeeperState.CLOSED
self.state = KazooState.LOST
self.state_listeners = set()
self._reset()
self.read_only = read_only
if client_id:
self._session_id = client_id[0]
self._session_passwd = client_id[1]
else:
self._reset_session()
# ZK uses milliseconds
self._session_timeout = int(timeout * 1000)
# We use events like twitter's client to track current and
# desired state (connected, and whether to shutdown)
# Handle network partition: If connection gets suspended,
# change state to ALLOCATING if we had already ACQUIRED. This way
# the caller does not process the members since we could eventually
# lose session get repartitioned. If we got connected after a suspension
# it means we've not lost the session and still have our members. Hence,
# restore to ACQUIRED
if state == KazooState.SUSPENDED:
if self.state == PartitionState.ACQUIRED:
self._was_allocated = True
self.state = PartitionState.ALLOCATING
elif state == KazooState.CONNECTED:
if self._was_allocated:
self._was_allocated = False
self.state = PartitionState.ACQUIRED
if state == KazooState.LOST:
self._client.handler.spawn(self._fail_out)
return True
def zk_state_listener(state):
""" Handles changes to ZooKeeper connection state.
Args:
state: A string specifying the new ZooKeeper connection state.
"""
if state == KazooState.CONNECTED:
persistent_create_server_node = retry_data_watch_coroutine(
server_node, create_server_node)
IOLoop.instance().add_callback(persistent_create_server_node)
def _session_watcher(self, state):
if state == KazooState.SUSPENDED:
self._publish_event(TreeEvent.CONNECTION_SUSPENDED)
elif state == KazooState.CONNECTED:
# The session watcher should not be blocked
self._in_background(self._root.on_reconnected)
self._publish_event(TreeEvent.CONNECTION_RECONNECTED)
elif state == KazooState.LOST:
self._is_initialized = False
self._publish_event(TreeEvent.CONNECTION_LOST)