Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@op(
inputs={"line": calc_string},
outputs={"add": is_add, "mult": is_mult, "numbers": numbers},
)
async def parse_line(line: str):
return {
"add": "add" in line,
"mult": "mult" in line,
"numbers": [int(item) for item in line.split() if item.isdigit()],
}
OPIMPS = opimp_in(sys.modules[__name__])
OPERATIONS = operation_in(sys.modules[__name__])
DATAFLOW = DataFlow.auto(*OPIMPS)
class TestMemoryKeyValueStore(AsyncTestCase):
def setUp(self):
self.kvStore = MemoryKeyValueStore(BaseConfig())
async def test_get_set(self):
async with self.kvStore as kvstore:
async with kvstore() as ctx:
await ctx.set("feed", b"face")
self.assertEqual(await ctx.get("feed"), b"face")
async def test_get_none(self):
async with self.kvStore as kvstore:
async with kvstore() as ctx:
self.assertEqual(await ctx.get("feed"), None)
def test_export(self):
exported = DataFlow.auto(add).export(linked=True)
# Operations
self.assertIn("operations", exported)
self.assertIn("add", exported["operations"])
self.assertIn("inputs", exported["operations"]["add"])
self.assertIn("outputs", exported["operations"]["add"])
self.assertIn("conditions", exported["operations"]["add"])
self.assertIn("is_add", exported["operations"]["add"]["conditions"])
self.assertIn("numbers", exported["operations"]["add"]["inputs"])
self.assertEqual(
"numbers", exported["operations"]["add"]["inputs"]["numbers"]
)
self.assertIn("sum", exported["operations"]["add"]["outputs"])
self.assertEqual(
"result", exported["operations"]["add"]["outputs"]["sum"]
)
# Definitions
def test_resolve_missing_condition_definition(self):
exported = DataFlow.auto(add).export(linked=True)
del exported["definitions"]["is_add"]
with self.assertRaisesRegex(
DefinitionMissing, "add.conditions.*is_add"
):
DataFlow._fromdict(**exported)
def test_resolve_missing_input_output_definition(self):
exported = DataFlow.auto(add).export(linked=True)
del exported["definitions"]["result"]
with self.assertRaisesRegex(DefinitionMissing, "add.outputs.*result"):
DataFlow._fromdict(**exported)
# The GetSingle operation will grab the data we want from the ouputs of our
# operations and present it as the result
from dffml.operation.output import GetSingle
# Import all the operations we wrote
from shouldi.bandit import run_bandit
from shouldi.pypi import pypi_latest_package_version
from shouldi.pypi import pypi_package_json
from shouldi.pypi import pypi_package_url
from shouldi.pypi import pypi_package_contents
from shouldi.pypi import cleanup_pypi_package
from shouldi.safety import safety_check
# Link inputs and outputs together according to their definitions
DATAFLOW = DataFlow.auto(
pypi_package_json,
pypi_latest_package_version,
pypi_package_url,
pypi_package_contents,
cleanup_pypi_package,
safety_check,
run_bandit,
GetSingle,
)
# Seed inputs are added to each executing context. The following Input tells the
# GetSingle output operation that we want the output of the network to include
# data matching the "issues" output of the safety_check operation, and the
# "report" output of the run_bandit operation, for each context.
DATAFLOW.seed.append(
Input(
value=[
async def run(self):
operations = []
for load_operation in self.operations:
if ":" in load_operation:
operations += list(load(load_operation))
else:
operations += [Operation.load(load_operation)]
async with self.config(BaseConfig()) as configloader:
async with configloader() as loader:
dataflow = DataFlow.auto(*operations)
exported = dataflow.export(linked=not self.not_linked)
print((await loader.dumpb(exported)).decode())
error.args = (f"{opimp.op.inputs}: {error.args[0]}",)
raise error
config = {}
extra_config = self.extra_config
for i in range(0, 2):
if "config" in extra_config and len(extra_config["config"]):
extra_config = extra_config["config"]
# TODO(p0) This only goes one level deep. This won't work for
# configs that are multi-leveled
if extra_config:
config = extra_config
dataflow = DataFlow.auto(GetSingle, opimp)
if config:
dataflow.configs[opimp.op.name] = config
# Run the operation in the memory orchestrator
async with MemoryOrchestrator.withconfig({}) as orchestrator:
# Orchestrate the running of these operations
async with orchestrator(dataflow) as octx:
async for ctx, results in octx.run(
[
Input(
value=[
definition.name
for definition in opimp.op.outputs.values()
],
definition=GetSingle.op.inputs["spec"],
),