Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def _server(
    self,
    amqp,
    server_future,
    exchange_name,
    routing_key,
    task_status=trio.TASK_STATUS_IGNORED,
):
    """Set up the server queue and run the consumer until ``server_future`` fires.

    Messages are consumed and answered by publishing back to the client,
    with the routing key taken from each message's ``reply_to`` property.

    Args:
        amqp: connection object exposing ``new_channel()``.
        server_future: object with an awaitable ``wait()``; also handed to
            the consumer task.
        exchange_name: name of the direct exchange to declare and bind.
        routing_key: routing key used to bind the server queue.
        task_status: trio task-status handle; ``started()`` is called once
            the consumer task has been started.
    """
    # NOTE(review): ``server_queue_name`` is not bound anywhere in this
    # excerpt; presumably it is defined elsewhere in the file -- confirm.
    async with amqp.new_channel() as chan:
        # Declare the queue, declare the direct exchange, then bind the two.
        await chan.queue_declare(
            server_queue_name,
            exclusive=False,
            no_wait=False,
        )
        await chan.exchange_declare(exchange_name, type_name='direct')
        await chan.queue_bind(
            server_queue_name,
            exchange_name,
            routing_key=routing_key,
        )

        async with trio.open_nursery() as nursery:
            await nursery.start(self._server_consumer, chan, server_future)
            # Consumer is up: release whoever is waiting on our startup.
            task_status.started()
            await server_future.wait()
            # The future has fired; cancel the server scope.
            self._server_scope.cancel()
async def spawn_and_sleep_forever(task_status=trio.TASK_STATUS_IGNORED):
    """Launch three ``'sucka'`` actors running ``sleep_forever``, then block.

    ``task_status.started()`` is called after all three ``run_in_actor``
    calls have returned; the task then sleeps forever inside the actor
    nursery.
    """
    async with tractor.open_nursery() as actor_nursery:
        for _ in range(3):
            await actor_nursery.run_in_actor('sucka', sleep_forever)

        task_status.started()
        await trio.sleep_forever()
async def handle_lifespan(
    self, *, task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED
) -> None:
    """Invoke the ASGI app with a lifespan scope and clean up afterwards.

    ``task_status.started()`` is signalled before the app is invoked.
    ``LifespanFailure`` is re-raised unchanged. Any other ``Exception``
    sets ``self.supported`` to ``False`` and is logged. Both app channels
    are closed in every case.
    """
    task_status.started()

    lifespan_scope = {
        "type": "lifespan",
        "asgi": {"spec_version": "2.0"},
    }

    try:
        await invoke_asgi(
            self.app,
            lifespan_scope,
            self.asgi_receive,
            self.asgi_send,
        )
    except LifespanFailure:
        # A lifespan failure is meant to bring the server down: let it propagate.
        raise
    except Exception:
        # Anything else is treated as "the framework has no lifespan support".
        self.supported = False
        await self.config.log.exception(
            "ASGI Framework Lifespan error, continuing without Lifespan support"
        )
    finally:
        # Close the send side first, then the receive side.
        await self.app_send_channel.aclose()
        await self.app_receive_channel.aclose()
