Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def watch_folder(self, path, enqueue_children=False):
    """Recursively install ZooKeeper watches on ``path`` and its descendants.

    Paths containing ``autoscaling.lock`` are ignored.  A path whose last
    component is ``instances`` is handed to ``self.watch_node`` and is not
    descended into.  Any other path gets a ``ChildrenWatch`` (stored in
    ``self.watchers`` keyed by the path) and every current child is then
    visited with the same rules.

    :param path: ZooKeeper path to start from.
    :param enqueue_children: forwarded as ``enqueue`` to ``self.watch_node``
        for each ``instances`` node that is reached.
    """
    if "autoscaling.lock" in path:
        return
    # Only the final path component decides whether this is a leaf to watch.
    if path.rsplit("/", 1)[-1] == "instances":
        self.watch_node(path, enqueue=enqueue_children)
        return
    self.log.info(f"Adding folder watch on {path}")
    self.watchers[path] = ChildrenWatch(
        self.zk, path, func=self.process_folder_event, send_event=True
    )
    # Query the client we already hold instead of reaching into the
    # watcher's private ``_client`` / ``_path`` attributes; they are the very
    # same objects that were passed to the constructor above.
    children = self.zk.get_children(path)
    for child in children or ():
        self.watch_folder(f"{path}/{child}", enqueue_children=enqueue_children)
self._setting_watches = True
# Set all our watches and then rebalance
broker_path = '/brokers/ids'
try:
self._broker_watcher = ChildrenWatch(
self._zookeeper, broker_path,
_brokers_changed
)
except NoNodeException:
raise Exception(
'The broker_path "%s" does not exist in your '
'ZooKeeper cluster -- is your Kafka cluster running?'
% broker_path)
self._topics_watcher = ChildrenWatch(
self._zookeeper,
'/brokers/topics',
_topics_changed
)
self._consumer_watcher = ChildrenWatch(
self._zookeeper, self._consumer_id_path,
_consumers_changed
)
self._setting_watches = False
self._zookeeper, broker_path,
_brokers_changed
)
except NoNodeException:
raise Exception(
'The broker_path "%s" does not exist in your '
'ZooKeeper cluster -- is your Kafka cluster running?'
% broker_path)
self._topics_watcher = ChildrenWatch(
self._zookeeper,
'/brokers/topics',
_topics_changed
)
self._consumer_watcher = ChildrenWatch(
self._zookeeper, self._consumer_id_path,
_consumers_changed
)
self._setting_watches = False
timeout = float(timeout)
self._client.start(timeout=timeout)
self._closing = False
except (self._client.handler.timeout_exception,
k_exceptions.KazooException):
excp.raise_with_cause(excp.JobFailure,
"Failed to connect to zookeeper")
try:
if self._conf.get('check_compatible', True):
kazoo_utils.check_compatible(self._client, self.MIN_ZK_VERSION)
if self._worker is None and self._emit_notifications:
self._worker = futurist.ThreadPoolExecutor(max_workers=1)
self._client.ensure_path(self.path)
self._client.ensure_path(self.trash_path)
if self._job_watcher is None:
self._job_watcher = watchers.ChildrenWatch(
self._client,
self.path,
func=self._on_job_posting,
allow_session_lost=True)
self._connected = True
except excp.IncompatibleVersion:
with excutils.save_and_reraise_exception():
try_clean()
except (self._client.handler.timeout_exception,
k_exceptions.KazooException):
exc_type, exc, exc_tb = sys.exc_info()
try:
try_clean()
excp.raise_with_cause(excp.JobFailure,
"Failed to do post-connection"
" initialization", cause=exc)
except NoNodeException:
raise ImproperlyConfiguredError(
'The broker_path "%s" does not exist in your '
'ZooKeeper cluster -- is your Kafka cluster running?'
% broker_path)
topics_path = '/brokers/topics'
self._topics_watcher = ChildrenWatch(
self.cluster.zookeeper,
'/brokers/topics',
self._topics_changed
)
self._rebalancing = True
# Final watch will trigger rebalance
self._consumer_watcher = ChildrenWatch(
self.cluster.zookeeper, self.id_path,
self._consumers_changed
)
else:
logger.debug("More consumers than partitions. "
"Waiting %is to retry" % (i+1) ** 2)
else:
raise NoAvailablePartitionsError("Couldn't acquire partition. "
"More consumers than partitions.")
path = '%s/%s' % (self.id_path, self.id)
self.cluster.zookeeper.create(
path, self.topic.name, ephemeral=True, makepath=True)
# Set all our watches and then rebalance
self._rebalancing = False
broker_path = '/brokers/ids'
try:
self._broker_watcher = ChildrenWatch(
self.cluster.zookeeper, broker_path,
self._brokers_changed
)
except NoNodeException:
raise ImproperlyConfiguredError(
'The broker_path "%s" does not exist in your '
'ZooKeeper cluster -- is your Kafka cluster running?'
% broker_path)
topics_path = '/brokers/topics'
self._topics_watcher = ChildrenWatch(
self.cluster.zookeeper,
'/brokers/topics',
self._topics_changed
)
self._rebalancing = True
def run(self):
"""
Register ZooKeeper child watches whose updates are pushed onto queues.

Only the setup portion of this method is visible here: it creates two
``ChildrenWatch`` objects (consumer-group membership and groups), each
feeding a ``Queue`` through ``cancel_on_stop``, and initialises the
``consumers``, ``groups`` and ``assignments`` locals.
"""
def cancel_on_stop(function):
# Wrap ``function`` so that it is no longer invoked once
# ``self.__stop_requested`` is set; the wrapper returns False instead.
# NOTE(review): kazoo documents that a watch function returning False
# is not called again -- presumably that is how the watch is cancelled.
@functools.wraps(function)
def watch(*args, **kwargs):
if self.__stop_requested.is_set():
return False
return function(*args, **kwargs)
return watch
# Child-list updates for the consumer group are delivered via this queue.
consumer_updates = Queue()
consumers = None
# NOTE(review): the value returned by get_consumer_group_membership_path(...)
# is itself called with no arguments, unlike get_group_path() below --
# confirm that the extra call is intended.
ChildrenWatch(
self.application.environment.zookeeper,
self.application.get_consumer_group_membership_path(self.consumer_group_identifier)(),
cancel_on_stop(consumer_updates.put),
# TODO: Handle disconnection from ZooKeeper.
)
# Child-list updates for the group path are delivered via this queue.
group_updates = Queue()
groups = None
ChildrenWatch(
self.application.environment.zookeeper,
self.application.get_group_path(),
cancel_on_stop(group_updates.put),
# TODO: Handle disconnection from ZooKeeper.
)
assignments = {}
def start_listening(self):
    """
    Connect the ZooKeeper client, then watch the nerve directory.

    ``self.get_nodes`` is registered as the ``ChildrenWatch`` callback for
    ``self.nerve_directory``.
    """
    client = self.zookeeper
    client.start()
    ChildrenWatch(client, self.nerve_directory, self.get_nodes)