Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Plugin base registered under the "dffml.operation.implementation.network"
# entry point (short names "opimp" / "network" per the decorator arguments).
@base_entry_point("dffml.operation.implementation.network", "opimp", "network")
class BaseOperationImplementationNetwork(BaseDataFlowObject):
    """
    Knows where operations are or if they can be made
    """
class BaseOrchestratorConfig(BaseConfig, NamedTuple):
    """
    The set of networks and services an orchestrator needs to run a data flow.
    """

    # Network holding the input sets flowing through the data flow
    input_network: BaseInputNetwork
    # Network holding the Operation objects (lookup by stage, etc.)
    operation_network: BaseOperationNetwork
    # Acquires locks on inputs which may not be used simultaneously
    lock_network: BaseLockNetwork
    # Knows where operation implementations are or if they can be made
    opimp_network: BaseOperationImplementationNetwork
    # Ensures each operation runs only once per permutation of inputs
    rchecker: BaseRedundancyChecker
class BaseOrchestratorContext(BaseDataFlowObjectContext):
    """
    Context responsible for actually executing a data flow's operations.
    """

    @abc.abstractmethod
    async def run_operations(
        self, strict: bool = True
    ) -> AsyncIterator[Tuple[BaseContextHandle, Dict[str, Any]]]:
        """
        Run all the operations then run cleanup and output operations

        Yields ``(context handle, results dict)`` pairs.
        NOTE(review): the exact semantics of ``strict`` are not documented
        here — presumably fail-fast on operation errors; confirm against
        the implementing class.
        """
# Plugin base registered under the "dffml.orchestrator" entry point.
@base_entry_point("dffml.orchestrator", "orchestrator")
class BaseOrchestrator(BaseDataFlowObject):
    pass  # pragma: no cov

    # NOTE(review): this abstract ``add`` appears spliced in from another
    # class — it matches BaseRedundancyCheckerContext.add later in this
    # file, and an empty class followed by a method is a paste artifact.
    # Verify against the original module.
    @abc.abstractmethod
    async def add(self, operation: Operation, parameter_set: BaseParameterSet):
        pass
# Plugin base registered under the "dffml.redundancy.checker" entry point.
@base_entry_point("dffml.redundancy.checker", "rchecker")
class BaseRedundancyChecker(BaseDataFlowObject):
    """
    Redundancy Checkers ensure that each operation within a context only gets
    run with a given permutation of inputs once.
    """


# TODO Provide a way to clear out all locks for inputs within a context
class BaseLockNetworkContext(BaseDataFlowObjectContext):
    """
    Context for acquiring locks on the inputs of a parameter set.
    """

    @abc.abstractmethod
    async def acquire(self, parameter_set: BaseParameterSet) -> bool:
        """
        An async context manager which will acquire locks of all inputs within
        the parameter set.
        """
# Plugin base registered under the "dffml.lock.network" entry point
# (short names "lock" / "network").
@base_entry_point("dffml.lock.network", "lock", "network")
class BaseLockNetwork(BaseDataFlowObject):
    """
    Acquires locks on inputs which may not be used simultaneously
    """
class BaseOperationImplementationNetworkContext(BaseDataFlowObjectContext):
    # NOTE(review): the two helpers below read like module-level utilities
    # pasted inside this class. Within a class body, references to the
    # double-underscore names ``__operation_in`` / ``__opimp_in`` are
    # name-mangled (to ``_BaseOperationImplementationNetworkContext__...``),
    # so ``operation_in`` would raise NameError at call time unless such an
    # attribute exists — restore from the original module before use.

    # Filter *iterable* down to operations: the base matches plus the
    # ``.op`` attribute of every wrapped operation found by ``opwraped_in``.
    def operation_in(iterable):
        return __operation_in(iterable) + list(
            map(lambda item: item.op, opwraped_in(iterable))
        )

    # Presumably a membership/filter helper built from the ``isopimp``
    # predicate — TODO confirm what ``mk_base_in`` returns.
    __opimp_in = mk_base_in(isopimp)

    # Same shape as ``operation_in`` but collects implementations (``.imp``).
    def opimp_in(iterable):
        return __opimp_in(iterable) + list(
            map(lambda item: item.imp, opwraped_in(iterable))
        )