# NOTE(review): this excerpt has lost its indentation and is cut off inside
# the ``log.warning(`` call at the bottom; the rest of the function body is
# not visible here.
async def cancel_on_completion(
portal: Portal,
actor: Actor,
errors: Dict[Tuple[str, str], Exception],
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Cancel actor gracefully once it's "main" portal's
result arrives.
Should only be called for actors spawned with `run_in_actor()`.
"""
with trio.CancelScope() as cs:
# Pass the new cancel scope to ``task_status.started`` so the task that
# started this one receives it.
task_status.started(cs)
# if this call errors we store the exception for later
# in ``errors`` which will be reraised inside
# a MultiError and we still send out a cancel request
result = await exhaust_portal(portal, actor)
# An Exception instance returned as the result is recorded in ``errors``
# under the actor's uid before the warning is logged.
if isinstance(result, Exception):
errors[actor.uid] = result
log.warning(
f"Cancelling {portal.channel.uid} after error {result}"
# NOTE(review): indentation was lost in this excerpt, and ``do_init``,
# ``do_push``, ``do_pull`` and ``task_status`` are never used in the visible
# lines -- presumably the function continues beyond what is shown. Confirm
# against the full source before relying on the comments below.
async def run(self, do_init, do_push, do_pull, task_status=trio.TASK_STATUS_IGNORED):
# No nursery may be attached to this object yet when run() is entered.
assert self.nursery is None
with self.cancel_scope:
try:
# Vault / identity checks; the two exception types caught below are
# translated into vault states reported to the app.
self.vault.check_existence()
self.vault.identity.read()
self.vault.identity.assert_initialized()
except IdentityNotInitialized:
# Logged at info level and reported as UNINITIALIZED.
self.logger.info("Identity not yet initialized.")
await self.app.set_vault_state(self.vault, VaultState.UNINITIALIZED)
except SyncryptBaseException:
# Logged with traceback and reported as FAILURE.
self.logger.exception("Failure during vault initialization")
await self.app.set_vault_state(self.vault, VaultState.FAILURE)
# NOTE(review): with the nesting unknown, it is unclear whether this line is
# also reached after one of the handlers above has run -- verify.
self.logger.debug("Finished vault initialization successfully.")
table,
bind="0.0.0.0",
port=512,
tls_certificate=None,
token=None,
max_line_size=None,
recv_size=None,
cleanup_timeout=None,
qsize=10000,
batch_size=None,
batch_timeout=None,
retry_max_attempts=None,
retry_max_wait=None,
retry_multiplier=None,
api_timeout=None,
task_status=trio.TASK_STATUS_IGNORED,
):
# We want to make sure that the token we were given is a bytes object, and if it
# isn't then we'll encode it using utf8.
if isinstance(token, str):
token = token.encode("utf8")
# Total number of buffered events is:
# qsize + (COUNT(send_batch) * batch_size)
# However, the length of time a single send_batch call sticks around for is time
# boxed, so this won't grow forever. It will not however, apply any backpressure
# to the sender (we can't meaningfully apply backpressure, since these are download
# events being streamed to us).
q = trio.Queue(qsize)
async with trio.open_nursery() as nursery:
nursery.start_soon(
# NOTE(review): indentation was lost in this excerpt and it stops right after
# ``f`` is assigned, so whatever is done with the frame is not visible here.
async def _writer_loop(self, task_status=trio.TASK_STATUS_IGNORED):
# NOTE(review): ``trio.open_cancel_scope`` looks like the older spelling of
# ``trio.CancelScope`` -- confirm against the trio version in use.
with trio.open_cancel_scope(shield=True) as scope:
# Keep the shielded scope on the instance, then report startup.
self._writer_scope = scope
task_status.started()
while self.state != CLOSED:
# Wait for at most half the server heartbeat interval, or without a time
# limit when no heartbeat is configured.
if self.server_heartbeat:
timeout = self.server_heartbeat / 2
else:
timeout = inf
with trio.move_on_after(timeout) as timeout_scope:
frame, encoder = await self._send_receive_channel.receive()
# The receive timed out: send a heartbeat and go back to waiting.
if timeout_scope.cancelled_caught:
await self.send_heartbeat()
continue
f = frame.get_frame(encoder)
async def _call_later(
    timeout: float,
    callback: Callable,
    task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED,
) -> None:
    """Sleep for ``timeout`` seconds, then await ``callback()``.

    The cancel scope wrapping the sleep and the callback is passed to
    ``task_status.started`` before the sleep begins, so the starter
    receives it.

    Args:
        timeout: delay in seconds passed to ``trio.sleep``.
        callback: zero-argument callable whose result is awaited.
        task_status: trio task-status handle that receives the cancel scope.
    """
    scope = trio.CancelScope()
    task_status.started(scope)

    with scope:
        await trio.sleep(timeout)
        # The delay has elapsed: shield the scope so that cancellation from
        # enclosing scopes does not interrupt the callback.
        scope.shield = True
        await callback()
async def start(self, task_status=trio.TASK_STATUS_IGNORED):
async with trio_asyncio.open_loop():
self.initialize()
assert self.web_app is not None
runner = web.AppRunner(self.web_app)
await trio_asyncio.aio_as_trio(runner.setup)
site = web.TCPSite(runner,
self.app.config.api['host'],
self.app.config.api['port']
)
await trio_asyncio.aio_as_trio(site.start)
logger.info("REST API Server started at http://{0.api[host]}:{0.api[port]}"\
.format(self.app.config))