Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
Implementation of the operation goes here. Should take and return a dict
with keys matching the input and output parameters of the Operation
object associated with this operation implementation context.
"""
class FailedToLoadOperationImplementation(Exception):
    """
    Raised when an OperationImplementation wasn't found to be registered with
    the dffml.operation entrypoint.
    """
@base_entry_point("dffml.operation", "opimp")
class OperationImplementation(BaseDataFlowObject):
    """
    Base class for operation implementations.

    Instances must expose a truthy ``op`` attribute; construction fails
    otherwise. Calling an instance produces a context via ``self.CONTEXT``.
    """

    def __init__(self, config: "BaseConfig") -> None:
        """
        Initialize the parent object with ``config`` and check for ``op``.

        Raises:
            ValueError: If ``op`` is missing from the instance or is falsy.
        """
        super().__init__(config)
        # ``op`` is looked up with a falsy default so that both a missing
        # attribute and an empty value are rejected the same way.
        if not getattr(self, "op", False):
            raise ValueError(
                "OperationImplementation's may not be "
                "created without an `op`"
            )

    def __call__(
        self, ctx: "BaseInputSetContext", octx: "BaseOrchestratorContext"
    ) -> OperationImplementationContext:
        """
        Create the context for this implementation by calling
        ``self.CONTEXT(self, ctx, octx)``.
        """
        return self.CONTEXT(self, ctx, octx)

    @classmethod
    def add_orig_label(cls, *above):
        """
        Return ``above`` as a list followed by the pieces of ``cls.op.name``
        split on underscores.
        """
        return list(above) + cls.op.name.split("_")
"""
@abc.abstractmethod
async def operations(
    self, input_set: BaseInputSet = None, stage: Stage = Stage.PROCESSING
) -> AsyncIterator[Operation]:
    """
    Retrieve all operations in the network of a given stage, filtering by
    operations which have inputs with definitions in the input set.

    Args:
        input_set: Input set used for filtering. Defaults to ``None``.
        stage: Stage the operations must belong to. Defaults to
            ``Stage.PROCESSING``.
    """
# TODO Make this operate like a BaseInputNetwork where operations can
# be added dynamically
@base_entry_point("dffml.operation.network", "operation", "network")
class BaseOperationNetwork(BaseDataFlowObject):
    """
    Operation networks hold Operation objects to allow for looking up of their
    inputs, outputs, and conditions.
    """
class BaseRedundancyCheckerConfig(NamedTuple):
    """
    Configuration tuple for a redundancy checker.
    """

    # Key value store held by the redundancy checker configuration
    key_value_store: BaseKeyValueStore
# TODO store redundancy checks by BaseInputSetContext.handle() and add method
# to remove all associated with a particular handle. Aka allow us to clean up
# the input, redundancy, etc. networks after execution of a context completes
# via the orchestrator.
class BaseRedundancyCheckerContext(BaseDataFlowObjectContext):
"""
def config(cls, config, *above) -> BaseConfig:
    """
    Build the configuration object for ``cls``.

    Classes without a ``CONFIG`` attribute get an empty ``BaseConfig()``.
    Otherwise the call is delegated to the ``config`` implementation that
    follows ``BaseDataFlowObject`` in the method resolution order.

    Args:
        config: Configuration source forwarded to the parent ``config``.
        *above: Extra positional values forwarded unchanged.
    """
    if not hasattr(cls, "CONFIG"):
        return BaseConfig()
    # super(BaseDataFlowObject, cls) deliberately skips BaseDataFlowObject's
    # own ``config`` and resolves to the next class in the MRO.
    return super(BaseDataFlowObject, cls).config(config, *above)
run with a given permutation of inputs once.
"""
# TODO Provide a way to clear out all locks for inputs within a context
class BaseLockNetworkContext(BaseDataFlowObjectContext):
    """
    Abstract context for a lock network; subclasses implement ``acquire``.
    """

    @abc.abstractmethod
    async def acquire(self, parameter_set: BaseParameterSet) -> bool:
        """
        An async context manager which will acquire locks of all inputs within
        the parameter set.

        Args:
            parameter_set: Parameter set whose inputs should be locked.
        """