class BaseKeyValueStoreContext(BaseDataFlowObjectContext):
    """
    Abstract Base Class for key value storage context
    """

    @abc.abstractmethod
    async def get(self, key: str) -> Union[bytes, None]:
        """
        Get a value from the key value store

        Returns ``None`` when the key is absent.
        """

    # NOTE(review): the first parameter is named ``name`` here but ``key``
    # in get(); kept as-is for interface compatibility with callers.
    @abc.abstractmethod
    async def set(self, name: str, value: bytes):
        """
        Set a value in the key value store
        """
"""
OperationImplementation cannot be instantiated and is required to continue.
"""
class OperationImplementationNotInstantiated(Exception):
    """
    OperationImplementation is instantiable, but has not been instantiated
    within the network and was required to continue.

    Raised when attempting to run an operation which could be instantiated,
    but has not yet been.
    """
class BaseOperationNetworkContext(BaseDataFlowObjectContext):
    """
    Abstract Base Class for context managing operations
    """

    @abc.abstractmethod
    async def add(self, operations: List[Operation]):
        """
        Add operations to the network
        """

    @abc.abstractmethod
    async def operations(
        self, input_set: BaseInputSet = None, stage: Stage = Stage.PROCESSING
    ) -> AsyncIterator[Operation]:
        """
        Retrieve all operations in the network of a given stage, filtering by
        the given input set when one is provided.

        NOTE(review): the original docstring was truncated after
        "filtering by" — confirm the exact filter semantics against an
        implementing class.
        """
@classmethod
def args(cls, args, *above) -> Dict[str, Arg]:
    """
    Collect command-line/config args. Classes that declare a ``CONFIG``
    delegate to the generic machinery above ``BaseDataFlowObject`` in the
    MRO; otherwise *args* is passed through untouched.
    """
    # Guard clause: nothing to contribute without a declared CONFIG.
    if not hasattr(cls, "CONFIG"):
        return args
    return super(BaseDataFlowObject, cls).args(args, *above)
@classmethod
def config(cls, config, *above) -> BaseConfig:
    """
    Build this object's config. Classes that declare a ``CONFIG`` delegate
    to the generic machinery above ``BaseDataFlowObject`` in the MRO;
    otherwise an empty ``BaseConfig`` is returned.
    """
    # Guard clause: no declared CONFIG means no parsing to do.
    if not hasattr(cls, "CONFIG"):
        return BaseConfig()
    return super(BaseDataFlowObject, cls).config(config, *above)
class OperationImplementationContext(BaseDataFlowObjectContext):
    """
    Per-run context of an OperationImplementation, tied to an input set
    context and the orchestrator context driving the run.
    """

    def __init__(
        self,
        parent: "OperationImplementation",
        ctx: "BaseInputSetContext",
        octx: "BaseOrchestratorContext",
    ) -> None:
        # Implementation this context was created from
        self.parent = parent
        # Input set context this run belongs to
        self.ctx = ctx
        # Orchestrator context driving the run
        self.octx = octx

    @property
    def config(self):
        """
        Alias for self.parent.config
        """
        return self.parent.config

    @abc.abstractmethod
    async def inputs(self) -> AsyncIterator[Input]:
        pass

    async def _asdict(self) -> Dict[str, Any]:
        """
        Returns a parameter definition name to parameter value dict
        """
        # NOTE(review): ``self.parameters()`` is not defined in this class —
        # presumably provided by a subclass or spliced from another class
        # (the method reads like BaseParameterSet._asdict); confirm.
        return {
            parameter.key: parameter.value
            async for parameter in self.parameters()
        }
class BaseDefinitionSetContext(BaseDataFlowObjectContext):
    """
    View over the inputs of a single input set context, filterable by
    definition.
    """

    def __init__(
        self,
        config: BaseConfig,
        parent: "BaseInputNetworkContext",
        ctx: "BaseInputSetContext",
    ) -> None:
        super().__init__(config, parent)
        # Input set context whose inputs this definition set views
        self.ctx = ctx

    @abc.abstractmethod
    async def inputs(self, Definition: Definition) -> AsyncIterator[Input]:
        """
        Asynchronous iterator of all inputs within a context, which are of a
        definition.
        """
}
)
class MemoryOrchestratorConfig(BaseOrchestratorConfig):
    """
    Same as base orchestrator config
    """
class MemoryOrchestratorContextConfig(NamedTuple):
    """
    Per-context configuration for the in-memory orchestrator context.
    """

    # Unique identifier for this context
    uid: str
    # The data flow this context will run
    dataflow: DataFlow
    # Context objects to reuse. If not present in this dict a new context object
    # will be created.
    reuse: Dict[str, BaseDataFlowObjectContext]
