Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
transfer_id_modulo=32,
max_nodes=2 ** 64,
mtu=2 ** 64 - 1,
)
tr.protocol_parameters = protocol_params
assert tr.protocol_parameters == protocol_params
assert tr.loop is asyncio.get_event_loop()
assert tr.local_node_id is None
tr = pyuavcan.transport.loopback.LoopbackTransport(42)
tr.protocol_parameters = protocol_params
assert 42 == tr.local_node_id
payload_metadata = pyuavcan.transport.PayloadMetadata(0xdeadbeef0ddf00d, 1234)
message_spec_123_in = pyuavcan.transport.InputSessionSpecifier(pyuavcan.transport.MessageDataSpecifier(123), 123)
message_spec_123_out = pyuavcan.transport.OutputSessionSpecifier(pyuavcan.transport.MessageDataSpecifier(123), 123)
message_spec_42_in = pyuavcan.transport.InputSessionSpecifier(pyuavcan.transport.MessageDataSpecifier(123), 42)
message_spec_any_out = pyuavcan.transport.OutputSessionSpecifier(pyuavcan.transport.MessageDataSpecifier(123), None)
out_123 = tr.get_output_session(specifier=message_spec_123_out, payload_metadata=payload_metadata)
assert out_123 is tr.get_output_session(specifier=message_spec_123_out, payload_metadata=payload_metadata)
last_feedback: typing.Optional[pyuavcan.transport.Feedback] = None
# Feedback callback: stores the feedback object it receives in the enclosing
# scope's ``last_feedback`` variable, replacing any previously stored value.
def on_feedback(fb: pyuavcan.transport.Feedback) -> None:
nonlocal last_feedback
last_feedback = fb
out_123.enable_feedback(on_feedback)
ts = pyuavcan.transport.Timestamp.now()
def _unittest_transport_primitives() -> None:
    """
    Checks that the transport-level primitive types reject invalid constructor arguments.

    Every construction below is expected to raise :class:`ValueError`; the test fails if any
    of them succeeds or raises a different exception type.
    """
    from pytest import raises
    from pyuavcan.transport import InputSessionSpecifier, OutputSessionSpecifier
    from pyuavcan.transport import MessageDataSpecifier, ServiceDataSpecifier, PayloadMetadata

    # Message data specifier: a negative value and 32768 must both be rejected.
    with raises(ValueError):
        MessageDataSpecifier(-1)

    with raises(ValueError):
        MessageDataSpecifier(32768)

    # Service data specifier: a negative first argument must be rejected.
    with raises(ValueError):
        ServiceDataSpecifier(-1, ServiceDataSpecifier.Role.REQUEST)

    # Input session specifier: a negative second argument must be rejected.
    with raises(ValueError):
        InputSessionSpecifier(MessageDataSpecifier(123), -1)

    # Output session specifier: a service data specifier combined with None must be rejected.
    with raises(ValueError):
        OutputSessionSpecifier(ServiceDataSpecifier(100, ServiceDataSpecifier.Role.RESPONSE), None)

    # Payload metadata: the first argument must be rejected both when negative and when it
    # reaches 2 ** 64.
    with raises(ValueError):
        PayloadMetadata(-1, 0)

    with raises(ValueError):
        PayloadMetadata(2 ** 64, 0)
