Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# The sampled subscriber statistics are expected to report exactly one received message at this point.
assert stat.messages == 1
# Publish a heartbeat and check that receive() yields a message whose repr() equals that of the original.
await pub_heart.publish(heart)
rx = (await sub_heart.receive())[0]
assert repr(rx) == repr(heart)
# Same round trip, this time through receive_until() with a deadline of _RX_TIMEOUT past the current loop time.
await pub_heart.publish(heart)
rx = (await sub_heart.receive_until(asyncio.get_event_loop().time() + _RX_TIMEOUT))[0] # type: ignore
assert repr(rx) == repr(heart)
# Nothing was published since the previous read, so the timed receive is expected to return None.
rx = await sub_heart.receive_for(_RX_TIMEOUT)
assert rx is None
# Closing an already closed subscriber must be harmless.
sub_heart.close()
sub_heart.close() # Shall not raise.
# Collects every (message, transfer) pair that is handed to the record handler defined right after this line.
record_handler_output: typing.List[typing.Tuple[uavcan.diagnostic.Record_1_0, pyuavcan.transport.TransferFrom]] = []
async def record_handler(message: uavcan.diagnostic.Record_1_0,
                         cb_transfer: pyuavcan.transport.TransferFrom) -> None:
    # Callback for received records: print the delivery and store it in record_handler_output
    # as a (message, transfer) tuple so that it can be inspected afterwards.
    delivery = (message, cb_transfer)
    print('RECORD HANDLER:', *delivery)
    record_handler_output.append(delivery)