class MemoryOrchestratorContext(BaseOrchestratorContext):
    def __init__(
        self,
        config: MemoryOrchestratorContextConfig,
        parent: "BaseOrchestrator",
    ) -> None:
        super().__init__(config, parent)
        # None until the context is entered — presumably an exit stack
        # populated in __aenter__; TODO confirm (body truncated in this view).
        self._stack = None
async def __aenter__(self) -> "BaseOrchestratorContext":
# TODO(subflows) In all of these contexts we are about to enter, they
# all reach into their parents and store things in the parents memory
# (or similar). What should be done is to have them create their own
# storage space, so that each context is unique (which seems quite
class BaseOperationNetwork(BaseDataFlowObject):
    """
    Operation networks hold Operation objects to allow for looking up of their
    inputs, outputs, and conditions.
    """
class BaseRedundancyCheckerConfig(NamedTuple):
    # Backing store — presumably used to record which (operation,
    # parameter set) permutations have already run; TODO confirm.
    key_value_store: BaseKeyValueStore
# TODO store redundancy checks by BaseInputSetContext.handle() and add method
# to remove all associated with a particular handle. Aka allow us to clean up
# the input, redundancy, etc. networks after execution of a context completes
# via the orchestrator.
class BaseRedundancyCheckerContext(BaseDataFlowObjectContext):
    """
    Abstract Base Class for redundancy checking context
    """

    @abc.abstractmethod
    async def exists(
        self, operation: Operation, parameter_set: BaseParameterSet
    ) -> bool:
        """
        Whether *operation* has already been run with *parameter_set* —
        presumably, per the class purpose; exact contract not documented
        in the original.
        """
        pass

    @abc.abstractmethod
    async def add(self, operation: Operation, parameter_set: BaseParameterSet):
        """
        Record that *operation* ran with *parameter_set* — presumably;
        exact contract not documented in the original.
        """
        pass
# NOTE(review): a ``base_entry_point`` decorator applied directly to an
# abstract method is a paste artifact — everywhere else in this file the
# decorator appears on classes (it matches the BaseRedundancyChecker
# registration earlier). This ``acquire`` also duplicates
# BaseLockNetworkContext.acquire above. Restore from the original module.
@base_entry_point("dffml.redundancy.checker", "rchecker")
@abc.abstractmethod
async def acquire(self, parameter_set: BaseParameterSet) -> bool:
    """
    An async context manager which will acquire locks of all inputs within
    the parameter set.
    """
# NOTE(review): duplicate of the BaseLockNetwork definition earlier in this
# file; if both remain, this later definition rebinds the name at import time.
@base_entry_point("dffml.lock.network", "lock", "network")
class BaseLockNetwork(BaseDataFlowObject):
    """
    Acquires locks on inputs which may not be used simultaneously
    """
class BaseOperationImplementationNetworkContext(BaseDataFlowObjectContext):
    """
    Context for checking whether operations can be run and instantiating
    their implementations.
    """

    @abc.abstractmethod
    async def contains(self, operation: Operation) -> bool:
        """
        Checks if the network contains / has the ability to run a given
        operation.
        """

    @abc.abstractmethod
    async def instantiable(self, operation: Operation) -> bool:
        """
        Prior to figuring out which operation implementation networks contain
        an operation, if none do, they will need to instantiate it on the fly.
        """

    # NOTE(review): the signature and body below are garbled — there is no
    # ``self`` parameter, yet the body assigns ``self.ctx`` and calls
    # ``super().__init__``; it appears spliced together with
    # BaseDefinitionSetContext.__init__. Restore from the original module.
    @abc.abstractmethod
    async def instantiate(
        config: BaseConfig,
        parent: "BaseInputNetworkContext",
        ctx: "BaseInputSetContext",
    ) -> None:
        super().__init__(config, parent)
        self.ctx = ctx

    @abc.abstractmethod
    async def inputs(self, Definition: Definition) -> AsyncIterator[Input]:
        """
        Asynchronous iterator of all inputs within a context, which are of a
        definition.
        """
class BaseInputNetworkContext(BaseDataFlowObjectContext):
    """
    Abstract Base Class for context managing input_set
    """

    @abc.abstractmethod
    async def add(self, input_set: BaseInputSet):
        """
        Adds new input set to the network
        """

    @abc.abstractmethod
    async def ctx(self) -> BaseInputSetContext:
        """
        Returns when a new input set context has entered the network
        """