Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
),
Input(
value=[add.op.outputs["sum"].name],
definition=GetSingle.op.inputs["spec"],
),
],
)
)
for to_calc in calc_strings_check.keys()
],
"uctx": [
[
Input(
value=to_calc, definition=parse_line.op.inputs["line"]
),
Input(
value=[add.op.outputs["sum"].name],
definition=GetSingle.op.inputs["spec"],
),
]
for to_calc in calc_strings_check.keys()
],
}
async with MemoryOrchestrator.withconfig({}) as orchestrator:
async with orchestrator(DATAFLOW) as octx:
for callstyle, inputs in callstyles.items():
with self.subTest(callstyle=callstyle):
if callstyle in callstyles_no_expand:
run_coro = octx.run(inputs)
else:
run_coro = octx.run(*inputs)
async for ctx, results in run_coro:
async def run_op(self, name, opimp):
# Create an instance of BaseConfigurable and have it parse the inputs
# from self.extra_config. Use the op.inputs to know what we should pass
# to config_get
inputs = []
for name, definition in opimp.op.inputs.items():
try:
inputs.append(
Input(
value=self.config_get(opimp.op, name, definition),
definition=definition,
)
)
except MissingConfig as error:
error.args = (f"{opimp.op.inputs}: {error.args[0]}",)
raise error
config = {}
extra_config = self.extra_config
for i in range(0, 2):
if "config" in extra_config and len(extra_config["config"]):
extra_config = extra_config["config"]
# TODO(p0) This only goes one level deep. This won't work for
# Build the module-wide DataFlow from the PyPI, safety and bandit operations
# plus the GetSingle output operation.
# NOTE(review): DataFlow.auto presumably links operations by matching their
# input/output definitions -- confirm against the DataFlow implementation.
DATAFLOW = DataFlow.auto(
    pypi_package_json,
    pypi_latest_package_version,
    pypi_package_url,
    pypi_package_contents,
    cleanup_pypi_package,
    safety_check,
    run_bandit,
    GetSingle,
)
# Seed inputs are added to each executing context. The following Input tells
# the GetSingle output operation that we want the output of the network to
# include data matching the "issues" output of the safety_check operation,
# and the "report" output of the run_bandit operation, for each context.
DATAFLOW.seed.append(
    Input(
        # Names of the definitions whose values GetSingle should return
        value=[
            safety_check.op.outputs["issues"].name,
            run_bandit.op.outputs["report"].name,
        ],
        # Route this value to the "spec" input of GetSingle
        definition=GetSingle.op.inputs["spec"],
    )
)
class Install(CMD):
arg_packages = Arg(
"packages", nargs="+", help="Package to check if we should install"
)
async def run(self):
inputs = []
async for repo in self.repos(sctx):
# Skip running DataFlow if repo already has features
existing_features = repo.features()
if self.caching and all(
map(
lambda cached: cached in existing_features,
self.caching,
)
):
continue
repo_inputs = []
for value, def_name in self.inputs:
repo_inputs.append(
Input(
value=value,
definition=dataflow.definitions[def_name],
)
)
if self.repo_def:
repo_inputs.append(
Input(
value=repo.src_url,
definition=dataflow.definitions[self.repo_def],
)
)
# TODO(p1) When OrchestratorContext is fixed to accept an
# asyncgenerator we won't have to build a list
inputs.append(
MemoryInputSet(
# TODO(p0) This only goes one level deep. This won't work for
# configs that are multi-leveled
if extra_config:
config = extra_config
dataflow = DataFlow.auto(GetSingle, opimp)
if config:
dataflow.configs[opimp.op.name] = config
# Run the operation in the memory orchestrator
async with MemoryOrchestrator.withconfig({}) as orchestrator:
# Orchestrate the running of these operations
async with orchestrator(dataflow) as octx:
async for ctx, results in octx.run(
[
Input(
value=[
definition.name
for definition in opimp.op.outputs.values()
],
definition=GetSingle.op.inputs["spec"],
),
*inputs,
]
):
return results
if (
not input_data["definition"]
in config.dataflow.definitions
):
return web.json_response(
{
"error": f"Missing definition for {input_data['definition']} in dataflow"
},
status=HTTPStatus.NOT_FOUND,
)
inputs.append(
MemoryInputSet(
MemoryInputSetConfig(
ctx=StringInputSetContext(ctx),
inputs=[
Input(
value=input_data["value"],
definition=config.dataflow.definitions[
input_data["definition"]
],
)
for input_data in client_inputs
],
)
)
)
# Run the operation in an orchestrator
# TODO(dfass) Create the orchestrator on startup of the HTTP API itself
async with MemoryOrchestrator.basic_config() as orchestrator:
# TODO(dfass) Create octx on dataflow registration
async with orchestrator(config.dataflow) as octx:
results = {
async def inputs(self) -> AsyncIterator[Input]:
    """
    Stub with no behaviour of its own.

    As written this is a plain coroutine: awaiting it returns ``None``.
    NOTE(review): the annotated return type suggests implementations are
    expected to be async generators of ``Input`` objects, supplied by
    subclasses -- confirm against the enclosing class, which is not visible
    from here.
    """
def _fromdict(cls, *, linked: bool = False, **kwargs):
    """
    Construct ``cls`` from its exported dictionary representation.

    ``kwargs`` has to carry ``operations``, ``seed`` and ``flow`` entries.
    Each one is swapped for its imported counterpart (via the ``_fromdict``
    of ``Operation``, ``Input`` and ``InputFlow`` respectively) and the
    resulting keyword arguments are forwarded to ``cls``.

    When ``linked`` is true, the result of ``cls._resolve(kwargs)`` is merged
    into ``kwargs`` first and the ``definitions`` entry is discarded so that
    it is not handed to the constructor.

    A missing ``operations``, ``seed``, ``flow`` (or, when linked,
    ``definitions``) key raises ``KeyError``.
    """
    if linked:
        resolved = cls._resolve(kwargs)
        kwargs.update(resolved)
        kwargs.pop("definitions")
    # Keyword arguments are evaluated left to right, so operations are
    # imported first, then seed inputs, then input flows.
    kwargs.update(
        operations={
            name: Operation._fromdict(instance_name=name, **exported)
            for name, exported in kwargs["operations"].items()
        },
        seed=[Input._fromdict(**exported) for exported in kwargs["seed"]],
        flow={
            name: InputFlow._fromdict(**exported)
            for name, exported in kwargs["flow"].items()
        },
    )
    return cls(**kwargs)
"""
def __init__(self) -> None:
    """
    Start with no stored notifications and fresh asyncio primitives.
    """
    # TODO audit use of memory (should be used sparingly)
    # Notifications are kept in a plain in-memory list
    self.notification_items = []
    # Event and the lock that accompanies it
    # NOTE(review): presumably the event signals that an item was added and
    # the lock serialises access to the event -- confirm with the context
    # class that uses them.
    self.event_added = asyncio.Event()
    self.event_added_lock = asyncio.Lock()
    # Separate, general lock for this object
    self.lock = asyncio.Lock()
def __call__(self) -> NotificationSetContext:
    """
    Create and return a ``NotificationSetContext`` bound to this object.
    """
    context = NotificationSetContext(self)
    return context
class MemoryInputNetworkContextEntry(NamedTuple):
    """
    Immutable record tying an input set context to the inputs held for it,
    indexed two ways: by definition and by origin.
    """

    # The input set context this entry belongs to
    ctx: BaseInputSetContext
    # Inputs grouped under the Definition they were given
    definitions: Dict[Definition, List[Input]]
    # Inputs grouped under their origin; the key is either a single string or
    # a pair of strings
    # NOTE(review): the meaning of the two-string form is not visible here --
    # confirm against the code that populates this mapping.
    by_origin: Dict[Union[str, Tuple[str, str]], List[Input]]
class MemoryDefinitionSetContext(BaseDefinitionSetContext):
    """
    Definition set context that reads its inputs from the parent's
    ``ctxhd`` mapping.
    """

    async def inputs(self, definition: Definition) -> AsyncIterator[Input]:
        """
        Yield each input stored under this context for ``definition``.

        The context handle is resolved to its string form before the
        parent's ``ctxhd_lock`` is taken. The lock then stays held across
        every yield, until the generator is exhausted or closed.

        Raises ``KeyError`` if the handle string is not in ``ctxhd`` or the
        entry has nothing recorded for ``definition``.
        """
        # String form of the context handle is the key into the parent's
        # per-context storage
        key = (await self.ctx.handle()).as_string()
        async with self.parent.ctxhd_lock:
            # Everything recorded for this context under the given definition
            matching = self.parent.ctxhd[key].definitions[definition]
            for found in matching:
                yield found
async with MemoryOrchestrator.withconfig({}) as orchestrator:
# Create a orchestrator context, everything in DFFML follows this
# one-two context entry pattern
async with orchestrator(DATAFLOW) as octx:
# Run all the operations, Each iteration of this loop happens
# when all inputs are exhausted for a context, the output
# operations are then run and their results are yielded
async for package_name, results in octx.run(
{
# For each package add a new input set to the input network
# The context operations execute under is the package name
# to evaluate. Contexts ensure that data pertaining to
# package A doesn't mingle with data pertaining to package B
package_name: [
# The only input to the operations is the package name.
Input(
value=package_name,
definition=pypi_package_json.op.inputs[
"package"
],
)
]
for package_name in self.packages
}
):
# Grab the number of safety issues and the bandit report
# from the results dict
safety_issues = results[
safety_check.op.outputs["issues"].name
]
bandit_report = results[
run_bandit.op.outputs["report"].name