Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@fasteners.locked
def break_(self):
reply = self.client.delete(self._lock_url, make_url=False)
return reply.get('errorCode') is None
@fasteners.locked
def compile(self):
"""Compiles the contained item into a compiled equivalent."""
if self._compilation is None:
self._pre_compile()
try:
graph, node = self._compile(self._root, parent=None)
except Exception:
with excutils.save_and_reraise_exception():
# Always clear the history, to avoid retaining junk
# in memory that isn't needed to be in memory if
# compilation fails...
self._history.clear()
else:
self._post_compile(graph, node)
if self._freeze:
graph.freeze()
@fasteners.locked(lock='_open_close_lock')
def close(self):
if self._owned:
LOG.debug("Stopping client")
self._closing = True
kazoo_utils.finalize_client(self._client)
if self._worker is not None:
LOG.debug("Shutting down the notifier")
self._worker.shutdown()
self._worker = None
with self._job_cond:
self._known_jobs.clear()
LOG.debug("Stopped & cleared local state")
self._connected = False
self._last_states.clear()
@fasteners.locked(lock='_open_close_lock')
def close(self):
if self._owns_client:
self._client.close()
self._scripts.clear()
self._redis_version = None
self._closed = True
@fasteners.locked
def close(self):
"""Closes the contained jobboard, disallowing further use."""
self._jobboard.close()
@fasteners.locked
def heartbeat(self):
"""Keep the lock alive."""
if self.acquired:
poked = self.client.put(self._lock_url,
data={"ttl": self.ttl,
"prevExist": "true"}, make_url=False)
self._node = poked['node']
errorcode = poked.get("errorCode")
if not errorcode:
return True
LOG.warning("Unable to heartbeat by updating key '%s' with "
"extended expiry of %s seconds: %d, %s", self.name,
self.ttl, errorcode, poked.get("message"))
return False
@fasteners.locked
def connect(self):
"""Ensures the jobboard is connected (noop if it is already)."""
if not self._jobboard.connected:
self._jobboard.connect()
@fasteners.locked
@_pre_check(check_storage_ensured=False, check_validated=False)
def prepare(self):
if not self._storage_ensured:
# Set our own state to resuming -> (ensure atoms exist
# in storage) -> suspended in the storage unit and notify any
# attached listeners of these changes.
self._change_state(states.RESUMING)
self._ensure_storage()
self._change_state(states.SUSPENDED)
self._storage_ensured = True
# Reset everything back to pending (if we were previously reverted).
if self.storage.get_flow_state() == states.REVERTED:
self.reset()
@fasteners.locked
@_pre_check(check_validated=False)
def validate(self):
# At this point we can check to ensure all dependencies are either
# flow/task provided or storage provided, if there are still missing
# dependencies then this flow will fail at runtime (which we can avoid
# by failing at validation time).
if LOG.isEnabledFor(logging.TRACE):
execution_graph = self._compilation.execution_graph
LOG.trace("Validating scoping and argument visibility for"
" execution graph with %s nodes and %s edges with"
" density %0.3f", execution_graph.number_of_nodes(),
execution_graph.number_of_edges(),
nx.density(execution_graph))
missing = set()
# Attempt to retain a chain of what was missing (so that the final
# raised exception for the flow has the nodes that had missing