# From now on, messages arriving at sub_record2 are passed to record_handler in the background.
sub_record2.receive_in_background(record_handler)
# A diagnostic record with a fixed timestamp, ALERT severity, and a short text.
# NOTE(review): presumably published later in this test - the publication is not visible in this excerpt.
record = uavcan.diagnostic.Record_1_0(timestamp=uavcan.time.SynchronizedTimestamp_1_0(1234567890),
severity=uavcan.diagnostic.Severity_1_0(uavcan.diagnostic.Severity_1_0.ALERT),
text='Hello world!')
# The publisher is expected to be at the presentation-layer default priority here;
# a value assigned through the property must then be returned by it.
assert pub_record.priority == pyuavcan.presentation.DEFAULT_PRIORITY
pub_record.priority = Priority.NOMINAL
assert pub_record.priority == Priority.NOMINAL
with pytest.raises(TypeError, match='.*Heartbeat.*'):
# noinspection PyTypeChecker
# The sampled statistics are expected to equal a default-constructed SerialTransportStatistics at this point.
assert tr.sample_statistics() == SerialTransportStatistics()
#
# Message exchange test.
#
# Send one transfer (LOW priority, transfer-ID 77777) with a deadline of 5.0 past the current monotonic time;
# send_until() must return a truthy value.
assert await broadcaster.send_until(
Transfer(timestamp=Timestamp.now(),
priority=Priority.LOW,
transfer_id=77777,
fragmented_payload=payload_single),
monotonic_deadline=get_monotonic() + 5.0
)
# The promiscuous subscriber must deliver a TransferFrom with the same priority and transfer-ID,
# and with the payload fragments joined into a single fragment.
rx_transfer = await subscriber_promiscuous.receive_until(get_monotonic() + 5.0)
print('PROMISCUOUS SUBSCRIBER TRANSFER:', rx_transfer)
assert isinstance(rx_transfer, TransferFrom)
assert rx_transfer.priority == Priority.LOW
assert rx_transfer.transfer_id == 77777
assert rx_transfer.fragmented_payload == [b''.join(payload_single)]
# Transport counters after the exchange: one frame in each direction, equal byte counts in both directions,
# no out-of-band input bytes, one completed and no incomplete outgoing transfers.
print(tr.sample_statistics())
assert tr.sample_statistics().in_bytes >= 32 + sft_capacity + 2
assert tr.sample_statistics().in_frames == 1
assert tr.sample_statistics().in_out_of_band_bytes == 0
assert tr.sample_statistics().out_bytes == tr.sample_statistics().in_bytes
assert tr.sample_statistics().out_frames == 1
assert tr.sample_statistics().out_transfers == 1
assert tr.sample_statistics().out_incomplete == 0
with pytest.raises(pyuavcan.transport.OperationNotDefinedForAnonymousNodeError):
# Anonymous nodes can't send multiframe transfers.
assert await broadcaster.send_until(
async def receive_until(self, monotonic_deadline: float) -> typing.Optional[pyuavcan.transport.TransferFrom]:
    """
    Fetch the next transfer from the internal queue, waiting until the deadline if necessary.

    :param monotonic_deadline: Deadline on the time base of ``self._loop.time()``.
        If it is not in the future, the queue is polled once without blocking.
    :return: The next queued transfer, or None if no transfer became available before the deadline.
    :raises pyuavcan.transport.ResourceClosedError: If no transfer is available and the finalizer
        of this instance is None, which is how this method recognizes a closed instance.
    """
    try:
        timeout = monotonic_deadline - self._loop.time()
        if timeout > 0:
            # The ``loop`` argument of asyncio.wait_for() is deprecated since Python 3.8 and rejected with
            # a TypeError since Python 3.10, so it is not passed. The running loop is used implicitly;
            # this assumes that the coroutine is awaited on self._loop.
            transfer = await asyncio.wait_for(self._queue.get(), timeout)
        else:
            transfer = self._queue.get_nowait()
    except (asyncio.TimeoutError, asyncio.QueueEmpty):
        # If there are unprocessed transfers, allow the caller to read them even if the instance is closed.
        # The closure is therefore reported only once the queue has nothing left to offer.
        if self._maybe_finalizer is None:
            raise pyuavcan.transport.ResourceClosedError(f'{self} is closed')
        return None
    else:
        assert isinstance(transfer, pyuavcan.transport.TransferFrom), 'Internal protocol violation'
        # A session bound to a specific remote node must never yield transfers from other nodes.
        assert transfer.source_node_id == self._specifier.remote_node_id or self._specifier.remote_node_id is None
        return transfer
# Start one read task per inferior session. Judging by the unpacking further down, do_receive() resolves to
# a tuple of (interface index, inferior session, received transfer or None).
# NOTE(review): asyncio.wait() is given loop= below; that argument is deprecated since Python 3.8 and
# removed in Python 3.10 - confirm the supported interpreter versions.
pending = {self._loop.create_task(do_receive(if_idx, inf)) for if_idx, inf in enumerate(inferiors)}
try:
while True:
# Invariant: one outstanding read per inferior at the top of every iteration.
assert len(pending) == len(inferiors)
done, pending = await asyncio.wait(pending, # type: ignore
loop=self._loop,
return_when=asyncio.FIRST_COMPLETED)
_logger.debug('%r wait result: %d pending, %d done: %r', self, len(pending), len(done), done)
# Process those that are done and push received transfers into the backlog.
transfer_id_timeout = self.transfer_id_timeout # May have been updated.
for f in done:
if_idx, inf, tr = await f
assert isinstance(if_idx, int) and isinstance(inf, pyuavcan.transport.InputSession)
if tr is not None: # Otherwise, the read has timed out.
assert isinstance(tr, pyuavcan.transport.TransferFrom)
# Only transfers accepted by the deduplicator reach the backlog.
if self._deduplicator.should_accept_transfer(if_idx, transfer_id_timeout, tr):
self._backlog.append(self._make_transfer(tr, inf))
# Termination condition: success or timeout. We may have read more than one transfer.
if self._backlog or self._loop.time() >= monotonic_deadline:
break
# Not done yet - restart those reads that have completed; the pending ones remain pending, unchanged.
for f in done:
if_idx, inf, _ = f.result()
assert isinstance(if_idx, int) and isinstance(inf, pyuavcan.transport.InputSession)
pending.add(self._loop.create_task(do_receive(if_idx, inf)))
finally:
# Reads that are still outstanding on exit are handled here (the loop body is outside this excerpt).
if pending:
_logger.debug('%r canceling %d pending reads', self, len(pending))
for f in pending:
transfers=2,
frames=2,
payload_bytes=14,
),
]
# Exactly one feedback entry is expected; it must refer to inferior session B and carry the original timestamp.
assert len(feedback) == 1
assert feedback[0].inferior_session is inf_b
assert feedback[0].original_transfer_timestamp == ts
# The reported first-frame transmission time must lie between the transfer timestamp and the current time,
# on both the system and the monotonic clocks.
assert ts.system <= feedback[0].first_frame_transmission_timestamp.system <= time.time()
assert ts.monotonic <= feedback[0].first_frame_transmission_timestamp.monotonic <= time.monotonic()
assert isinstance(feedback[0].inferior_feedback, LoopbackFeedback)
# Remove the checked entry; the list must be empty afterwards.
feedback.pop()
assert not feedback
# Receiver A is expected to yield nothing, whereas receiver B yields the transfer carrying b'hedgehog'.
assert None is await_(rx_a.receive_until(loop.time() + 1))
tf_rx = await_(rx_b.receive_until(loop.time() + 1))
assert isinstance(tf_rx, TransferFrom)
assert tf_rx.transfer_id == 88888888888888
assert tf_rx.fragmented_payload == [memoryview(b'hedgehog')]
# Disable the feedback.
ses.disable_feedback()
# A diversion - enable the feedback in the inferior and make sure it's not propagated.
# noinspection PyProtectedMember
ses._enable_feedback_on_inferior(inf_b)
# Send one more transfer; the send must report success and the transfer counter is expected to read 5.
assert await_(ses.send_until(
Transfer(timestamp=ts,
priority=Priority.OPTIONAL,
transfer_id=666666666666666,
fragmented_payload=[memoryview(b'horse')]),
loop.time() + 1.0
))
assert ses.sample_statistics().transfers == 5
def trn(monotonic_ns: int,
        transfer_id: int,
        fragmented_payload: typing.Sequence[typing.Union[bytes, str, memoryview]]) \
        -> pyuavcan.transport.TransferFrom:
    # Builds a TransferFrom with a zero system timestamp and the given monotonic timestamp.
    # ``priority`` and ``source_node_id`` are not parameters; they are resolved from the surrounding scope.
    def as_view(chunk: typing.Union[bytes, str, memoryview]) -> memoryview:
        # Binary fragments are wrapped as they are; anything else is encoded first.
        if isinstance(chunk, (bytes, memoryview)):
            return memoryview(chunk)
        return memoryview(chunk.encode())

    stamp = pyuavcan.transport.Timestamp(system_ns=0, monotonic_ns=monotonic_ns)
    views = [as_view(chunk) for chunk in fragmented_payload]
    return pyuavcan.transport.TransferFrom(timestamp=stamp,
                                           priority=priority,
                                           transfer_id=transfer_id,
                                           fragmented_payload=views,
                                           source_node_id=source_node_id)