rx_transfer = await server_listener.receive_until(get_monotonic() + 5.0)
print('SERVER LISTENER TRANSFER:', rx_transfer)
assert isinstance(rx_transfer, TransferFrom)
assert rx_transfer.priority == Priority.HIGH
assert rx_transfer.transfer_id == 88888
assert len(rx_transfer.fragmented_payload) == 3
assert b''.join(rx_transfer.fragmented_payload) == b''.join(payload_x3)
assert None is await subscriber_selective.receive_until(get_monotonic() + 0.1)
assert None is await subscriber_promiscuous.receive_until(get_monotonic() + 0.1)
assert None is await server_listener.receive_until(get_monotonic() + 0.1)
assert None is await client_listener.receive_until(get_monotonic() + 0.1)
print('tr :', tr.sample_statistics())
assert tr.sample_statistics().demultiplexer[
MessageDataSpecifier(12345)
].accepted_datagrams == {222: 1}
assert tr.sample_statistics().demultiplexer[
ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST)
].accepted_datagrams == {222: 3 * 2} # Deterministic data loss mitigation is enabled, multiplication factor 2
print('tr2:', tr2.sample_statistics())
assert tr2.sample_statistics().demultiplexer[
ServiceDataSpecifier(444, ServiceDataSpecifier.Role.RESPONSE)
].accepted_datagrams == {}
#
# Termination.
#
assert set(tr.input_sessions) == {subscriber_promiscuous, subscriber_selective, server_listener}
assert set(tr.output_sessions) == {server_responder}
assert set(tr2.input_sessions) == {client_listener}
assert set(tr2.output_sessions) == {broadcaster, client_requester}
identifier=MessageCANID(Priority.IMMEDIATE, 5, 12345).compile([_mem('abcdef')]), # payload fragments joined
padded_payload=_mem('abcdef'),
transfer_id=11,
start_of_transfer=True,
end_of_transfer=True,
toggle_bit=True,
loopback=False
).compile())
assert collector.empty
#
# Broadcast exchange with input dispatch test
#
selective_m12345_5 = tr2.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 5), meta)
selective_m12345_9 = tr2.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 9), meta)
promiscuous_m12345 = tr2.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert await broadcaster.send_until(Transfer(
timestamp=ts,
priority=Priority.IMMEDIATE,
transfer_id=32 + 11, # Modulus 11
fragmented_payload=[_mem('abc'), _mem('def')]
), tr.loop.time() + 1.0)
assert broadcaster.sample_statistics() == SessionStatistics(transfers=2, frames=2, payload_bytes=12)
assert tr.sample_statistics() == can.CANTransportStatistics(out_frames=2)
assert tr2.sample_statistics() == can.CANTransportStatistics(
in_frames=2, in_frames_uavcan=2, in_frames_uavcan_accepted=1)
received = await promiscuous_m12345.receive_until(tr.loop.time() + 1.0)
assert received is not None
assert isinstance(received, TransferFrom)
def _unittest_transport_primitives() -> None:
    """
    Verifies that the transport-level primitive types refuse invalid constructor arguments.

    Each entry of the table is a deferred construction; invoking it must raise
    :class:`ValueError`.
    """
    from pytest import raises
    from pyuavcan.transport import InputSessionSpecifier, OutputSessionSpecifier
    from pyuavcan.transport import MessageDataSpecifier, ServiceDataSpecifier, PayloadMetadata

    # The constructions are wrapped in lambdas so that nothing is evaluated until the
    # corresponding ``raises`` context is active.
    rejected_constructions = (
        lambda: MessageDataSpecifier(-1),
        lambda: MessageDataSpecifier(32768),
        lambda: ServiceDataSpecifier(-1, ServiceDataSpecifier.Role.REQUEST),
        lambda: InputSessionSpecifier(MessageDataSpecifier(123), -1),
        lambda: OutputSessionSpecifier(ServiceDataSpecifier(100, ServiceDataSpecifier.Role.RESPONSE), None),
        lambda: PayloadMetadata(-1, 0),
        lambda: PayloadMetadata(2 ** 64, 0),
    )

    for construct in rejected_constructions:
        with raises(ValueError):
            construct()
