Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
await nursery.run_in_actor('errorer1', assert_err)
portal2 = await nursery.run_in_actor('errorer2', assert_err)
# get result(s) from main task
try:
await portal2.result()
except tractor.RemoteActorError as err:
assert err.type == AssertionError
print("Look Maa that first actor failed hard, hehh")
raise
# here we should get a `trio.MultiError` containing exceptions
# from both subactors
with pytest.raises(trio.MultiError):
tractor.run(main, arbiter_addr=arb_addr)
if pub_actor is 'arbiter':
assert 'even' not in get_topics()
await odd_portal.cancel_actor()
await trio.sleep(1)
if pub_actor is 'arbiter':
while get_topics():
await trio.sleep(0.1)
if time.time() - start > 1:
pytest.fail("odds subscription never dropped?")
else:
await master_portal.cancel_actor()
tractor.run(
main,
arbiter_addr=arb_addr,
rpc_module_paths=[__name__],
)
def time_quad_ex(arb_addr, travis, spawn_backend):
if travis and spawn_backend == 'mp' and (platform.system() != 'Windows'):
# no idea, but the travis, mp, linux runs are flaking out here often
pytest.skip("Test is too flaky on mp in CI")
timeout = 7 if platform.system() == 'Windows' else 4
start = time.time()
results = tractor.run(cancel_after, timeout, arbiter_addr=arb_addr)
diff = time.time() - start
assert results
return results, diff
def run():
tractor.run(
main,
arbiter_addr=arb_addr,
rpc_module_paths=exposed_mods,
)
def test_local_actor_async_func(arb_addr):
"""Verify a simple async function in-process.
"""
nums = []
async def print_loop():
# arbiter is started in-proc if dne
assert tractor.current_actor().is_arbiter
for i in range(10):
nums.append(i)
await trio.sleep(0.1)
start = time.time()
tractor.run(print_loop, arbiter_addr=arb_addr)
# ensure the sleeps were actually awaited
assert time.time() - start >= 1
assert nums == list(range(10))
def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
"""Verify we raise a ``trio.MultiError`` out of a nursery where
more then one actor errors and also with a delay before failure
to test failure during an ongoing spawning.
"""
async def main():
async with tractor.open_nursery() as nursery:
for i in range(num_subactors):
await nursery.run_in_actor(
f'errorer{i}', assert_err, delay=delay)
with pytest.raises(trio.MultiError) as exc_info:
tractor.run(main, arbiter_addr=arb_addr)
assert exc_info.type == tractor.MultiError
err = exc_info.value
assert len(err.exceptions) == num_subactors
for exc in err.exceptions:
assert isinstance(exc, tractor.RemoteActorError)
assert exc.type == AssertionError
def test_stream_from_single_subactor(arb_addr, start_method, stream_func):
"""Verify streaming from a spawned async generator.
"""
tractor.run(
partial(
stream_from_single_subactor,
stream_func_name=stream_func,
),
arbiter_addr=arb_addr,
start_method=start_method,
)
def test_not_fast_enough_quad(
arb_addr, time_quad_ex, cancel_delay, travis, spawn_backend
):
"""Verify we can cancel midway through the quad example and all actors
cancel gracefully.
"""
results, diff = time_quad_ex
delay = max(diff - cancel_delay, 0)
results = tractor.run(cancel_after, delay, arbiter_addr=arb_addr)
if platform.system() == 'Windows' and results is not None:
# In Windows CI it seems later runs are quicker then the first
# so just ignore these
print("Woa there windows caught your breath eh?")
else:
# should be cancelled mid-streaming
assert results is None
say_hello,
# arguments are always named
other_actor='gretchen',
)
gretchen = await n.run_in_actor(
'gretchen',
say_hello,
other_actor='donny',
)
print(await gretchen.result())
print(await donny.result())
print("CUTTTT CUUTT CUT!!! Donny!! You're supposed to say...")
if __name__ == '__main__':
tractor.run(main)