Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@fasteners.write_locked
def reset(self, atom_name, state=states.PENDING):
    """Move the named atom back to ``state`` and drop its cached failures.

    Nothing happens when ``atom_name`` is the injector's name, or when the
    stored atom detail is already in ``state``.

    :param atom_name: name of the atom whose detail should be reset
    :param state: state to reset the atom detail to
    """
    # The injector atom is never reset; check this before any lookup.
    if atom_name != self.injector_name:
        original, updated = self._atomdetail_by_name(atom_name, clone=True)
        if original.state != state:
            updated.reset(state)
            self._with_connection(self._save_atom_detail, original, updated)
            # Cached failures no longer apply once the atom has been reset.
            self._failures[updated.name].clear()
@fasteners.write_locked
def save(self, atom_name, result, state=states.SUCCESS):
"""Put result for atom with provided name to storage."""
source, clone = self._atomdetail_by_name(atom_name, clone=True)
if clone.put(state, result):
self._with_connection(self._save_atom_detail, source, clone)
# We need to somehow place more of this responsibility on the atom
# detail class itself, vs doing it here; since it ties those two
# together (which is bad)...
if state in (states.FAILURE, states.REVERT_FAILURE):
# NOTE(imelnikov): failure serialization loses information,
# so we cache failures here, in atom name -> failure mapping so
# that we can later use the better version on fetch/get.
if clone.intention in _SAVE_FAILURE_INTENTIONS:
fail_cache = self._failures[clone.name]
fail_cache[clone.intention] = result
if state == states.SUCCESS and clone.intention == states.EXECUTE:
@fasteners.write_locked
def set_flow_state(self, state):
    """Assign ``state`` to the flow detail and save the updated detail.

    :param state: new state to record on the flow detail
    """
    original, updated = self._fetch_flowdetail(clone=True)
    updated.state = state
    self._with_connection(self._save_flow_detail, original, updated)
@fasteners.write_locked
def inject(self, pairs, transient=False):
"""Add values into storage.
This method should be used to put flow parameters (requirements that
are not satisfied by any atom in the flow) into storage.
:param transient: save the data in-memory only instead of persisting
the data to backend storage (useful for resource-like objects
or similar objects which can **not** be persisted)
.. warning::
It should be noted that injected flow arguments (that are scoped
to all atoms in this flow) *should* be serializable whenever
possible. This is a **requirement** for
the :doc:`worker based engine ` which **must**
@fasteners.write_locked
def set_atom_intention(self, atom_name, intention):
    """Record a new intention for the atom with the given name.

    The atom detail is only saved when the intention actually changes.

    :param atom_name: name of the atom whose detail is updated
    :param intention: intention value to store on the atom detail
    """
    original, updated = self._atomdetail_by_name(atom_name, clone=True)
    if original.intention == intention:
        # Already holds the requested intention; skip the save.
        return
    updated.intention = intention
    self._with_connection(self._save_atom_detail, original, updated)
@fasteners.write_locked
def save_retry_failure(self, retry_name, failed_atom_name, failure):
    """Add an atom's failure to the latest failures of a retry detail.

    The failure is stored (and the retry detail saved) only when no failure
    is already recorded under ``failed_atom_name``.

    :param retry_name: name of the retry atom whose detail is updated
    :param failed_atom_name: name of the atom that failed
    :param failure: failure object to record for that atom
    """
    original, updated = self._atomdetail_by_name(
        retry_name, expected_type=models.RetryDetail, clone=True)
    try:
        recorded = updated.last_failures
    except exceptions.NotFound:
        message = ("Unable to fetch most recent retry failures"
                   " so new retry failure can be inserted")
        exceptions.raise_with_cause(exceptions.StorageFailure, message)
    else:
        if failed_atom_name in recorded:
            # Keep the failure that was recorded first for this atom.
            return
        recorded[failed_atom_name] = failure
        self._with_connection(self._save_atom_detail, original, updated)
@fasteners.write_locked
def set_atom_state(self, atom_name, state):
    """Record a new state for the atom with the given name.

    The atom detail is only saved when the state actually changes.

    :param atom_name: name of the atom whose detail is updated
    :param state: state value to store on the atom detail
    """
    original, updated = self._atomdetail_by_name(atom_name, clone=True)
    if original.state == state:
        # Already in the requested state; skip the save.
        return
    updated.state = state
    self._with_connection(self._save_atom_detail, original, updated)
@fasteners.write_locked
def change_flow_state(self, state):
    """Try to move the flow from its current state to ``state``.

    :param state: state the flow should transition to
    :returns: ``(True, old_state)`` when the transition was applied, or
        ``(False, old_state)`` when it was ignored
    :raises: :py:class:`~taskflow.exceptions.InvalidState` if the
        transition is invalid
    """
    previous = self.get_flow_state()
    if states.check_flow_transition(previous, state):
        self.set_flow_state(state)
        return (True, previous)
    # Transition ignored; the stored flow state is left untouched.
    return (False, previous)
@fasteners.write_locked
def ensure_atoms(self, atoms):
"""Ensure there is an atomdetail for **each** of the given atoms.
Returns list of atomdetail uuids for each atom processed.
"""
atom_ids = []
missing_ads = []
for i, atom in enumerate(atoms):
match = misc.match_type(atom, self._ensure_matchers)
if not match:
raise TypeError("Unknown atom '%s' (%s) requested to ensure"
% (atom, type(atom)))
atom_detail_cls, kind = match
atom_name = atom.name
if not atom_name:
raise ValueError("%s name must be non-empty" % (kind))
@fasteners.write_locked
def cleanup_retry_history(self, retry_name, state):
    """Clear the stored results of a retry atom and set its state.

    :param retry_name: name of the retry atom whose detail is cleaned up
    :param state: state to assign to the retry detail
    """
    original, updated = self._atomdetail_by_name(
        retry_name, expected_type=models.RetryDetail, clone=True)
    updated.state = state
    # Replace the results with a fresh empty list.
    updated.results = []
    self._with_connection(self._save_atom_detail, original, updated)