@base_entry_point("dffml.lock.network", "lock", "network")
class BaseLockNetwork(BaseDataFlowObject):
    """
    Acquires locks on inputs which may not be used simultaneously
    """
class BaseOperationImplementationNetworkContext(BaseDataFlowObjectContext):
@abc.abstractmethod
async def contains(self, operation: Operation) -> bool:
    """
    Checks if the network contains / has the ability to run a given
    operation.

    Args:
        operation: Operation to look for in the network.
    """
@abc.abstractmethod
async def instantiable(self, operation: Operation) -> bool:
"""
from ..util.entrypoint import entry_point, EntrypointNotFound
from ..util.cli.arg import Arg
from ..util.cli.cmd import CMD
from ..util.data import ignore_args, traverse_get
from ..util.asynchelper import context_stacker, aenter_stack, concurrently
from .log import LOGGER
class MemoryDataFlowObjectContextConfig(NamedTuple):
    """
    Configuration tuple for an in-memory data flow object context.
    """

    # Unique ID of the context, in other implementations this might be a JWT or
    # something
    uid: str
class BaseMemoryDataFlowObject(BaseDataFlowObject):
    """
    Data flow object whose contexts are configured with a random ``uid``.
    """

    def __call__(self) -> BaseDataFlowObjectContext:
        """
        Create a context through ``self.CONTEXT``, passing a config whose
        ``uid`` is a fresh ``secrets.token_hex()`` value and this object.
        """
        context_config = MemoryDataFlowObjectContextConfig(
            uid=secrets.token_hex()
        )
        return self.CONTEXT(context_config, self)
class MemoryKeyValueStoreContext(BaseKeyValueStoreContext):
def __init__(
    self, config: BaseConfig, parent: "MemoryKeyValueStore"
) -> None:
    """
    Initialize the parent context and set up empty in-memory storage.

    Args:
        config: Configuration forwarded to the parent initializer.
        parent: Object forwarded to the parent initializer.
    """
    super().__init__(config, parent)
    # Values are kept in a plain dict on this context
    self.memory: Dict[str, bytes] = {}
    # Lock created alongside the storage for use by this context's methods
    self.lock = asyncio.Lock()
async def get(self, key: str) -> Union[bytes, None]:
async with self.lock:
Abstract Base Class for redundancy checking context
"""
@abc.abstractmethod
async def exists(
    self, operation: Operation, parameter_set: BaseParameterSet
) -> bool:
    """
    Abstract hook taking an operation and a parameter set and returning a
    bool.

    NOTE(review): presumably reports whether ``operation`` was already
    recorded for ``parameter_set`` - confirm against implementations.
    """
@abc.abstractmethod
async def add(self, operation: Operation, parameter_set: BaseParameterSet):
    """
    Abstract hook taking an operation and a parameter set.

    NOTE(review): presumably records that ``operation`` has been seen with
    ``parameter_set`` - confirm against implementations.
    """
@base_entry_point("dffml.redundancy.checker", "rchecker")
class BaseRedundancyChecker(BaseDataFlowObject):
    """
    Redundancy Checkers ensure that each operation within a context only gets
    run with a given permutation of inputs once.
    """
# TODO Provide a way to clear out all locks for inputs within a context
class BaseLockNetworkContext(BaseDataFlowObjectContext):
    """
    Abstract context for a lock network; subclasses implement ``acquire``.
    """

    @abc.abstractmethod
    async def acquire(self, parameter_set: BaseParameterSet) -> bool:
        """
        An async context manager which will acquire locks of all inputs within
        the parameter set.

        Args:
            parameter_set: Parameter set whose inputs should be locked.
        """
stage: Stage = Stage.PROCESSING,
) -> AsyncIterator[Tuple[Operation, BaseParameterSet]]:
"""
Use new_input_set to determine which operations in the network might be
up for running. Cross check using existing inputs to generate per
input set context novel input pairings. Yield novel input pairings
along with their operations as they are generated.
"""
# TODO We should be able to specify multiple operation implementation networks.
# This would enable operations to live in different places, accessed via the
# orchestrator transparently. This will probably involve
# dffml.util.asynchelper.AsyncContextManagerList
@base_entry_point("dffml.operation.implementation.network", "opimp", "network")
class BaseOperationImplementationNetwork(BaseDataFlowObject):
    """
    Knows where operations are or if they can be made
    """