"""
Do not call this directly.
Instead, use the factory method :meth:`pyuavcan.transport.serial.SerialTransport.get_input_session`.
"""
# Keep the construction arguments; an event loop is mandatory.
self._specifier = specifier
self._payload_metadata = payload_metadata
self._loop = loop
assert self._loop is not None
# Reject arguments of the wrong type before any further state is created.
if not isinstance(self._specifier, pyuavcan.transport.InputSessionSpecifier) or \
not isinstance(self._payload_metadata, pyuavcan.transport.PayloadMetadata): # pragma: no cover
raise TypeError('Invalid parameters')
# Start with fresh statistics and the class-level default transfer-ID timeout.
self._statistics = SerialInputSessionStatistics()
self._transfer_id_timeout = self.DEFAULT_TRANSFER_ID_TIMEOUT
# Queue of TransferFrom objects. NOTE(review): presumably drained by the receive methods - confirm.
self._queue: asyncio.Queue[pyuavcan.transport.TransferFrom] = asyncio.Queue()
# Reassemblers indexed by an integer. NOTE(review): the key is presumably the source node-ID - confirm.
self._reassemblers: typing.Dict[int, TransferReassembler] = {}
# The finalizer is handed over to the base class.
super(SerialInputSession, self).__init__(finalizer)
import pyuavcan.util
import pyuavcan.dsdl
import pyuavcan.transport
from ._base import MessagePort, MessageClass, TypedSessionFinalizer, Closable
from ._error import PortClosedError
# Shouldn't be too large as this value defines how quickly the task will detect that the underlying transport is closed.
# NOTE(review): presumably expressed in seconds - confirm against the code that consumes it.
_RECEIVE_TIMEOUT = 1
# Module-level logger named after this module.
_logger = logging.getLogger(__name__)
#: Type of the async received message handler callable.
#: It accepts a message object and a :class:`pyuavcan.transport.TransferFrom`, and returns an awaitable of None.
ReceivedMessageHandler = typing.Callable[[MessageClass, pyuavcan.transport.TransferFrom], typing.Awaitable[None]]
# Counters reported for a subscriber. Some are kept per subscriber instance, others are shared
# between all subscribers that use the same session specifier, as noted on each field.
@dataclasses.dataclass
class SubscriberStatistics:
    #: Shared per session specifier.
    transport_session: pyuavcan.transport.SessionStatistics

    #: Number of received messages, individual per subscriber.
    messages: int

    #: Number of messages lost to queue overruns; individual per subscriber.
    overruns: int

    #: Number of messages lost to deserialization errors; shared per session specifier.
    deserialization_failures: int
