def test_create_node_exists_error(self):
    self.zk.retry.side_effect = exceptions.NodeExistsError()
    self.reg._create_node()
    self.zk.retry.assert_called_once_with(
        self.zk.create,
        '/unittest/host:22',
        value=self.reg._encoded_data, ephemeral=False, makepath=False)
def create(self, path, value=b"", acl=None, ephemeral=False, sequence=False, makepath=False):
    if not isinstance(path, six.string_types):
        raise TypeError("Invalid type for 'path' (string expected)")
    if not isinstance(value, (six.binary_type,)):
        raise TypeError("Invalid type for 'value' (must be a byte string)")
    if b'Exception' in value:
        raise Exception
    if path.endswith('/initialize') or path == '/service/test/optime/leader':
        raise Exception
    elif b'retry' in value or (b'exists' in value and self.exists):
        raise NodeExistsError
def test_supervisor_collision(application, database):
    setup_application(application, database)
    zookeeper = application.environment.zookeeper
    path = application.get_consumer_group_membership_path(consumer_group_identifier)
    zookeeper.ensure_path(path())
    zookeeper.create(path(consumer_identifier))
    with pytest.raises(NodeExistsError):
        supervisor = Supervisor(application, consumer_group_identifier, consumer_identifier, DummyHandler())
        supervisor.start()
        supervisor.result(5)
def register_as_hoster(self, revision_key, md5):
    """ Adds an entry to indicate that the local machine has the archive.

    Args:
        revision_key: A string specifying a revision key.
        md5: A string specifying the source archive's MD5 hex digest.
    """
    new_hoster_node = '/apps/{}/{}'.format(revision_key, options.private_ip)
    try:
        yield self.thread_pool.submit(self.zk_client.create, new_hoster_node,
                                      md5, makepath=True)
    except NodeExistsError:
        logger.debug('{} is already a hoster'.format(options.private_ip))
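# A hedged usage sketch: register_as_hoster yields the executor future, so it is
# meant to be driven as a Tornado coroutine (the @gen.coroutine decorator sits
# outside the snippet above). The caller below and its names are hypothetical.
from tornado import gen

@gen.coroutine
def announce_archive(manager, revision_key, md5_digest):
    # register_as_hoster already swallows NodeExistsError, so calling this again
    # for the same machine and revision is harmless.
    yield manager.register_as_hoster(revision_key, md5_digest)

# e.g. IOLoop.current().run_sync(lambda: announce_archive(manager, key, digest))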
tr = self.client.transaction()
for lockid in locks:
    path = self.lock_path_prefix + lockid
    parts = path.rsplit('/', 1)
    if len(parts) == 2 and parts[0] not in ensured_paths:
        self.client.ensure_path(parts[0])
        ensured_paths.add(parts[0])
    tr.create(path, data)
failed = False
failed_locks = []
result = tr.commit()
for i, res in enumerate(result):
    if isinstance(res, ZookeeperError):
        failed = True
    if isinstance(res, NodeExistsError):
        failed_locks.append(locks[i])
if failed_locks:
    holders = []
    for f in failed_locks:
        # TODO: fetch all holders with 1 transaction request
        holders.append((f, self.client.get(self.lock_path_prefix + f)))
    foreign_holders = [(l, h) for l, h in holders if h[0] != data]
    failed_lock, holder_resp = foreign_holders and foreign_holders[0] or holders[0]
    holder = holder_resp[0]
    holders_ids = list(set(h[0] for _, h in holders))
    logger.warn('Persistent lock {0} is already set by {1}'.format(failed_lock, holder))
    raise LockAlreadyAcquiredError(
        'Lock for {0} is already acquired by job {1}'.format(failed_lock, holder),
        lock_id=failed_lock, holder_id=holder,
        lock_ids=failed_locks, holders_ids=holders_ids)
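# The TODO above wants a single round trip for the holder reads, but kazoo
# transactions only batch write operations (create/delete/set_data/check), so a
# read-only transaction is not available. A hedged alternative, sketched against
# the same client, prefix and failed_locks as above, is to overlap the reads
# with get_async() instead of issuing them one by one:
def fetch_lock_holders(client, lock_path_prefix, failed_locks):
    # Fire off all reads first; each call returns an IAsyncResult immediately.
    pending = [(lock, client.get_async(lock_path_prefix + lock))
               for lock in failed_locks]
    # IAsyncResult.get() blocks until that particular reply arrives.
    return [(lock, result.get()) for lock, result in pending]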
def acquire(self):
    """Acquires the lock for the given entry.

    :returns: True if a lock was acquired successfully, otherwise raises
              some kind of KazooException.
    :rtype: bool
    """
    try:
        self._client.retry(
            self._client.create,
            '{path}/{id}'.format(path=self._lock_path,
                                 id=self.entry_id),
            self.lock_id,
            ephemeral=True)
    except (NodeExistsError, RetryFailedError):
        if not self.holds_lock():
            raise LockError
    try:
        value, stat = self._client.retry(
            self._client.get,
            '{path}/{id}'.format(path=self._entries_path, id=self.entry_id))
    except (NoNodeError, RetryFailedError):
        # The entry is gone (or the read never went through): give the lock
        # back if we still hold it and let the error propagate.
        if self.holds_lock():
            self._client.retry(self._inner_release)
        raise
    self.data = value
    return True
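# Both attempts above go through client.retry(), which re-runs the callable via
# the client's KazooRetry policy on connection-related errors and raises
# RetryFailedError once its attempts are exhausted; NodeExistsError/NoNodeError
# are not retried and propagate immediately, which is why they appear together
# in the except clauses. A minimal, self-contained illustration of that pattern
# (host string, path and payload are placeholders):
from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError
from kazoo.retry import RetryFailedError

client = KazooClient(hosts='127.0.0.1:2181')  # placeholder host
client.start()
try:
    client.retry(client.create, '/entries-lock/entry-0001', b'worker-a',
                 ephemeral=True, makepath=True)
except (NodeExistsError, RetryFailedError):
    pass  # someone else holds the node, or the connection never recovered
finally:
    client.stop()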
if possible_path not in self:
    path = possible_path
    break
parents = sorted(six.iterkeys(self.get_parents(path)))
if parent_path not in self:
    if sequence:
        self._sequences.pop(parent_path, None)
    raise k_exceptions.NoNodeError("Parent node %s does not exist"
                                   % (parent_path))
if ephemeral and not session_id:
    raise k_exceptions.SystemZookeeperError("Ephemeral node %s can"
                                            " not be created"
                                            " without a session"
                                            " id" % path)
if path in self:
    raise k_exceptions.NodeExistsError("Node %s already"
                                       " exists" % (path))
for parent_path in reversed(parents):
    if self._paths[parent_path]['ephemeral']:
        raise k_exceptions.NoChildrenForEphemeralsError(
            "Parent %s is ephemeral" % parent_path)
path_data = {
    # Kazoo clients expect timestamps in milliseconds
    'created_on': utils.millitime(),
    'updated_on': utils.millitime(),
    'version': 0,
    # Not supported for now...
    'aversion': -1,
    'cversion': -1,
    'data': value,
}
if ephemeral:
def _lock_and_get_entry(self, entry_node: str) -> Optional[Tuple[bytes, ZnodeStat]]:
    try:
        lock_path = f"{self.locks_path}/{entry_node}"
        self.locked_entry_nodes.add(entry_node)
        self.client.create(lock_path, value=self.id, ephemeral=True)
    except NodeExistsError:
        self.locked_entry_nodes.add(entry_node)
        return None
    try:
        return self.client.get(f"{self.entries_path}/{entry_node}")
    except NoNodeError:
        self.client.delete(lock_path)
        return None
def _create(self):
    logger.debug('to create: {s}'.format(s=str(self)))
    try:
        self.zkclient.create(self.lock_path,
                             utfjson.dump(self.identifier),
                             ephemeral=self.ephemeral,
                             acl=self.zkconf.kazoo_digest_acl())
    except NodeExistsError as e:
        # NOTE: a create that succeeds on the server side can still be reported
        # as a failure on the client side because of a network issue, so we
        # 'get' after 'create' to check whether the existing node belongs to
        # this client.
        logger.debug(repr(e) + ' while creating lock: {s}'.format(s=str(self)))
        self.lock_holder = None
        return
    logger.info('CREATE OK: {s}'.format(s=str(self)))
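# The ownership check the NOTE above describes might look like the following
# sketch. The method name is hypothetical, NoNodeError comes from
# kazoo.exceptions, and it assumes utfjson offers a load() matching the dump()
# used above.
def _node_is_mine(self):
    # Read the node back and compare its payload with our identifier: a match
    # means our earlier create actually succeeded and only its reply was lost.
    try:
        data, _stat = self.zkclient.get(self.lock_path)
    except NoNodeError:
        return False  # the conflicting node vanished; the caller can retry create
    return utfjson.load(data) == self.identifier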
def initial(self, data):
    #
    # - the /hash node is where we store the md5 hash of all our pods + their dependencies
    # - the /snapshot node is where we store the last known state of our pods (e.g. where they run from and what
    #   their port mapping is)
    #
    try:
        self.zk.create('%s/%s.%s/snapshot' % (ROOT, self.scope, self.tag), value='{}', ephemeral=True)
    except NodeExistsError:
        pass
    #
    # - start the watch on our local pods
    # - this ancillary actor will piggy-back on our zk client and use it to query our pod
    #   information on a regular basis
    #
    data.dirty = 0
    data.last = None
    data.next_probe = 0
    self.snapshots['local'] = {}
    self.watchers = [Local.start(self.actor_ref, self.zk, self.scope, self.tag)]
    #
    # - add a set of extra watchers for our dependencies
    # - make sure to look for clusters within our own namespace