This logic follows the RAII pattern.
By default, the size of the input queue is unlimited; the user may provide a positive integer value to override
this. If the user is not reading the received messages quickly enough and the size of the queue is limited
(technically, it is always limited at least by the amount of the available memory),
the queue may become full in which case newer messages will be dropped and the overrun counter
will be incremented once per dropped message.
See :class:`Subscriber` for further information about subscribers.
"""
if issubclass(dtype, pyuavcan.dsdl.ServiceObject):
raise TypeError(f'Not a message type: {dtype}')
self._raise_if_closed()
data_specifier = pyuavcan.transport.MessageDataSpecifier(subject_id)
session_specifier = pyuavcan.transport.InputSessionSpecifier(data_specifier, None)
try:
impl = self._registry[Subscriber, session_specifier]
assert isinstance(impl, SubscriberImpl)
except LookupError:
transport_session = self._transport.get_input_session(session_specifier, self._make_payload_metadata(dtype))
impl = SubscriberImpl(dtype=dtype,
transport_session=transport_session,
finalizer=self._make_finalizer(Subscriber, session_specifier),
loop=self.loop)
self._registry[Subscriber, session_specifier] = impl
assert isinstance(impl, SubscriberImpl)
return Subscriber(impl=impl,
loop=self.loop,
queue_capacity=queue_capacity)
"""
Creates a new publisher instance for the specified subject-ID. All publishers created for a specific
subject share the same underlying implementation object which is hidden from the user;
the implementation is reference counted and it is destroyed automatically along with its
underlying transport level session instance when the last publisher is closed.
The publisher instance will be closed automatically from the finalizer when garbage collected
if the user did not close it explicitly. This logic follows the RAII pattern.
See :class:`Publisher` for further information about publishers.
"""
if issubclass(dtype, pyuavcan.dsdl.ServiceObject):
raise TypeError(f'Not a message type: {dtype}')
self._raise_if_closed()
data_specifier = pyuavcan.transport.MessageDataSpecifier(subject_id)
session_specifier = pyuavcan.transport.OutputSessionSpecifier(data_specifier, None)
try:
impl = self._registry[Publisher, session_specifier]
assert isinstance(impl, PublisherImpl)
except LookupError:
transport_session = self._transport.get_output_session(session_specifier,
self._make_payload_metadata(dtype))
transfer_id_counter = self._output_transfer_id_map.setdefault(session_specifier,
OutgoingTransferIDCounter())
impl = PublisherImpl(dtype=dtype,
transport_session=transport_session,
transfer_id_counter=transfer_id_counter,
finalizer=self._make_finalizer(Publisher, session_specifier),
loop=self.loop)
self._registry[Publisher, session_specifier] = impl
def __init__(self,
             specifier: pyuavcan.transport.SessionSpecifier,
             payload_metadata: pyuavcan.transport.PayloadMetadata,
             transport: pyuavcan.transport.can.CANTransport,
             send_handler: SendHandler,
             finalizer: _base.SessionFinalizer):
    """
    Use the factory method.

    The specifier must not name a remote node (this is asserted) and its data specifier must be
    a message data specifier; the subject-ID is taken from it and stored on the instance before
    the remaining arguments are forwarded to the base class initializer.

    :raises pyuavcan.transport.UnsupportedSessionConfigurationError:
        If the data specifier is not a :class:`pyuavcan.transport.MessageDataSpecifier`.
    """
    assert specifier.remote_node_id is None, 'Internal protocol violation: expected broadcast'

    data_specifier = specifier.data_specifier
    if not isinstance(data_specifier, pyuavcan.transport.MessageDataSpecifier):
        raise pyuavcan.transport.UnsupportedSessionConfigurationError(
            f'This transport does not support broadcast outputs for {specifier.data_specifier}')

    # Stored before the base initializer runs, matching the original statement order.
    self._subject_id = data_specifier.subject_id

    super().__init__(
        transport=transport,
        send_handler=send_handler,
        specifier=specifier,
        payload_metadata=payload_metadata,
        finalizer=finalizer,
    )
def _unittest_session() -> None:
    """
    Builds a :class:`LoopbackOutputSession` from stub callbacks and checks that the session
    reports the specifier it was constructed with.
    """
    closed = False

    def do_close() -> None:
        # Stub closer: only records that it has been invoked.
        nonlocal closed
        closed = True

    async def do_route(_a: pyuavcan.transport.Transfer, _b: float) -> bool:
        # Stub router: raises if it is ever awaited.
        raise NotImplementedError

    data_specifier = pyuavcan.transport.MessageDataSpecifier(123)
    specifier = pyuavcan.transport.OutputSessionSpecifier(data_specifier, 123)
    payload_metadata = pyuavcan.transport.PayloadMetadata(0xdeadbeef0ddf00d, 1234)
    loop = asyncio.get_event_loop()

    ses = LoopbackOutputSession(
        specifier=specifier,
        payload_metadata=payload_metadata,
        loop=loop,
        closer=do_close,
        router=do_route,
    )
    assert specifier == ses.specifier
destination_endpoint = '127.100.0.1', 25406
sock_rx = socket_.socket(socket_.AF_INET, socket_.SOCK_DGRAM)
sock_rx.bind(destination_endpoint)
sock_rx.settimeout(1.0)
def make_sock() -> socket_.socket:
    """
    Creates a non-blocking IPv4 datagram socket bound to ``127.100.0.2`` (port 0, i.e. chosen
    by the operating system) and connected to ``destination_endpoint`` from the enclosing scope.

    If any configuration step fails, the socket is closed before the exception is re-raised,
    so a failed setup does not leak the underlying file descriptor.

    :return: The configured socket; the caller owns it and is responsible for closing it.
    """
    sock = socket_.socket(socket_.AF_INET, socket_.SOCK_DGRAM)
    try:
        sock.bind(('127.100.0.2', 0))
        sock.connect(destination_endpoint)
        sock.setblocking(False)
    except BaseException:
        # Release the descriptor on any failure (including interruption), then propagate.
        sock.close()
        raise
    return sock
sos = UDPOutputSession(
specifier=OutputSessionSpecifier(MessageDataSpecifier(3210), None),
payload_metadata=PayloadMetadata(0xdead_beef_badc0ffe, 1024),
mtu=11,
multiplier=1,
sock=make_sock(),
loop=asyncio.get_event_loop(),
finalizer=do_finalize,
)
assert sos.specifier == OutputSessionSpecifier(MessageDataSpecifier(3210), None)
assert sos.destination_node_id is None
assert sos.payload_metadata == PayloadMetadata(0xdead_beef_badc0ffe, 1024)
assert sos.sample_statistics() == SessionStatistics()
assert run_until_complete(sos.send_until(
Transfer(timestamp=ts,
priority=Priority.NOMINAL,