class Subscriber(MessagePort[MessageClass]):
"""
A task should request its own independent subscriber instance from the presentation layer controller.
Do not share the same subscriber instance across different tasks. This class implements the RAII pattern.
Whenever a message is received from a subject, it is deserialized once and the resulting object is
except (asyncio.TimeoutError, asyncio.QueueEmpty):
# If there are unprocessed messages, allow the caller to read them even if the instance is closed.
self._raise_if_closed()
return None
# Every frame that reaches this point is counted, whatever the kind of its CAN ID.
self._statistics.frames += 1
if isinstance(canid, _identifier.MessageCANID):
# Message frame: the subject-ID must agree with the data specifier of this session.
assert isinstance(self._specifier.data_specifier, pyuavcan.transport.MessageDataSpecifier)
assert self._specifier.data_specifier.subject_id == canid.subject_id
source_node_id = canid.source_node_id
if source_node_id is None:
# Anonymous transfer - no reconstruction needed
# The frame is returned as a complete transfer, with its padded payload as the only fragment.
self._statistics.transfers += 1
self._statistics.payload_bytes += len(frame.padded_payload)
out = pyuavcan.transport.TransferFrom(timestamp=frame.timestamp,
priority=canid.priority,
transfer_id=frame.transfer_id,
fragmented_payload=[frame.padded_payload],
source_node_id=None)
_logger.debug('%s: Received anonymous transfer: %s; current stats: %s', self, out, self._statistics)
return out
elif isinstance(canid, _identifier.ServiceCANID):
# Service frame: the service-ID and the request/response direction must agree with the data specifier.
assert isinstance(self._specifier.data_specifier, pyuavcan.transport.ServiceDataSpecifier)
assert self._specifier.data_specifier.service_id == canid.service_id
assert (self._specifier.data_specifier.role == pyuavcan.transport.ServiceDataSpecifier.Role.REQUEST) \
== canid.request_not_response
source_node_id = canid.source_node_id
else:
# No other kind of CAN ID is expected here.
assert False
def __init__(self,
             specifier: pyuavcan.transport.InputSessionSpecifier,
             payload_metadata: pyuavcan.transport.PayloadMetadata,
             loop: asyncio.AbstractEventLoop,
             closer: typing.Callable[[], None]):
    """
    Store the session parameters and create an empty transfer queue and fresh statistics.

    :param specifier: Input session specifier; stored as is.
    :param payload_metadata: Payload metadata; stored as is.
    :param loop: Event loop; stored as is, and bound to the queue on interpreters that still support that.
    :param closer: Callable without arguments; stored as is.
    """
    import sys  # Local import: needed only for the interpreter version check below.

    self._specifier = specifier
    self._payload_metadata = payload_metadata
    self._loop = loop
    self._closer = closer
    self._transfer_id_timeout = float(self.DEFAULT_TRANSFER_ID_TIMEOUT)
    self._stats = pyuavcan.transport.SessionStatistics()
    # asyncio.Queue() accepts ``loop`` only up to Python 3.9; since Python 3.10 passing it raises TypeError.
    # Older interpreters still get the explicit loop so that the queue stays bound to it as before.
    queue_options: typing.Dict[str, typing.Any] = {'loop': loop} if sys.version_info < (3, 10) else {}
    self._queue: asyncio.Queue[pyuavcan.transport.TransferFrom] = asyncio.Queue(**queue_options)
    super(LoopbackInputSession, self).__init__()