Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@meta.cpubound
def cpufunc():
    '''
    Trivial function wrapped with the meta.cpubound decorator; its body
    simply produces the constant 1.
    '''
    result = 1
    return result
async def spawn(corofunc, *args, daemon=False):
    '''
    Start corofunc(*args) as a new task and return the task object.

    Pass daemon=True when the task is meant to run forever in the
    background; the flag is stored on the returned task's daemon attribute.
    '''
    new_task = await _spawn(meta.instantiate_coroutine(corofunc, *args))
    new_task.daemon = daemon
    return new_task
async def _timeout_after_func(clock, absolute, coro, args, ignore=False, timeout_result=None):
    '''
    Await coro(*args) inside a _TimeoutAfter context and hand back its result.

    clock, absolute, ignore and timeout_result are passed straight through
    to _TimeoutAfter; args is the sequence of positional arguments used to
    instantiate the coroutine.
    '''
    pending = meta.instantiate_coroutine(coro, *args)
    timeout_scope = _TimeoutAfter(
        clock,
        absolute,
        ignore=ignore,
        timeout_result=timeout_result,
    )
    async with timeout_scope:
        outcome = await pending
        return outcome
def spawn_thread(func, *args, daemon=False):
    '''
    Launch func(*args) in an async thread, in the same way a task is
    normally spawned. The call returns an awaitable that produces the
    started thread object:

        t = await spawn_thread(func, arg1, arg2)
        ...
        await t.join()

    Raises TypeError if func is a coroutine or a coroutine function.
    '''
    # Coroutines belong to tasks, not threads; reject them up front.
    is_coro = iscoroutine(func) or meta.iscoroutinefunction(func)
    if is_coro:
        raise TypeError("spawn_thread() can't be used on coroutines")

    async def runner(call_args, as_daemon):
        # Build the thread, start it, and give the caller a handle to join on.
        thread = AsyncThread(func, args=call_args, daemon=as_daemon)
        await thread.start()
        return thread

    return runner(args, daemon)
def run(self, corofunc=None, *args, shutdown=False):
# Run corofunc(*args) through the kernel runner and record the finished
# task's result or exception in ret_val / ret_exc.
# NOTE(review): this excerpt stops before the shutdown step, so how
# ret_val / ret_exc are handed back to the caller is not visible here.
#
# corofunc -- passed with *args to meta.instantiate_coroutine; when falsy,
#             no coroutine is created and coro is None
# shutdown -- keyword-only; when true and no coroutine was supplied, the
#             runner call below is skipped entirely
# Raises RuntimeError when self._shutdown_funcs is None.
if self._shutdown_funcs is None:
raise RuntimeError("Can't run a kernel that's been shut down or crashed. Create a new kernel.")
coro = meta.instantiate_coroutine(corofunc, *args) if corofunc else None
# NOTE(review): meta.running() presumably flags that a kernel is active for
# the duration of the block -- confirm against the meta module.
with meta.running():
# Make the kernel runtime environment (if needed)
# The runner is created once and cached on the instance for later calls.
if not self._runner:
self._runner = self._make_kernel_runtime()
ret_val = ret_exc = None
# Run the supplied coroutine (if any)
# The runner is also invoked with coro=None when shutdown is false.
if coro or not shutdown:
task = self._runner(coro)
# The runner may return a falsy value, in which case nothing is recorded.
if task:
ret_exc = task.exception
# task.result is only read when no exception was recorded.
ret_val = task.result if not ret_exc else None
# Drop the local reference to the finished task.
del task
# If shutdown has been requested, run the shutdown process
def async_thread(func=None, *, daemon=False):
'''
Decorator that is used to mark a callable as running in an asynchronous thread
'''
if func is None:
return lambda func: async_thread(func, daemon=daemon)
if meta.iscoroutinefunction(func):
raise TypeError("async_thread can't be applied to coroutines.")
@wraps(func)
def wrapper(*args, **kwargs):
if meta._from_coroutine() and not is_async_thread():
async def runner(*args, **kwargs):
# Launching async threads could result in a situation where
# synchronous code gets executed, but there is no opportunity
# for Curio to properly check for cancellation. This next
# call is a sanity check--if there's pending cancellation, don't
# even bother to launch the associated thread.
await check_cancellation()
t = AsyncThread(func, args=args, kwargs=kwargs, daemon=daemon)
await t.start()
try:
return await t.join()