How to use the tractor.run function in tractor

To help you get started, we’ve selected a few tractor examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github goodboy / tractor / tests / test_cancellation.py View on Github external
await nursery.run_in_actor('errorer1', assert_err)
            portal2 = await nursery.run_in_actor('errorer2', assert_err)

            # get result(s) from main task
            try:
                await portal2.result()
            except tractor.RemoteActorError as err:
                assert err.type == AssertionError
                print("Look Maa that first actor failed hard, hehh")
                raise

        # here we should get a `trio.MultiError` containing exceptions
        # from both subactors

    with pytest.raises(trio.MultiError):
        tractor.run(main, arbiter_addr=arb_addr)
github goodboy / tractor / tests / test_pubsub.py View on Github external
if pub_actor is 'arbiter':
                assert 'even' not in get_topics()

            await odd_portal.cancel_actor()
            await trio.sleep(1)

            if pub_actor is 'arbiter':
                while get_topics():
                    await trio.sleep(0.1)
                    if time.time() - start > 1:
                        pytest.fail("odds subscription never dropped?")
            else:
                await master_portal.cancel_actor()

    tractor.run(
        main,
        arbiter_addr=arb_addr,
        rpc_module_paths=[__name__],
    )
github goodboy / tractor / tests / test_streaming.py View on Github external
def time_quad_ex(arb_addr, travis, spawn_backend):
    if travis and spawn_backend == 'mp' and (platform.system() != 'Windows'):
        # no idea, but the travis, mp, linux runs are flaking out here often
        pytest.skip("Test is too flaky on mp in CI")

    timeout = 7 if platform.system() == 'Windows' else 4
    start = time.time()
    results = tractor.run(cancel_after, timeout, arbiter_addr=arb_addr)
    diff = time.time() - start
    assert results
    return results, diff
github goodboy / tractor / tests / test_rpc.py View on Github external
def run():
        tractor.run(
            main,
            arbiter_addr=arb_addr,
            rpc_module_paths=exposed_mods,
        )
github goodboy / tractor / tests / test_local.py View on Github external
def test_local_actor_async_func(arb_addr):
    """Verify a simple async function in-process.
    """
    nums = []

    async def print_loop():
        # arbiter is started in-proc if dne
        assert tractor.current_actor().is_arbiter

        for i in range(10):
            nums.append(i)
            await trio.sleep(0.1)

    start = time.time()
    tractor.run(print_loop, arbiter_addr=arb_addr)

    # ensure the sleeps were actually awaited
    assert time.time() - start >= 1
    assert nums == list(range(10))
github goodboy / tractor / tests / test_cancellation.py View on Github external
def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
    """Verify we raise a ``trio.MultiError`` out of a nursery where
    more then one actor errors and also with a delay before failure
    to test failure during an ongoing spawning.
    """
    async def main():
        async with tractor.open_nursery() as nursery:
            for i in range(num_subactors):
                await nursery.run_in_actor(
                    f'errorer{i}', assert_err, delay=delay)

    with pytest.raises(trio.MultiError) as exc_info:
        tractor.run(main, arbiter_addr=arb_addr)

    assert exc_info.type == tractor.MultiError
    err = exc_info.value
    assert len(err.exceptions) == num_subactors
    for exc in err.exceptions:
        assert isinstance(exc, tractor.RemoteActorError)
        assert exc.type == AssertionError
github goodboy / tractor / tests / test_streaming.py View on Github external
def test_stream_from_single_subactor(arb_addr, start_method, stream_func):
    """Verify streaming from a spawned async generator.
    """
    tractor.run(
        partial(
            stream_from_single_subactor,
            stream_func_name=stream_func,
        ),
        arbiter_addr=arb_addr,
        start_method=start_method,
    )
github goodboy / tractor / tests / test_streaming.py View on Github external
def test_not_fast_enough_quad(
    arb_addr, time_quad_ex, cancel_delay, travis, spawn_backend
):
    """Verify we can cancel midway through the quad example and all actors
    cancel gracefully.
    """
    results, diff = time_quad_ex
    delay = max(diff - cancel_delay, 0)
    results = tractor.run(cancel_after, delay, arbiter_addr=arb_addr)
    if platform.system() == 'Windows' and results is not None:
        # In Windows CI it seems later runs are quicker then the first
        # so just ignore these
        print("Woa there windows caught your breath eh?")
    else:
        # should be cancelled mid-streaming
        assert results is None
github goodboy / tractor / examples / a_trynamic_first_scene.py View on Github external
say_hello,
            # arguments are always named
            other_actor='gretchen',
        )
        gretchen = await n.run_in_actor(
            'gretchen',
            say_hello,
            other_actor='donny',
        )
        print(await gretchen.result())
        print(await donny.result())
        print("CUTTTT CUUTT CUT!!! Donny!! You're supposed to say...")


if __name__ == '__main__':
    tractor.run(main)