Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_reg():
    """Exercise ``qbroker.ping`` between two units.

    Three things are checked:

    * a broadcast poll from unit 2 yields a reply from each of the two units,
      and each reply carries the matching ``app`` value;
    * a poll that demands 99 replies raises ``trio.TooSlowError``;
    * a ping RPC addressed to unit 1 is answered with unit 1's ``app`` and
      lists ``rpc.qbroker.ping`` among its endpoints.
    """
    async with unit(1) as unit1, unit(2) as unit2:
        matched = 0
        async for reply in unit2.poll("qbroker.ping", max_delay=TIMEOUT, result_conv=CC_DATA):
            # Replies from units other than these two are ignored.
            for peer in (unit1, unit2):
                if reply['uuid'] == peer.uuid:
                    assert reply['app'] == peer.app
                    matched += 1
                    break
        assert matched == 2

        # Asking for more replies than will arrive must end in a timeout.
        with pytest.raises(trio.TooSlowError):
            async for _ in unit2.poll(
                "qbroker.ping",
                min_replies=99,
                max_delay=TIMEOUT / 2,
                result_conv=CC_DATA,
            ):
                pass

        pong = await unit2.rpc("qbroker.ping", dest=unit1.app)
        assert pong['app'] == unit1.app
        assert "rpc.qbroker.ping" in pong['endpoints'], pong['endpoints']
async def send_write(self, client, max_set_size):
    """Send one write request through ``client`` and record what happened.

    The read set, write set and read version are produced by this object's
    own helpers. The request is registered with the tracker before it is
    sent. A reply is recorded in ``self.status``, parsed, and handed to the
    tracker; a ``trio.TooSlowError`` is recorded as a client timeout instead.
    """
    keys_read = self.readset(0, max_set_size)
    kv_written = self.writeset(max_set_size)
    block_id = self.read_block_id()
    request = self.skvbc.write_req(keys_read, kv_written, block_id)

    sequence = client.req_seq_num.next()
    sender = client.client_id

    # The tracker receives the write set as a dict built from ``kv_written``.
    self.tracker.send_write(sender, sequence, keys_read, dict(kv_written), block_id)

    try:
        raw_reply = await client.write(request, sequence)
        self.status.record_client_reply(sender)
        parsed = self.skvbc.parse_reply(raw_reply)
        self.tracker.handle_write_reply(sender, sequence, parsed)
    except trio.TooSlowError:
        self.status.record_client_timeout(sender)
async def safe_exchange_delete(self, exchange_name, channel=None):
    """Delete an exchange without letting ordinary failures propagate.

    ``exchange_name`` is expanded through ``self.full_name`` first. When no
    (truthy) ``channel`` is supplied, ``self.channel`` is used.

    A ``trio.TooSlowError`` raised by the deletion is logged as a warning and
    any other ``Exception`` is logged as an error; neither is re-raised.
    """
    if not channel:
        channel = self.channel
    target = self.full_name(exchange_name)

    try:
        await channel.exchange_delete(target, no_wait=False)
    except trio.TooSlowError:
        logger.warning('Timeout on exchange %s deletion', target, exc_info=True)
    except Exception:  # pylint: disable=broad-except
        # Best-effort cleanup: report the failure and carry on.
        logger.error('Unexpected error on exchange %s deletion', target, exc_info=True)
async def _testTimeout(self, msg, read_only):
    """Assert that ``sendSync`` raises ``trio.TooSlowError``.

    The client is built from a copy of ``self.config`` whose
    ``req_timeout_milli`` field is replaced with 100.
    """
    short_timeout_config = self.config._replace(req_timeout_milli=100)
    with bft_client.UdpClient(short_timeout_config, self.replicas) as udp_client, \
            self.assertRaises(trio.TooSlowError):
        await udp_client.sendSync(msg, read_only)
