Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): the "def" line and all indentation are missing from this
# excerpt, so how much of the code below runs under the lock must be
# confirmed against the full source.
with self.storage.lock:
# keep_trailing follows the sequence flag when the path is normalized.
if sequence:
path = utils.normpath(path, keep_trailing=True)
else:
path = utils.normpath(path, keep_trailing=False)
# With makepath set, every entry of utils.partition_path(path) except the
# last that is not yet in storage is created through self.create, and the
# second and third items of each result are merged into the watch lists.
if makepath:
for parent_path in utils.partition_path(path)[0:-1]:
if parent_path not in self.storage:
result = self.create(parent_path)
data_watches.extend(result[1])
child_watches.extend(result[2])
# storage.create returns a 3-tuple, unpacked as (created, parents, path);
# path is rebound to the returned value.
created, parents, path = self.storage.create(
path, value=value, sequence=sequence,
ephemeral=ephemeral, session_id=self.session_id)
# Queue a CHILD event for the returned parents, if there are any.
if parents:
event = k_states.WatchedEvent(type=k_states.EventType.CHILD,
state=k_states.KeeperState.CONNECTED,
path=path)
child_watches.append((parents, event))
# Queue a CREATED event for the path itself when created is truthy.
if created:
event = k_states.WatchedEvent(type=k_states.EventType.CREATED,
state=k_states.KeeperState.CONNECTED,
path=path)
data_watches.append(([path], event))
# Result: the path plus the two lists of (paths, event) pairs.
return (path, data_watches, child_watches)
# Callback taking (data, stat, event): when an event is supplied and its
# type is DELETED, the termination callback is invoked.
def _cluster_path_callback(self, data, stat, event):
if event and event.type == EventType.DELETED:
self._termination_callback()
# NOTE(review): indentation is lost in this excerpt, so it is unclear
# whether this return belongs to the branch above, to the function body,
# or to the code that follows -- confirm against the full source.
return 0
# NOTE(review): the "def" line and all indentation are missing from this
# excerpt; nesting below is not visible and must be confirmed.
# Forget the client; a client that is not in self._clients yields 0.
with self._client_lock:
if client in self._clients:
self._clients.discard(client)
else:
return 0
# Collect every path whose entry is flagged ephemeral and whose
# ephemeral_owner equals this client's session id.
removals = []
with self.lock:
for path, data in six.iteritems(self._paths):
if data['ephemeral'] \
and data['ephemeral_owner'] == client.session_id:
removals.append(path)
# One DELETED event per collected path, paired with that single path.
data_watches = []
for path in removals:
event = k_states.WatchedEvent(
type=k_states.EventType.DELETED,
state=k_states.KeeperState.CONNECTED,
path=path)
data_watches.append(([path], event))
# Gather the keys returned by get_parents for each collected path, visiting
# them in sorted order and keeping each parent path only once.
fire_paths = []
for path in removals:
parents = sorted(six.iterkeys(self.get_parents(path)))
for parent_path in parents:
if parent_path in fire_paths:
continue
fire_paths.append(parent_path)
# A DELETED event is built for each gathered parent path; what is done with
# it afterwards is not visible in this excerpt.
child_watches = []
for path in fire_paths:
event = k_states.WatchedEvent(
type=k_states.EventType.DELETED,
state=k_states.KeeperState.CONNECTED,
path=path)
def _process_watch(self, watched_event):
    """Log a watch event and dispatch it according to its type.

    The dispatch runs inside ``handle_exception`` with the tree's error
    listeners:

    * ``CREATED`` -- asserts this node has no parent, then ``on_created()``
    * ``DELETED`` -- ``on_deleted()``
    * ``CHANGED`` -- ``_refresh_data()``
    * ``CHILD``   -- ``_refresh_children()``

    Any other event type is ignored.
    """
    logger.debug('process_watch: %r', watched_event)
    with handle_exception(self._tree._error_listeners):
        kind = watched_event.type
        if kind == EventType.CREATED:
            assert self._parent is None, 'unexpected CREATED on non-root'
            self.on_created()
        elif kind == EventType.DELETED:
            self.on_deleted()
        elif kind == EventType.CHANGED:
            self._refresh_data()
        elif kind == EventType.CHILD:
            self._refresh_children()
def check_for_updates(event):
    """Try to take the first available entry and record it in ``value``.

    The function passes itself as the last argument of both
    ``get_children`` lookups, so it can be invoked again with an event
    (presumably as the watch callback -- confirm against the client API).

    :param event: ``None`` for a direct call, otherwise an event object;
                  events whose type is not ``CHILD`` are ignored.
    """
    # Only a direct call (no event) or a CHILD event triggers a new attempt.
    if event is not None and event.type != EventType.CHILD:
        return
    with lock:
        # Nothing to do once the request was canceled or the flag is set.
        if canceled or flag.isSet():
            return
        values = self.client.retry(self.client.get_children,
                                   self._entries_path,
                                   check_for_updates)
        taken = self.client.retry(self.client.get_children,
                                  self._lock_path,
                                  check_for_updates)
        # Entries left over after filtering against the listing of the
        # lock path.
        available = self._filter_locked(values, taken)
        if available:
            ret = self._take(available[0])
            if ret is not None:
                # By this time, no one took the task
                value.append(ret)
