Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@fasteners.read_locked
def get_atom_state(self, atom_name):
    """Return the ``state`` of the atom detail registered under a name."""
    # The lookup yields a pair; only the first element is read here.
    detail, _unused = self._atomdetail_by_name(atom_name)
    return detail.state
@fasteners.read_locked
def has_failures(self):
    """Return ``True`` when at least one failure cache is non-empty."""
    # ``any`` stops at the first truthy cache, like an early-return loop.
    return any(six.itervalues(self._failures))
@fasteners.read_locked
def get_atom_intention(self, atom_name):
    """Return the ``intention`` of the atom detail registered under a name."""
    # The lookup yields a pair; only the first element is read here.
    detail, _unused = self._atomdetail_by_name(atom_name)
    return detail.intention
@fasteners.read_locked
def fetch_unsatisfied_args(self, atom_name, args_mapping,
scope_walker=None, optional_args=None):
"""Fetch unsatisfied ``execute`` arguments using an atom's args mapping.

NOTE(harlowja): this takes into account the provided scope walker
atoms that should produce the required value at runtime, as well as
the transient/persistent flow and atom specific injected arguments.
It does **not** check if the providers have actually produced the
needed values; it only checks that they are registered to produce
them in the future.

:param atom_name: name used to look up the atom detail.
:param args_mapping: the atom's args mapping (not read by the lines
                     visible in this excerpt).
:param scope_walker: optional scope walker; when ``None`` one is
                     obtained from ``self._scope_fetcher(atom_name)``.
:param optional_args: optional collection of argument names; when
                      ``None`` an empty list is used.

NOTE(review): only the argument-defaulting prologue of this method is
visible in this excerpt; the remainder of the body (and its return
value) is not shown here -- confirm against the full file.
"""
# Look up the atom detail; the second element of the pair is unused here.
source, _clone = self._atomdetail_by_name(atom_name)
# Fall back to the scope walker derived from the atom's name.
if scope_walker is None:
scope_walker = self._scope_fetcher(atom_name)
# Use a fresh list rather than sharing a mutable default argument.
if optional_args is None:
optional_args = []
@fasteners.read_locked
def _get_failures(self, fail_cache_key):
    """Map atom name => failure stored under ``fail_cache_key``.

    Atoms whose failure cache has no entry for the key are left out.
    """
    collected = {}
    for name, cache in six.iteritems(self._failures):
        try:
            failure = cache[fail_cache_key]
        except KeyError:
            # No failure recorded under this key for this atom.
            continue
        collected[name] = failure
    return collected
@fasteners.read_locked
def get_atom_uuid(self, atom_name):
    """Return the ``uuid`` of the atom detail registered under a name."""
    # The lookup yields a pair; only the first element is read here.
    detail, _unused = self._atomdetail_by_name(atom_name)
    return detail.uuid
@fasteners.read_locked
def get_atoms_states(self, atom_names):
    """Return a dict of atom name => ``(state, intention)``.

    Duplicate names in ``atom_names`` are collapsed before lookup.
    """
    def _state_and_intention(atom_name):
        # One lookup per name; both attributes come from the same detail.
        detail, _unused = self._atomdetail_by_name(atom_name)
        return (detail.state, detail.intention)

    return {
        atom_name: _state_and_intention(atom_name)
        for atom_name in set(atom_names)
    }
@fasteners.read_locked
def get_flow_state(self):
    """Return the flow detail's state, or ``states.PENDING`` if unset."""
    current = self._flowdetail.state
    # A ``None`` state is reported as pending.
    return states.PENDING if current is None else current
@fasteners.read_locked
def get_retry_histories(self):
    """Return ``(name, history)`` pairs for each retry detail in the flow."""
    # Only entries that are ``models.RetryDetail`` instances are included.
    return [
        (detail.name, self._translate_into_history(detail))
        for detail in self._flowdetail
        if isinstance(detail, models.RetryDetail)
    ]
@fasteners.read_locked
def fetch_mapped_args(self, args_mapping,
atom_name=None, scope_walker=None,
optional_args=None):
"""Fetch ``execute`` arguments for an atom using its args mapping."""
def _extract_first_from(name, sources):
"""Extracts/returns first occurrence of key in list of dicts."""
for i, source in enumerate(sources):
if not source:
continue
if name in source:
return (i, source[name])
raise KeyError(name)
if optional_args is None:
optional_args = []
if atom_name:
source, _clone = self._atomdetail_by_name(atom_name)