import os
import abc
import logging
import unittest
from typing import Any, AsyncIterator, Dict, List, NamedTuple, Optional, Tuple, Union

from dffml.util.cli.arg import Arg
from dffml.util.cli.cmd import parse_unknown
# These excerpts span several dffml modules; the imports below cover the names
# they reference, assuming the dffml package layout they were taken from.
from dffml.base import config, field, BaseConfig
from dffml.util.entrypoint import entry_point, base_entry_point
from dffml.feature import Features
from dffml.repo import Repo
from dffml.df.types import Operation, Stage
from dffml.source.json import JSONSource

LOGGER = logging.getLogger(__name__)
@config
class FakeTestingConfig:
num: float
files: List[str]
features: Features
name: str = field("Name of FakeTesting")
label: str = "unlabeled"
readonly: bool = False
source: BaseSource = JSONSource
@base_entry_point("dffml.test", "test")
class BaseTesting(BaseDataFlowFacilitatorObject):
pass # pragma: no cov
@entry_point("fake")
class FakeTesting(BaseTesting):
CONFIG = FakeTestingConfig
class TestAutoArgsConfig(unittest.TestCase):
def test_00_args(self):
self.maxDiff = 99999
        self.assertEqual(
            FakeTesting.args({}),
            {
                # expected args mapping elided in the original snippet
            },
        )

class BaseKeyValueStoreContext(BaseDataFlowObjectContext):
@abc.abstractmethod
async def get(self, key: str) -> Union[bytes, None]:
"""
Get a value from the key value store
"""
@abc.abstractmethod
async def set(self, name: str, value: bytes):
"""
        Set a value in the key value store
"""
@base_entry_point("dffml.kvstore", "kvstore")
class BaseKeyValueStore(BaseDataFlowObject):
"""
Abstract Base Class for key value storage
"""
class BaseContextHandle(abc.ABC):
def __init__(self, ctx: "BaseInputSetContext") -> None:
self.ctx = ctx
self.logger = LOGGER.getChild(self.__class__.__qualname__)
@abc.abstractmethod
def as_string(self) -> str:
pass
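# A hedged sketch of a trivial concrete handle; StringContextHandle is an
# illustrative name, assuming the underlying context has a meaningful __str__.
class StringContextHandle(BaseContextHandle):
    def as_string(self) -> str:
        return str(self.ctx)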
class BaseOrchestratorConfig(NamedTuple):
    lock_network: BaseLockNetwork
    opimp_network: BaseOperationImplementationNetwork
    rchecker: BaseRedundancyChecker
class BaseOrchestratorContext(BaseDataFlowObjectContext):
@abc.abstractmethod
async def run_operations(
self, strict: bool = True
) -> AsyncIterator[Tuple[BaseContextHandle, Dict[str, Any]]]:
"""
Run all the operations then run cleanup and output operations
"""
@base_entry_point("dffml.orchestrator", "orchestrator")
class BaseOrchestrator(BaseDataFlowObject):
pass # pragma: no cov
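# Illustrative consumption of run_operations(), assuming octx is a context
# obtained from a concrete orchestrator; this sketches the interface only.
async def run_to_completion(octx: BaseOrchestratorContext) -> Dict[str, Any]:
    all_results: Dict[str, Any] = {}
    async for ctx_handle, results in octx.run_operations():
        # Key each input set context's output by its string handle
        all_results[ctx_handle.as_string()] = results
    return all_results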
class OperationImplementationContext(BaseDataFlowObjectContext):
    @abc.abstractmethod
    async def run(self, inputs: Dict[str, Any]) -> Union[bool, Dict[str, Any]]:
"""
Implementation of the operation goes here. Should take and return a dict
with keys matching the input and output parameters of the Operation
object associated with this operation implementation context.
"""
class FailedToLoadOperationImplementation(Exception):
"""
Raised when an OperationImplementation wasn't found to be registered with
the dffml.operation entrypoint.
"""
@base_entry_point("dffml.operation", "opimp")
class OperationImplementation(BaseDataFlowObject):
def __init__(self, config: "BaseConfig") -> None:
super().__init__(config)
if not getattr(self, "op", False):
            raise ValueError(
                "OperationImplementations may not be created without an `op`"
            )
def __call__(
self, ctx: "BaseInputSetContext", octx: "BaseOrchestratorContext"
) -> OperationImplementationContext:
return self.CONTEXT(self, ctx, octx)
class ModelContext(abc.ABC):
    def __init__(self, parent: "Model") -> None:
        self.parent = parent

    @abc.abstractmethod
    async def accuracy(self, sources: "Sources") -> float:
        """
        Evaluates the accuracy of our model after training using the input repos
        as test data.
        """
        raise NotImplementedError()
@abc.abstractmethod
async def predict(self, repos: AsyncIterator[Repo]) -> AsyncIterator[Repo]:
"""
Uses trained data to make a prediction about the quality of a repo.
"""
raise NotImplementedError()
        yield (Repo(""), "", 0.0)  # unreachable; makes this an async generator for type checkers
@base_entry_point("dffml.model", "model")
class Model(BaseDataFlowFacilitatorObject):
"""
    Abstract base class which should be derived from and implemented using
various machine learning frameworks or concepts.
"""
CONFIG = ModelConfig
def __call__(self) -> ModelContext:
# If the config object for this model contains the directory property
# then create it if it does not exist
directory = getattr(self.config, "directory", None)
if directory is not None and not os.path.isdir(directory):
os.makedirs(directory)
return self.CONTEXT(self)
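# A minimal sketch of deriving from Model, assuming the ModelContext methods
# excerpted above; NoopModel and NoopModelContext are illustrative names, and
# the no-op context simply passes repos through without predictions.
class NoopModelContext(ModelContext):
    async def accuracy(self, sources: "Sources") -> float:
        return 0.0

    async def predict(self, repos: AsyncIterator[Repo]) -> AsyncIterator[Repo]:
        async for repo in repos:
            yield repo

@entry_point("noop")
class NoopModel(Model):
    CONFIG = ModelConfig
    CONTEXT = NoopModelContext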
class BaseOperationNetworkContext(BaseDataFlowObjectContext):
    @abc.abstractmethod
    async def add(self, operations: List[Operation]):
        """
        Add operations to the network
        """
@abc.abstractmethod
async def operations(
self, input_set: BaseInputSet = None, stage: Stage = Stage.PROCESSING
) -> AsyncIterator[Operation]:
"""
Retrieve all operations in the network of a given stage filtering by
operations who have inputs with definitions in the input set.
"""
# TODO Make this operate like a BaseInputNetwork where operations can
# be added dynamically
@base_entry_point("dffml.operation.network", "operation", "network")
class BaseOperationNetwork(BaseDataFlowObject):
"""
Operation networks hold Operation objects to allow for looking up of their
inputs, outputs, and conditions.
"""
class BaseRedundancyCheckerConfig(NamedTuple):
key_value_store: BaseKeyValueStore
# TODO store redundancy checks by BaseInputSetContext.handle() and add method
# to remove all associated with a particular handle. Aka allow us to clean up
# the input, redundancy, etc. networks after execution of a context completes
# via the orchestrator.
class BaseRedundancyCheckerContext(BaseDataFlowObjectContext):
    """
    Redundancy Checkers ensure that each operation within a context only gets
    run with a given permutation of inputs once.
    """
# TODO Provide a way to clear out all locks for inputs within a context
class BaseLockNetworkContext(BaseDataFlowObjectContext):
@abc.abstractmethod
async def acquire(self, parameter_set: BaseParameterSet) -> bool:
"""
An async context manager which will acquire locks of all inputs within
the parameter set.
"""
@base_entry_point("dffml.lock.network", "lock", "network")
class BaseLockNetwork(BaseDataFlowObject):
"""
Acquires locks on inputs which may not be used simultaneously
"""
class BaseOperationImplementationNetworkContext(BaseDataFlowObjectContext):
@abc.abstractmethod
async def contains(self, operation: Operation) -> bool:
"""
Checks if the network contains / has the ability to run a given
operation.
"""
@abc.abstractmethod
async def instantiable(self, operation: Operation) -> bool:
"""
Abstract Base Class for redundancy checking context
"""
@abc.abstractmethod
async def exists(
self, operation: Operation, parameter_set: BaseParameterSet
) -> bool:
pass
@abc.abstractmethod
async def add(self, operation: Operation, parameter_set: BaseParameterSet):
pass
@base_entry_point("dffml.redundancy.checker", "rchecker")
class BaseRedundancyChecker(BaseDataFlowObject):
"""
Redundancy Checkers ensure that each operation within a context only gets
    run with a given permutation of inputs once.
"""
"""
@abc.abstractmethod
async def gather_inputs(
self,
rctx: "BaseRedundancyCheckerContext",
operation: Operation,
ctx: Optional[BaseInputSetContext] = None,
) -> AsyncIterator[BaseParameterSet]:
"""
Generate all possible permutations of applicable inputs for an operation
that, according to the redundancy checker, haven't been run yet.
"""
@base_entry_point("dffml.input.network", "input", "network")
class BaseInputNetwork(BaseDataFlowObject):
"""
Input networks store all of the input data and output data of operations,
which in turn becomes input data to other operations.
"""
class OperationImplementationNotInstantiable(Exception):
"""
OperationImplementation cannot be instantiated and is required to continue.
"""
class OperationImplementationNotInstantiated(Exception):
    """
    OperationImplementation is instantiable, but has not been instantiated
    within the network and is required to continue.
    """

class BaseSourceContext(abc.ABC):
    @abc.abstractmethod
async def repos(self) -> AsyncIterator[Repo]:
"""
Returns a list of repos retrieved from self.src
"""
# mypy ignores AsyncIterator[Repo], therefore this is needed
yield Repo("") # pragma: no cover
@abc.abstractmethod
async def repo(self, src_url: str):
"""
Get a repo from the source or add it if it doesn't exist
"""
@base_entry_point("dffml.source", "source")
class BaseSource(BaseDataFlowFacilitatorObject):
"""
Abstract base class for all sources. New sources must be derived from this
class and implement the repos method.
"""
def __call__(self) -> BaseSourceContext:
return self.CONTEXT(self)
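# A minimal sketch of a concrete source, assuming the BaseSourceContext
# methods excerpted above; MemSource and MemSourceContext are illustrative
# names, loosely modeled on an in-memory dict of repos.
class MemSourceContext(BaseSourceContext):
    def __init__(self, parent: "MemSource") -> None:
        self.parent = parent

    async def update(self, repo: Repo):
        self.parent.mem[repo.src_url] = repo

    async def repos(self) -> AsyncIterator[Repo]:
        for repo in self.parent.mem.values():
            yield repo

    async def repo(self, src_url: str):
        # Add the repo if it does not already exist
        if src_url not in self.parent.mem:
            self.parent.mem[src_url] = Repo(src_url)
        return self.parent.mem[src_url]

@entry_point("mem")
class MemSource(BaseSource):
    CONTEXT = MemSourceContext

    def __init__(self, config: BaseConfig) -> None:
        super().__init__(config)
        self.mem: Dict[str, Repo] = {}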
class SourcesContext(AsyncContextManagerListContext):
async def update(self, repo: Repo):
"""
Updates a repo for a source
"""
        LOGGER.debug("Updating %r: %r", repo.src_url, repo.dict())
        # Fan the update out to each underlying source context in this list
        for source in self:
            await source.update(repo)