# Callback receiving a watched event: updates the statistics registered for
# matching paths and, for CHILD events, lists the children of the event's
# path again with this same method passed as the callback argument.
# NOTE(review): indentation is lost in this excerpt, so whether the second
# CHILD check runs once per event or once per matching path must be
# confirmed against the full source.
def _watcher(self, watched_event):
for path, stats in self._stats_by_path.items():
# Skip statistics whose path is not a prefix of the event's path.
if not watched_event.path.startswith(path):
continue
# Count CHILD events per event path.
if watched_event.type == EventType.CHILD:
stats.paths[watched_event.path] += 1
# Echo the event when debugging is enabled for these statistics.
if stats.debug:
print(str(watched_event))
if watched_event.type == EventType.CHILD:
try:
# NOTE(review): "children" is not used within the visible lines.
children = self._client.get_children(watched_event.path,
self._watcher)
# A NoNodeError raised by the lookup is ignored.
except NoNodeError:
pass
# NOTE(review): this excerpt stops at the final "if"; its body is not
# visible here, and all indentation has been lost.
def check_for_updates(event):
# Events whose type is not CHILD are ignored; a call with None proceeds.
if event is not None and event.type != EventType.CHILD:
return
with lock:
# Nothing to do once canceled or once the flag is set.
if canceled or flag.isSet():
return
# List the entries path and the lock path; this function is passed as the
# last argument of both lookups.
values = self.client.retry(
self.client.get_children,
self._entries_path,
check_for_updates)
taken = self.client.retry(
self.client.get_children,
self._lock_path,
check_for_updates)
# Try to take the first entry left after filtering against "taken".
available = self._filter_locked(values, taken)
if len(available) > 0:
ret = self._take(available[0])
if ret is not None:
def exists_watch(event):
    """React to an event delivered to the existence watch.

    An event whose state is ``EXPIRED_SESSION`` calls ``wait_exists()``
    regardless of its type. Otherwise ``CREATED`` calls ``do_monitor()``,
    ``DELETED`` calls ``wait_exists()``, and any other type is ignored.
    """
    if event.state == KeeperState.EXPIRED_SESSION:
        wait_exists()
    elif event.type == EventType.CREATED:
        do_monitor()
    elif event.type == EventType.DELETED:
        wait_exists()
# NOTE(review): the "def" line and all indentation are missing from this
# excerpt; nesting must be confirmed against the full source.
# With makepath set, every entry of utils.partition_path(path) except the
# last that is not yet in storage is created through self.create, and the
# second and third items of each result are merged into the watch lists.
if makepath:
for parent_path in utils.partition_path(path)[0:-1]:
if parent_path not in self.storage:
result = self.create(parent_path)
data_watches.extend(result[1])
child_watches.extend(result[2])
# storage.create returns a 3-tuple, unpacked as (created, parents, path);
# path is rebound to the returned value.
created, parents, path = self.storage.create(
path, value=value, sequence=sequence,
ephemeral=ephemeral, session_id=self.session_id)
# Queue a CHILD event for the returned parents, if there are any.
if parents:
event = k_states.WatchedEvent(type=k_states.EventType.CHILD,
state=k_states.KeeperState.CONNECTED,
path=path)
child_watches.append((parents, event))
# Queue a CREATED event for the path itself when created is truthy.
if created:
event = k_states.WatchedEvent(type=k_states.EventType.CREATED,
state=k_states.KeeperState.CONNECTED,
path=path)
data_watches.append(([path], event))
# Result: the path plus the two lists of (paths, event) pairs.
return (path, data_watches, child_watches)
removed). This event does not indicate the data for a child
node has changed, which must have its own watch established.
.. attribute:: NONE
The connection state has been altered.
"""
# String constants naming the event types; each value equals its own name.
CREATED = 'CREATED'
DELETED = 'DELETED'
CHANGED = 'CHANGED'
CHILD = 'CHILD'
# Per the docstring above: the connection state has been altered.
NONE = 'NONE'
# Lookup table from integer event-type codes to ``EventType`` constants.
EVENT_TYPE_MAP = {
    -1: EventType.NONE,
    1: EventType.CREATED,
    2: EventType.DELETED,
    3: EventType.CHANGED,
    4: EventType.CHILD,
}
class WatchedEvent(namedtuple('WatchedEvent', ('type', 'state', 'path'))):
"""A change on ZooKeeper that a Watcher is able to respond to.
The :class:`WatchedEvent` includes exactly what happened, the
current state of ZooKeeper, and the path of the node that was
involved in the event. An instance of :class:`WatchedEvent` will be
passed to registered watch functions.
.. attribute:: type