Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_descheduled_time_instrument():
time_fn = Mock(side_effect=[5, 10, 10, 20])
instrument = _trio._DescheduledTimeInstrument(time_fn=time_fn)
trio_lowlevel.add_instrument(instrument)
# Only tasks referenced by get_elapsed_descheduled_time() will be tracked,
# so instrument is not tracking the current task.
await trio.sleep(0)
assert not time_fn.called
async with trio.open_nursery() as nursery:
@nursery.start_soon
async def _tracked_child():
# calling get_elapsed_descheduled_time() initiates tracking
task = trio_lowlevel.current_task()
assert instrument.get_elapsed_descheduled_time(task) == 0
await trio.sleep(0)
assert instrument.get_elapsed_descheduled_time(task) == 10 - 5
await trio.sleep(0)
assert instrument.get_elapsed_descheduled_time(task) == 20 - 5
# time function is called twice for each deschedule
assert time_fn.call_count == 4
# the sole tracked task exited, so instrument is automatically removed
with pytest.raises(KeyError):
trio_lowlevel.remove_instrument(instrument)
async def test_multiple_contexts():
async def recv_and_send(ctx):
data = await ctx.arecv()
await trio.sleep(0.05)
await ctx.asend(data)
with pynng.Rep0(listen=addr, recv_timeout=500) as rep, \
pynng.Req0(dial=addr, recv_timeout=500) as req1, \
pynng.Req0(dial=addr, recv_timeout=500) as req2:
async with trio.open_nursery() as n:
ctx1, ctx2 = [rep.new_context() for _ in range(2)]
with ctx1, ctx2:
n.start_soon(recv_and_send, ctx1)
n.start_soon(recv_and_send, ctx2)
await req1.asend(b'oh hi')
await req2.asend(b'me toooo')
assert (await req1.arecv() == b'oh hi')
assert (await req2.arecv() == b'me toooo')
await prev_done.wait()
await send_result.send(result)
self_done.set()
async def consume_input(nursery) -> None:
prev_done = trio.Event()
prev_done.set()
async for item in iterable:
self_done = trio.Event()
nursery.start_soon(wrapper, prev_done, self_done, item)
prev_done = self_done
await prev_done.wait()
await send_result.aclose()
async with trio.open_nursery() as nursery:
nursery.start_soon(consume_input, nursery)
yield receive_result
nursery.cancel_scope.cancel()
async def async_query():
db_config = config['database']
kwargs = {
'host': db_config['host'],
'port': db_config['port'],
'db': db_config['db'],
'user': db_config['user'],
'password': db_config['password'],
}
if super_user:
kwargs['user'] = db_config['super_user']
kwargs['password'] = db_config['super_password']
async with trio.open_nursery() as nursery:
kwargs['nursery'] = nursery
connect_db = functools.partial(r.connect, **kwargs)
conn = await connect_db()
try:
result = await query.run(conn)
finally:
await conn.close()
return result
async def init_db(db_config):
'''
Make sure the database and required objects (users, tables, indices) all
exist.
'''
logger.info('Connecting to RethinkDB: {}'.format(db_config['host']))
async with trio.open_nursery() as nursery:
conn = await connect_db(db_config, nursery)
try:
await r.db_drop('test').run(conn)
except r.ReqlRuntimeError:
pass # Already deleted
db_name = db_config['db']
await ensure_db(conn, db_name)
await ensure_db_user(conn, db_name, db_config['user'],
db_config['password'])
await ensure_db_table(conn, 'captcha_solver')
await ensure_db_table(conn, 'domain_login', primary_key='domain')
await ensure_db_table(conn, 'frontier')
await ensure_db_index(conn, 'frontier', 'cost_index',
[r.row['job_id'], r.row['in_flight'], r.row['cost']])
await ensure_db_table(conn, 'job')
async def __start_service(self):
logger.info('Initiating scraper service.')
try:
# Blocks until all child tasks finish or exception thrown
async with trio.open_nursery() as nursery:
nursery.start_soon(self.__queue_manager, nursery)
except KeyboardInterrupt:
print("\nCaught KeyboardInterrupt: stopping service.")
logger.info(f'Successfully scraped {self.successful_scrapes} cases.')
logger.info("Scraper service stopped.")
async def run(self):
'''
Run the downloader, including all concurrent download tasks. When
cancelled, all download tasks are also cancelled.
:returns: Runs until cancelled.
'''
async with trio.open_nursery() as nursery, \
trio_asyncio.open_loop():
async for request in self._recv_channel:
await self._semaphore.acquire()
self._count += 1
nursery.start_soon(self._download, request)
async def main():
db_config = get_config()['database']
async with trio.open_nursery() as nursery:
conn = await r.connect(
host=db_config['host'],
port=db_config['port'],
db=db_config['db'],
user=db_config['user'],
password=db_config['password'],
nursery=nursery
)
await clear(conn, 'captcha_solver')
await clear(conn, 'domain_login')
await clear(conn, 'frontier')
await clear(conn, 'job')
await clear(conn, 'job_schedule')
await clear(conn, 'policy')
await clear(conn, 'rate_limit')
await clear(conn, 'response')
def __init__(self) -> None:
self._active = False
self._nursery_manager = trio.open_nursery()
self.cancel_scope = None
async def run(self) -> None:
if self._run_lock.locked():
raise LifecycleError(
"Cannot run a service with the run lock already engaged. Already started?"
)
elif self.is_started:
raise LifecycleError("Cannot run a service which is already started.")
async with self._run_lock:
async with trio.open_nursery() as system_nursery:
try:
async with trio.open_nursery() as task_nursery:
self._task_nursery = task_nursery
system_nursery.start_soon(
self._handle_cancelled,
task_nursery,
)
system_nursery.start_soon(
self._handle_stopped,
system_nursery,
)
task_nursery.start_soon(self._handle_run)
self._started.set()
# ***BLOCKING HERE***