Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
assert client_requester.sample_statistics() == SessionStatistics() # Not incremented!
# Can't send multiframe anonymous messages
with pytest.raises(OperationNotDefinedForAnonymousNodeError):
assert await broadcaster.send_until(Transfer(
timestamp=ts,
priority=Priority.SLOW,
transfer_id=2,
fragmented_payload=[_mem('qwe'), _mem('rty')] * 50 # Lots of data here, very multiframe
), tr.loop.time() + 1.0)
#
# Broadcast exchange with input dispatch test
#
selective_m12345_5 = tr2.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 5), meta)
selective_m12345_9 = tr2.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 9), meta)
promiscuous_m12345 = tr2.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert await broadcaster.send_until(Transfer(
timestamp=ts,
priority=Priority.IMMEDIATE,
transfer_id=32 + 11, # Modulus 11
fragmented_payload=[_mem('abc'), _mem('def')]
), tr.loop.time() + 1.0)
assert broadcaster.sample_statistics() == SessionStatistics(transfers=2, frames=2, payload_bytes=12)
assert tr.sample_statistics() == can.CANTransportStatistics(out_frames=2)
assert tr2.sample_statistics() == can.CANTransportStatistics(
in_frames=2, in_frames_uavcan=2, in_frames_uavcan_accepted=1)
received = await promiscuous_m12345.receive_until(tr.loop.time() + 1.0)
assert received is not None
assert sum(map(len, payload_x3)) == payload_x3_size_bytes
#
# Instantiate session objects.
#
meta = PayloadMetadata(0x_bad_c0ffee_0dd_f00d, 10000)
broadcaster = tr2.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert broadcaster is tr2.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
subscriber_promiscuous = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert subscriber_promiscuous is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), None),
meta)
subscriber_selective = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 123), meta)
assert subscriber_selective is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 123), meta)
server_listener = tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST), None), meta)
assert server_listener is tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST), None), meta)
server_responder = tr.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.RESPONSE), 222), meta)
assert server_responder is tr.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.RESPONSE), 222), meta)
client_requester = tr2.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST), 111), meta)
assert client_requester is tr2.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST), 111), meta)
# Instantiate session objects
#
meta = PayloadMetadata(0x_bad_c0ffee_0dd_f00d, 10000)
with pytest.raises(Exception): # Can't broadcast service calls
tr.get_output_session(OutputSessionSpecifier(ServiceDataSpecifier(123, ServiceDataSpecifier.Role.RESPONSE),
None),
meta)
with pytest.raises(UnsupportedSessionConfigurationError): # Can't unicast messages
tr.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(1234), 123), meta)
broadcaster = tr.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert broadcaster is tr.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
subscriber_promiscuous = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(2222), None), meta)
assert subscriber_promiscuous is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(2222), None), meta)
subscriber_selective = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(2222), 123), meta)
assert subscriber_selective is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(2222), 123), meta)
server_listener = tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), None), meta)
assert server_listener is tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), None), meta)
server_responder = tr.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.RESPONSE), 123), meta)
assert server_responder is tr.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.RESPONSE), 123), meta)
client_requester = tr.get_output_session(
async def _unittest_loopback_transport_service() -> None:
from pyuavcan.transport import ServiceDataSpecifier, InputSessionSpecifier, OutputSessionSpecifier
payload_metadata = pyuavcan.transport.PayloadMetadata(0xdeadbeef0ddf00d, 1234)
tr = pyuavcan.transport.loopback.LoopbackTransport(1234)
inp = tr.get_input_session(InputSessionSpecifier(ServiceDataSpecifier(123, ServiceDataSpecifier.Role.REQUEST),
1234),
payload_metadata)
out = tr.get_output_session(OutputSessionSpecifier(ServiceDataSpecifier(123, ServiceDataSpecifier.Role.REQUEST),
1234),
payload_metadata)
assert await out.send_until(pyuavcan.transport.Transfer(
timestamp=pyuavcan.transport.Timestamp.now(),
priority=pyuavcan.transport.Priority.IMMEDIATE,
transfer_id=123, # mod 32 = 27
fragmented_payload=[memoryview(b'Hello world!')],
), tr.loop.time() + 1.0)
assert None is not await inp.receive_until(0)
By default, the size of the input queue is unlimited; the user may provide a positive integer value to override
this. If the user is not reading the received messages quickly enough and the size of the queue is limited
(technically, it is always limited at least by the amount of the available memory),
the queue may become full in which case newer messages will be dropped and the overrun counter
will be incremented once per dropped message.
See :class:`Subscriber` for further information about subscribers.
"""
if issubclass(dtype, pyuavcan.dsdl.ServiceObject):
raise TypeError(f'Not a message type: {dtype}')
self._raise_if_closed()
data_specifier = pyuavcan.transport.MessageDataSpecifier(subject_id)
session_specifier = pyuavcan.transport.InputSessionSpecifier(data_specifier, None)
try:
impl = self._registry[Subscriber, session_specifier]
assert isinstance(impl, SubscriberImpl)
except LookupError:
transport_session = self._transport.get_input_session(session_specifier, self._make_payload_metadata(dtype))
impl = SubscriberImpl(dtype=dtype,
transport_session=transport_session,
finalizer=self._make_finalizer(Subscriber, session_specifier),
loop=self.loop)
self._registry[Subscriber, session_specifier] = impl
assert isinstance(impl, SubscriberImpl)
return Subscriber(impl=impl,
loop=self.loop,
queue_capacity=queue_capacity)
See :class:`Server` for further information about servers.
"""
if not issubclass(dtype, pyuavcan.dsdl.ServiceObject):
raise TypeError(f'Not a service type: {dtype}')
self._raise_if_closed()
def output_transport_session_factory(client_node_id: int) -> pyuavcan.transport.OutputSession:
_logger.info('%r has requested a new output session to client node %s', impl, client_node_id)
ds = pyuavcan.transport.ServiceDataSpecifier(service_id,
pyuavcan.transport.ServiceDataSpecifier.Role.RESPONSE)
return self._transport.get_output_session(pyuavcan.transport.OutputSessionSpecifier(ds, client_node_id),
self._make_payload_metadata(dtype.Response))
input_session_specifier = pyuavcan.transport.InputSessionSpecifier(
pyuavcan.transport.ServiceDataSpecifier(service_id, pyuavcan.transport.ServiceDataSpecifier.Role.REQUEST),
None
)
try:
impl = self._registry[Server, input_session_specifier]
assert isinstance(impl, Server)
except LookupError:
input_transport_session = self._transport.get_input_session(input_session_specifier,
self._make_payload_metadata(dtype.Request))
impl = Server(dtype=dtype,
input_transport_session=input_transport_session,
output_transport_session_factory=output_transport_session_factory,
finalizer=self._make_finalizer(Server, input_session_specifier),
loop=self.loop)
self._registry[Server, input_session_specifier] = impl
def _unittest_redundant_input_cyclic() -> None:
import time
import pytest
from pyuavcan.transport import Transfer, Timestamp, Priority, ResourceClosedError
from pyuavcan.transport.loopback import LoopbackTransport
loop = asyncio.get_event_loop()
await_ = loop.run_until_complete
spec = pyuavcan.transport.InputSessionSpecifier(pyuavcan.transport.MessageDataSpecifier(4321), None)
spec_tx = pyuavcan.transport.OutputSessionSpecifier(spec.data_specifier, None)
meta = pyuavcan.transport.PayloadMetadata(0x_deadbeef_deadbeef, 30)
ts = Timestamp.now()
tr_a = LoopbackTransport(111)
tr_b = LoopbackTransport(111)
tx_a = tr_a.get_output_session(spec_tx, meta)
tx_b = tr_b.get_output_session(spec_tx, meta)
inf_a = tr_a.get_input_session(spec, meta)
inf_b = tr_b.get_input_session(spec, meta)
inf_a.transfer_id_timeout = 1.1 # This is used to ensure that the transfer-ID timeout is handled correctly.
is_retired = False
def _handle_received_frame(self, frame: SerialFrame) -> None:
self._statistics.in_frames += 1
if frame.destination_node_id in (self._local_node_id, None):
for source_node_id in {None, frame.source_node_id}:
ss = pyuavcan.transport.InputSessionSpecifier(frame.data_specifier, source_node_id)
try:
session = self._input_registry[ss]
except LookupError:
pass
else:
# noinspection PyProtectedMember
session._process_frame(frame)