Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
for subexc in err.exceptions:
# verify first level actor errors are wrapped as remote
if platform.system() == 'Windows':
# windows is often too slow and cancellation seems
# to happen before an actor is spawned
if isinstance(subexc, trio.Cancelled):
continue
else:
# on windows it seems we can't exactly be sure wtf
# will happen..
assert subexc.type in (
tractor.RemoteActorError,
trio.Cancelled,
trio.MultiError
)
else:
assert isinstance(subexc, tractor.RemoteActorError)
if depth > 0 and subactor_breadth > 1:
# XXX not sure what's up with this..
# on windows sometimes spawning is just too slow and
# we get back the (sent) cancel signal instead
if platform.system() == 'Windows':
assert (subexc.type is trio.MultiError) or (
subexc.type is tractor.RemoteActorError)
else:
assert subexc.type is trio.MultiError
else:
assert (subexc.type is tractor.RemoteActorError) or (
subexc.type is trio.Cancelled)
await nursery.run_in_actor('errorer1', assert_err)
portal2 = await nursery.run_in_actor('errorer2', assert_err)
# get result(s) from main task
try:
await portal2.result()
except tractor.RemoteActorError as err:
assert err.type == AssertionError
print("Look Maa that first actor failed hard, hehh")
raise
# here we should get a `trio.MultiError` containing exceptions
# from both subactors
with pytest.raises(trio.MultiError):
tractor.run(main, arbiter_addr=arb_addr)
rpc_module_paths=exposed_mods,
)
# handle both parameterized cases
if exposed_mods and func_defined:
run()
else:
# underlying errors aren't propagated upwards (yet)
with pytest.raises(remote_err) as err:
run()
# get raw instance from pytest wrapper
value = err.value
# might get multiple `trio.Cancelled`s as well inside an inception
if isinstance(value, trio.MultiError):
value = next(itertools.dropwhile(
lambda exc: not isinstance(exc, tractor.RemoteActorError),
value.exceptions
))
if getattr(value, 'type', None):
assert value.type is inside_err
with trio.CancelScope(shield=True):
await anursery.cancel()
raise
finally:
# No errors were raised while awaiting ".run_in_actor()"
# actors but those actors may have returned remote errors as
# results (meaning they errored remotely and have relayed
# those errors back to this parent actor). The errors are
# collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise.
if errors:
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
if len(errors) > 1:
raise trio.MultiError(tuple(errors.values()))
else:
raise list(errors.values())[0]
# ria_nursery scope end
log.debug("Nursery teardown complete")
task_nursery.start_soon(self._handle_run)
self._started.set()
# ***BLOCKING HERE***
# The code flow will block here until the background tasks have
# completed or cancellation occurs.
finally:
# Mark as having stopped
self._stopped.set()
self.logger.debug('%s stopped', self)
# If an error occured, re-raise it here
if self.did_error:
raise trio.MultiError(tuple(
exc_value.with_traceback(exc_tb)
for _, exc_value, exc_tb
in self._errors
))
self.nursery = nursery
self.protocol = ProtocolWrapper(
self.config,
ssl,
client,
server,
self.protocol_send,
partial(spawn_app, nursery, self.app, self.config),
EventWrapper,
alpn_protocol,
)
await self.protocol.initiate()
nursery.start_soon(self.protocol.send_task)
await self._update_keep_alive_timeout()
await self._read_data()
except (trio.MultiError, OSError):
pass
finally:
await self._close()
lib.open_connection = trio_open_connection
lib.sendall = trio_send_all
lib.recv = trio_receive_some
lib.sock_close = trio_close
lib.spawn = trio_spawn
lib.cancel_task_group = _event_loop_wrappers.trio_cancel
lib.unwrap_taskgrouperror = lambda error: error.exceptions
lib.unwrap_result = lambda task: task.result.unwrap()
lib.Lock = trio.Lock
lib.Semaphore = trio.Semaphore
lib.Queue = trio.Queue
lib.Cancelled = trio.Cancelled
lib.Event = trio.Event
lib.TaskTimeout = trio.TooSlowError
lib.TaskGroupError = trio.MultiError
lib.wait_read = _low_level.wait_read_trio
lib.wait_write = _low_level.wait_write_trio
what's left of it.
"""
try:
log.debug(f"Waiting on final result from {actor.uid}")
final = res = await portal.result()
# if it's an async-gen then alert that we're cancelling it
if inspect.isasyncgen(res):
final = []
log.warning(
f"Blindly consuming asyncgen for {actor.uid}")
with trio.fail_after(1):
async with aclosing(res) as agen:
async for item in agen:
log.debug(f"Consuming item {item}")
final.append(item)
except (Exception, trio.MultiError) as err:
# we reraise in the parent task via a ``trio.MultiError``
return err
else:
log.debug(f"Returning final result: {final}")
return final
# back values like an async-generator would but must
# manualy construct the response dict-packet-responses as
# above
with cancel_scope as cs:
task_status.started(cs)
await coro
if not cs.cancelled_caught:
# task was not cancelled so we can instruct the
# far end async gen to tear down
await chan.send({'stop': True, 'cid': cid})
else:
await chan.send({'functype': 'asyncfunction', 'cid': cid})
with cancel_scope as cs:
task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid})
except (Exception, trio.MultiError) as err:
# always ship errors back to caller
log.exception("Actor errored:")
err_msg = pack_error(err)
err_msg['cid'] = cid
try:
await chan.send(err_msg)
except trio.ClosedResourceError:
log.exception(
f"Failed to ship error to caller @ {chan.uid}")
if cs is None:
# error is from above code not from rpc invocation
task_status.started(err)
finally:
# RPC task bookeeping
try:
scope, func, is_complete = actor._rpc_tasks.pop((chan, cid))