Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_resolve_missing_input_output_definition(self):
    """
    Build a linked export of the auto-generated ``add`` dataflow, remove
    its ``result`` definition, and check that ``DataFlow._fromdict``
    raises ``DefinitionMissing`` with a message matching
    ``add.outputs.*result``.
    """
    linked_export = DataFlow.auto(add).export(linked=True)
    definitions = linked_export["definitions"]
    # Remove the definition the error message is expected to name
    del definitions["result"]
    expected_message = "add.outputs.*result"
    with self.assertRaisesRegex(DefinitionMissing, expected_message):
        DataFlow._fromdict(**linked_export)
def test_resolve_missing_condition_definition(self):
    """
    Build a linked export of the auto-generated ``add`` dataflow, remove
    its ``is_add`` definition, and check that ``DataFlow._fromdict``
    raises ``DefinitionMissing`` with a message matching
    ``add.conditions.*is_add``.
    """
    linked_export = DataFlow.auto(add).export(linked=True)
    definitions = linked_export["definitions"]
    # Remove the definition the error message is expected to name
    del definitions["is_add"]
    expected_message = "add.conditions.*is_add"
    with self.assertRaisesRegex(DefinitionMissing, expected_message):
        DataFlow._fromdict(**linked_export)
async def run(self):
    """
    Merge every file listed in ``self.dataflows`` into a single dataflow
    and print it, serialized by the loader obtained from ``self.config``.

    The export is linked unless ``self.not_linked`` is set.
    """
    # Accumulates the contents of all loaded files
    combined: Dict[str, Any] = {}
    # NOTE(review): nesting reconstructed from a flattened source; the exit
    # stack is assumed to close once all files have been loaded - confirm.
    async with contextlib.AsyncExitStack() as loader_stack:
        # Config loaders are cached here as their file types are seen
        loaders_by_type: Dict[str, BaseConfigLoader] = {}
        for dataflow_path in self.dataflows:
            # Only the second element of the returned pair is needed
            _, loaded = await BaseConfigLoader.load_file(
                loaders_by_type, loader_stack, dataflow_path
            )
            merge(combined, loaded)
    # Build the dataflow from the merged contents
    merged_dataflow = DataFlow._fromdict(**combined)
    async with self.config(BaseConfig()) as configloader, configloader() as loader:
        # Export inside the loader context, as the original did
        exported = merged_dataflow.export(linked=not self.not_linked)
        serialized = await loader.dumpb(exported)
        print(serialized.decode())
async def run(self):
dataflow_path = pathlib.Path(self.dataflow)
config_cls = self.config
if config_cls is None:
config_type = dataflow_path.suffix.replace(".", "")
config_cls = BaseConfigLoader.load(config_type)
async with config_cls.withconfig(self.extra_config) as configloader:
async with configloader() as loader:
exported = await loader.loadb(dataflow_path.read_bytes())
dataflow = DataFlow._fromdict(**exported)
print(f"graph {self.display}")
for stage in Stage:
# Skip stage if not wanted
if self.stages and stage.value not in self.stages:
continue
stage_node = hashlib.md5(
("stage." + stage.value).encode()
).hexdigest()
if len(self.stages) != 1:
print(f"subgraph {stage_node}[{stage.value.title()} Stage]")
print(f"style {stage_node} fill:#afd388b5,stroke:#a4ca7a")
for instance_name, operation in dataflow.operations.items():
if operation.stage != stage:
continue
subgraph_node = hashlib.md5(
("subgraph." + instance_name).encode()