async def check_handshake_timeout(self, handshake_successful_event: trio.Event) -> None:
    """Wait for the handshake-success event, failing after ``HANDSHAKE_TIMEOUT``.

    Raises:
        HandshakeFailure: if the event is not set before the deadline. The
            underlying ``trio.TooSlowError`` is chained as the cause.
    """
    try:
        # Only successful handshakes need a deadline here: a failure during the
        # handshake makes the service as a whole fail.
        with trio.fail_after(HANDSHAKE_TIMEOUT):
            await handshake_successful_event.wait()
    except trio.TooSlowError as timeout_exc:
        remote_hex = encode_hex(self.remote_node_id)
        self.logger.warning("Handshake with %s has timed out", remote_hex)
        raise HandshakeFailure("Handshake has timed out") from timeout_exc
async def read_data(self) -> None:
    """Receive from ``self.stream`` and process each chunk until it yields ``b""``.

    Every receive is bounded by ``self.config.keep_alive_timeout``. When a
    receive times out, the ``trio.TooSlowError`` is re-raised if
    ``self.streams`` is empty; otherwise the loop keeps waiting.
    """
    while True:
        try:
            with trio.fail_after(self.config.keep_alive_timeout):
                chunk = await self.stream.receive_some(MAX_RECV)
        except trio.TooSlowError:
            if len(self.streams) != 0:
                # Streams are still registered, so keep waiting.
                continue
            raise

        if chunk == b"":
            break
        await self.process_data(chunk)
# This also makes it easier to deal with the error handling aspects of sending a
# batch, from the work of actually sending. The general rule here is that errors
# should not escape from this function.
send = actually_send_batch.retry_with(
wait=tenacity.wait_exponential(multiplier=retry_multiplier, max=retry_max_wait),
stop=tenacity.stop_after_attempt(retry_max_attempts),
)
try:
await send(bq, table, template_suffix, batch, *args, **kwargs)
# We've tried to send this batch to BigQuery, however for one reason or another
# we were unable to do so. We should log this error, but otherwise we're going
# to just drop this on the floor because there's not much else we can do here
# except buffer it forever (which is not a great idea).
except trio.TooSlowError:
logger.error("Timed out sending %d items; Dropping them.", len(batch))
except Exception:
logger.exception("Error sending %d items; Dropping them.", len(batch))
def timeout_error():
    """Return a new timeout exception instance for the running async library.

    The library is identified with ``anyio.sniffio.current_async_library()``.
    Only the module for the detected library is imported.

    Returns:
        ``asyncio.TimeoutError()``, ``trio.TooSlowError()`` or
        ``curio.TimeoutError()`` depending on the detected library.

    Raises:
        RuntimeError: if the detected library is none of the three above.
    """
    backend = anyio.sniffio.current_async_library()
    if backend == 'asyncio':
        from asyncio import TimeoutError as timeout_cls
    elif backend == 'trio':
        from trio import TooSlowError as timeout_cls
    elif backend == 'curio':
        from curio import TimeoutError as timeout_cls
    else:
        raise RuntimeError("Asynclib detection failed")
    return timeout_cls()
async def run(self) -> None:
try:
try:
with trio.fail_after(self.config.ssl_handshake_timeout):
await self.stream.do_handshake()
except (trio.BrokenResourceError, trio.TooSlowError):
return # Handshake failed
alpn_protocol = self.stream.selected_alpn_protocol()
socket = self.stream.transport_stream.socket
ssl = True
except AttributeError: # Not SSL
alpn_protocol = "http/1.1"
socket = self.stream.socket
ssl = False
try:
client = parse_socket_addr(socket.family, socket.getpeername())
server = parse_socket_addr(socket.family, socket.getsockname())
async with trio.open_nursery() as nursery:
self.nursery = nursery
self.protocol = ProtocolWrapper(
self.broadcaster.our_address = safe_getsockname(self.udp_sock)
command = self.broadcaster.register('127.0.0.1')
await self.send(ca.EPICS_CA2_PORT, command)
task_status.started()
while True:
async with self._cleanup_condition:
if self._cleanup_event.is_set():
self.udp_sock.close()
self.log.debug('Exiting broadcaster recv loop')
break
try:
with trio.fail_after(0.5):
bytes_received, address = await self.udp_sock.recvfrom(4096)
except trio.TooSlowError:
continue
if bytes_received:
if bytes_received is ca.DISCONNECTED:
break
commands = self.broadcaster.recv(bytes_received, address)
await self.command_chan.send.send(commands)