Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_required_args(callwith_expecterror):
    """Call a function with the given kwargs and check the outcome.

    ``callwith_expecterror`` unpacks into ``(func, kwargs, err)``:

    * when ``err`` is not ``None``, awaiting ``func(**kwargs)`` directly
      must raise ``err``;
    * otherwise ``multilock_pubber`` is run in a subactor named ``'sub'``
      with the same kwargs, and every value yielded by iterating its
      awaited result must equal ``{'doggy': 10}``.
    """
    target, call_kwargs, expected_exc = callwith_expecterror

    if expected_exc is None:
        # No error expected: run the publisher remotely and consume what
        # it streams back.
        async with tractor.open_nursery() as nursery:
            sub_portal = await nursery.run_in_actor(
                'sub',
                multilock_pubber,
                **call_kwargs,
            )
            stream = await sub_portal.result()
            async for item in stream:
                assert item == {'doggy': 10}
        return

    # Error expected: the direct local call itself must fail.
    with pytest.raises(expected_exc):
        await target(**call_kwargs)
async def main():
    """Start ``num_subactors`` subactors that each run ``assert_err``.

    The subactors are named ``errorer0``, ``errorer1``, ... and each call
    receives ``delay=delay``. ``num_subactors`` and ``delay`` are not
    defined in this block (presumably they come from the enclosing scope).
    """
    async with tractor.open_nursery() as actor_nursery:
        for index in range(num_subactors):
            actor_name = f'errorer{index}'
            await actor_nursery.run_in_actor(
                actor_name,
                assert_err,
                delay=delay,
            )
async def main():
    """Spawn one subactor that runs ``sleep_back_actor``.

    Asserts first that the current actor is the arbiter. The target actor
    name, function name, exposed modules and the subactor's RPC module
    paths are read from names not defined in this block (presumably the
    enclosing scope).
    """
    this_actor = tractor.current_actor()
    assert this_actor.is_arbiter

    # Normalise to a real bool before it is passed to the subactor.
    is_defined = bool(func_defined)

    # spawn a subactor which calls us back
    async with tractor.open_nursery() as nursery:
        await nursery.run_in_actor(
            'subactor',
            sleep_back_actor,
            actor_name=subactor_requests_to,
            # name of a function in the locally exposed module space which
            # the subactor invokes when it RPCs back to this actor
            func_name=funcname,
            exposed_mods=exposed_mods,
            func_defined=is_defined,
            rpc_module_paths=subactor_exposed_mods,
        )
async def test_trynamic_trio(func, start_method):
    """Run ``func`` in two subactors that are told about each other.

    Actors ``'donny'`` and ``'gretchen'`` are started with ``func`` and
    each receives the other's name as ``other_actor``. Both results are
    then awaited and printed, gretchen's first. ``start_method`` is not
    used in the body.
    """
    async with tractor.open_nursery() as nursery:
        print("Alright... Action!")

        donny_portal = await nursery.run_in_actor(
            'donny', func, other_actor='gretchen')
        gretchen_portal = await nursery.run_in_actor(
            'gretchen', func, other_actor='donny')

        # Collect and report the results in the same order as before.
        gretchen_result = await gretchen_portal.result()
        print(gretchen_result)
        donny_result = await donny_portal.result()
        print(donny_result)

        print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...")
async def spawn_and_sleep_forever(task_status=trio.TASK_STATUS_IGNORED):
    """Start three subactors, signal readiness, then block indefinitely.

    Each subactor is named ``'sucka'`` and runs ``sleep_forever``. Once
    all three ``run_in_actor`` calls have returned, ``task_status.started()``
    is called and this task sleeps forever inside the nursery block.
    """
    async with tractor.open_nursery() as actor_nursery:
        for _ in range(3):
            await actor_nursery.run_in_actor('sucka', sleep_forever)

        # Report startup only after every spawn call has completed.
        task_status.started()
        await trio.sleep_forever()
async def main():
    """Start two actors with the same name and look one up by that name.

    Asserts that the current actor is not the arbiter, starts two actors
    both named ``'doggy'``, and checks that the portal obtained from
    ``tractor.wait_for_actor('doggy')`` has the channel uid of one of
    them. The nursery is cancelled afterwards.
    """
    this_actor = tractor.current_actor()
    assert not this_actor.is_arbiter

    async with tractor.open_nursery() as nursery:
        first = await nursery.start_actor('doggy')
        second = await nursery.start_actor('doggy')

        async with tractor.wait_for_actor('doggy') as found:
            # Either of the two same-named actors is an acceptable match.
            candidate_uids = (second.channel.uid, first.channel.uid)
            assert found.channel.uid in candidate_uids

        await nursery.cancel()
async def test_no_arbitter():
    """An arbiter must be established before any nurseries can be created.

    (In other words ``tractor.run`` must be used instead of ``trio.run`` as is
    done by the ``pytest-trio`` plugin.)

    Raises:
        The test fails unless opening a nursery raises ``RuntimeError``.
    """
    with pytest.raises(RuntimeError):
        # ``open_nursery`` is entered with ``async with`` everywhere else in
        # this file. A plain ``with`` would fail on the missing ``__enter__``
        # instead of exercising the RuntimeError path under test.
        async with tractor.open_nursery():
            pass
async def main():
ss = tractor.current_actor().statespace
async with tractor.open_nursery() as n:
name = 'arbiter'
if pub_actor is 'streamer':
# start the publisher as a daemon
master_portal = await n.start_actor(
'streamer',
rpc_module_paths=[__name__],
)
even_portal = await n.run_in_actor(
'evens', subs, which=['even'], pub_actor_name=name)
odd_portal = await n.run_in_actor(
'odds', subs, which=['odd'], pub_actor_name=name)
async with tractor.wait_for_actor('evens'):
async def main():
    """Run ``assert_err`` in a subactor and re-raise its remote failure.

    The subactor ``'errorer'`` is started with ``**args``. If awaiting its
    result raises ``tractor.RemoteActorError``, the error's ``type`` is
    checked against ``errtype``, a message is printed and the exception
    is re-raised. ``args`` and ``errtype`` are not defined in this block
    (presumably they come from the enclosing scope).
    """
    async with tractor.open_nursery() as actor_nursery:
        errorer = await actor_nursery.run_in_actor(
            'errorer',
            assert_err,
            **args,
        )

        # get result(s) from main task
        try:
            await errorer.result()
        except tractor.RemoteActorError as remote_exc:
            assert remote_exc.type == errtype
            print("Look Maa that actor failed hard, hehh")
            raise
async def main():
    """Run ``say_hello`` in two subactors that are told about each other.

    Actors ``'donny'`` and ``'gretchen'`` each receive the other's name
    as ``other_actor``. Both results are then awaited and printed,
    gretchen's first.
    """
    async with tractor.open_nursery() as nursery:
        print("Alright... Action!")

        # arguments are always passed by keyword
        donny_portal = await nursery.run_in_actor(
            'donny', say_hello, other_actor='gretchen')
        gretchen_portal = await nursery.run_in_actor(
            'gretchen', say_hello, other_actor='donny')

        # Collect and report the results in the same order as before.
        gretchen_result = await gretchen_portal.result()
        print(gretchen_result)
        donny_result = await donny_portal.result()
        print(donny_result)

        print("CUTTTT CUUTT CUT!!! Donny!! You're supposed to say...")