# NOTE(review): recent Python versions reject combining another base class
# with typing.NamedTuple - confirm which interpreter versions must be supported.
class BaseOrchestratorConfig(BaseConfig, NamedTuple):
    """
    Configuration tuple bundling the networks and redundancy checker given
    to an orchestrator.
    """

    # Network holding inputs
    input_network: BaseInputNetwork
    # Network holding operations
    operation_network: BaseOperationNetwork
    # Network used for locking
    lock_network: BaseLockNetwork
    # Network of operation implementations
    opimp_network: BaseOperationImplementationNetwork
    # Redundancy checker
    rchecker: BaseRedundancyChecker
class BaseOrchestratorContext(BaseDataFlowObjectContext):
@abc.abstractmethod
async def run_operations(
opimp_network: BaseOperationImplementationNetwork
rchecker: BaseRedundancyChecker
class BaseOrchestratorContext(BaseDataFlowObjectContext):
    """
    Abstract orchestrator context; subclasses implement ``run_operations``.
    """

    @abc.abstractmethod
    async def run_operations(
        self, strict: bool = True
    ) -> AsyncIterator[Tuple[BaseContextHandle, Dict[str, Any]]]:
        """
        Run all the operations then run cleanup and output operations

        Args:
            strict: Defaults to ``True``. NOTE(review): its effect is not
                visible in this block - confirm against implementations.
        """
@base_entry_point("dffml.orchestrator", "orchestrator")
class BaseOrchestrator(BaseDataFlowObject):
    """
    Base class for orchestrators; adds nothing beyond BaseDataFlowObject.
    """

    pass  # pragma: no cov
@abc.abstractmethod
async def gather_inputs(
    self,
    rctx: "BaseRedundancyCheckerContext",
    operation: Operation,
    ctx: Optional[BaseInputSetContext] = None,
) -> AsyncIterator[BaseParameterSet]:
    """
    Generate all possible permutations of applicable inputs for an operation
    that, according to the redundancy checker, haven't been run yet.

    Args:
        rctx: Redundancy checker context consulted for what has been run.
        operation: Operation to gather inputs for.
        ctx: Optional input set context. Defaults to ``None``.
    """
@base_entry_point("dffml.input.network", "input", "network")
class BaseInputNetwork(BaseDataFlowObject):
    """
    Input networks store all of the input data and output data of operations,
    which in turn becomes input data to other operations.
    """
class OperationImplementationNotInstantiable(Exception):
    """
    OperationImplementation cannot be instantiated and is required to continue.
    """
class OperationImplementationNotInstantiated(Exception):
    """
    OperationImplementation is instantiable, but has not been
    instantiated within the network and was required to continue.
    """
@abc.abstractmethod
async def get(self, key: str) -> Union[bytes, None]:
    """
    Get a value from the key value store

    Args:
        key: Key to look up.

    Returns:
        Annotated as ``bytes`` or ``None``.
    """
@abc.abstractmethod
async def set(self, name: str, value: bytes):
    """
    Set a value in the key value store

    Args:
        name: Name under which the value is stored.
        value: Bytes to store.
    """
@base_entry_point("dffml.kvstore", "kvstore")
class BaseKeyValueStore(BaseDataFlowObject):
    """
    Abstract Base Class for key value storage
    """
class BaseContextHandle(abc.ABC):
    """
    Abstract handle wrapping an input set context.

    Subclasses must implement ``as_string``.
    """

    def __init__(self, ctx: "BaseInputSetContext") -> None:
        """
        Store ``ctx`` and create a child logger named after the concrete
        class's qualified name.
        """
        self.ctx = ctx
        self.logger = LOGGER.getChild(self.__class__.__qualname__)

    @abc.abstractmethod
    def as_string(self) -> str:
        """
        Return a string for this handle (annotated ``-> str``); the base
        implementation does nothing.
        """
class BaseInputSetContext(abc.ABC):