Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
raise ValueError(f"target must be coroutine function")
if initializer is not None and asyncio.iscoroutinefunction(initializer):
raise ValueError(f"initializer must be synchronous function")
self.unit = Unit(
target=target or not_implemented,
args=args or (),
kwargs=kwargs or {},
namespace=get_manager().Namespace(),
initializer=initializer,
initargs=initargs,
)
self.aio_process = context.Process(
group=group,
target=process_target or Process.run_async,
args=(self.unit,),
name=name,
daemon=daemon,
)
def run_async(unit: Unit) -> R:
    """Run the unit via ``Process.run_async`` and publish the outcome.

    On success the returned value is stored on ``unit.namespace.result`` and
    also returned to the caller. If anything is raised (including while
    storing the value), the exception object is stored on
    ``unit.namespace.result`` instead and then re-raised unchanged.
    """
    try:
        outcome: R = Process.run_async(unit)
        # Publishing stays inside the try so a failure here is recorded too.
        unit.namespace.result = outcome
    except BaseException as exc:
        # BaseException on purpose: every kind of failure is recorded on the
        # namespace before propagating.
        unit.namespace.result = exc
        raise
    else:
        return outcome