Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _unittest_transport_primitives() -> None:
    """
    Check that the transport-layer primitive types reject invalid constructor arguments.

    Every construction attempted below is expected to raise :class:`ValueError`.
    """
    from pytest import raises
    from pyuavcan.transport import InputSessionSpecifier, OutputSessionSpecifier
    from pyuavcan.transport import MessageDataSpecifier, ServiceDataSpecifier, PayloadMetadata

    # Data specifiers: negative or too-large identifier.
    with raises(ValueError):
        MessageDataSpecifier(-1)

    with raises(ValueError):
        MessageDataSpecifier(32768)

    with raises(ValueError):
        ServiceDataSpecifier(-1, ServiceDataSpecifier.Role.REQUEST)

    # Session specifiers: negative remote node-ID, and a service specifier paired with None.
    with raises(ValueError):
        InputSessionSpecifier(MessageDataSpecifier(123), -1)

    with raises(ValueError):
        OutputSessionSpecifier(ServiceDataSpecifier(100, ServiceDataSpecifier.Role.RESPONSE), None)

    # Payload metadata: first argument negative or equal to 2**64; second argument negative.
    with raises(ValueError):
        PayloadMetadata(-1, 0)

    with raises(ValueError):
        PayloadMetadata(2 ** 64, 0)

    with raises(ValueError):
        PayloadMetadata(0, -1)
# Instantiate session objects.
#
# NOTE(review): this is an excerpt of a larger async test; tr, tr2 and get_monotonic are not defined
# within the visible lines, and the original indentation has been lost.
meta = PayloadMetadata(0x_bad_c0ffee_0dd_f00d, 10000)
# Every get_*_session() request below is issued twice with an equal specifier; each assert checks
# that the second request returns the very same object (identity, not equality).
# The broadcaster and the client-side sessions are taken from tr2; the subscribers and the
# server-side sessions are taken from tr.
broadcaster = tr2.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert broadcaster is tr2.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
subscriber_promiscuous = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert subscriber_promiscuous is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), None),
meta)
subscriber_selective = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 123), meta)
assert subscriber_selective is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 123), meta)
server_listener = tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST), None), meta)
assert server_listener is tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST), None), meta)
server_responder = tr.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.RESPONSE), 222), meta)
assert server_responder is tr.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.RESPONSE), 222), meta)
client_requester = tr2.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST), 111), meta)
assert client_requester is tr2.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST), 111), meta)
client_listener = tr2.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.RESPONSE), 111), meta)
assert client_listener is tr2.get_input_session(
# NOTE(review): the call opened on the previous line is never closed in this excerpt; its arguments
# are missing (presumably the same specifier and meta used to create client_listener - TODO confirm).
# The lines that follow appear to come from a later part of the test.
# Each of the four input sessions is expected to return None for a deadline of get_monotonic() + 0.1.
assert None is await subscriber_selective.receive_until(get_monotonic() + 0.1)
assert None is await subscriber_promiscuous.receive_until(get_monotonic() + 0.1)
assert None is await server_listener.receive_until(get_monotonic() + 0.1)
assert None is await client_listener.receive_until(get_monotonic() + 0.1)
# Check the demultiplexer statistics, which are indexed by data specifier. accepted_datagrams is
# compared against a dict (keys presumably source node-IDs - verify against the statistics type).
print('tr :', tr.sample_statistics())
assert tr.sample_statistics().demultiplexer[
MessageDataSpecifier(12345)
].accepted_datagrams == {222: 1}
assert tr.sample_statistics().demultiplexer[
ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST)
].accepted_datagrams == {222: 3 * 2} # Deterministic data loss mitigation is enabled, multiplication factor 2
print('tr2:', tr2.sample_statistics())
assert tr2.sample_statistics().demultiplexer[
ServiceDataSpecifier(444, ServiceDataSpecifier.Role.RESPONSE)
].accepted_datagrams == {}
#
# Termination.
#
# Before anything is closed, each transport must list exactly the sessions created above.
assert set(tr.input_sessions) == {subscriber_promiscuous, subscriber_selective, server_listener}
assert set(tr.output_sessions) == {server_responder}
assert set(tr2.input_sessions) == {client_listener}
assert set(tr2.output_sessions) == {broadcaster, client_requester}
subscriber_promiscuous.close()
subscriber_promiscuous.close() # Idempotency.
# After the close, subscriber_promiscuous is gone from tr.input_sessions; the other sets are unchanged.
assert set(tr.input_sessions) == {subscriber_selective, server_listener}
assert set(tr.output_sessions) == {server_responder}
assert set(tr2.input_sessions) == {client_listener}
async def _unittest_loopback_transport_service() -> None:
    """
    Send one service-request transfer through a loopback transport and check that it is received.

    The input and output sessions are created on the same transport with equal specifiers
    (service 123, REQUEST role, remote node 1234), so the sent transfer is expected to be
    available from the input session afterwards.
    """
    from pyuavcan.transport import ServiceDataSpecifier, InputSessionSpecifier, OutputSessionSpecifier

    payload_metadata = pyuavcan.transport.PayloadMetadata(0xdeadbeef0ddf00d, 1234)
    tr = pyuavcan.transport.loopback.LoopbackTransport(1234)

    inp = tr.get_input_session(
        InputSessionSpecifier(ServiceDataSpecifier(123, ServiceDataSpecifier.Role.REQUEST), 1234),
        payload_metadata,
    )
    out = tr.get_output_session(
        OutputSessionSpecifier(ServiceDataSpecifier(123, ServiceDataSpecifier.Role.REQUEST), 1234),
        payload_metadata,
    )

    # The send must report success; its deadline is tr.loop.time() + 1.0.
    assert await out.send_until(pyuavcan.transport.Transfer(
        timestamp=pyuavcan.transport.Timestamp.now(),
        priority=pyuavcan.transport.Priority.IMMEDIATE,
        transfer_id=123,        # mod 32 = 27
        fragmented_payload=[memoryview(b'Hello world!')],
    ), tr.loop.time() + 1.0)

    # A deadline of 0 is used for the read, so the transfer presumably has to be available
    # already, without waiting.
    assert None is not await inp.receive_until(0)
# NOTE(review): excerpt of a larger test with the original indentation lost. meta, broadcaster,
# subscriber_promiscuous, MessageDataSpecifier and time are not defined within the visible lines.
# Each session is requested twice with an equal specifier; the asserts check that the same object
# is returned both times.
subscriber_selective = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(2222), 123), meta)
assert subscriber_selective is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(2222), 123), meta)
server_listener = tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), None), meta)
assert server_listener is tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), None), meta)
server_responder = tr.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.RESPONSE), 123), meta)
assert server_responder is tr.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.RESPONSE), 123), meta)
client_requester = tr.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), 123), meta)
assert client_requester is tr.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), 123), meta)
client_listener = tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.RESPONSE), 123), meta)
assert client_listener is tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.RESPONSE), 123), meta)
# The remote node-ID exposed by each session must match the second argument of its specifier:
# None for the broadcaster and the promiscuous/server listeners, 123 for the selective ones.
assert broadcaster.destination_node_id is None
assert subscriber_promiscuous.source_node_id is None
assert subscriber_selective.source_node_id == 123
assert server_listener.source_node_id is None
assert client_listener.source_node_id == 123
base_ts = time.process_time()
inputs = tr.input_sessions
# NOTE(review): the closing parenthesis below has no matching opener in this excerpt; it appears
# to be the tail of a statement whose beginning is missing.
)
# NOTE(review): excerpt of a larger test with the original indentation lost. tr, tr2, media, media2,
# peers and pytest are not defined within the visible lines.
# Initial state: no local node-ID, both transports report equal protocol parameters, automatic
# retransmission is off on both media, and the descriptor embeds id(peers) as zero-padded hex.
assert tr.local_node_id is None
assert tr.protocol_parameters == tr2.protocol_parameters
assert not media.automatic_retransmission_enabled
assert not media2.automatic_retransmission_enabled
assert tr.descriptor == f'mock@{id(peers):08x}'
#
# Instantiate session objects
#
meta = PayloadMetadata(0x_bad_c0ffee_0dd_f00d, 10000)
# Two output-session requests are expected to be rejected. The first check accepts any Exception,
# the second requires UnsupportedSessionConfigurationError specifically.
with pytest.raises(Exception): # Can't broadcast service calls
tr.get_output_session(OutputSessionSpecifier(ServiceDataSpecifier(123, ServiceDataSpecifier.Role.RESPONSE),
None),
meta)
with pytest.raises(UnsupportedSessionConfigurationError): # Can't unicast messages
tr.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(1234), 123), meta)
# Accepted requests: repeating a request with an equal specifier must return the same session object.
broadcaster = tr.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert broadcaster is tr.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
subscriber_promiscuous = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(2222), None), meta)
assert subscriber_promiscuous is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(2222), None), meta)
subscriber_selective = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(2222), 123), meta)
assert subscriber_selective is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(2222), 123), meta)
# NOTE(review): the call opened on the next line is never closed in this excerpt; its arguments are missing.
server_listener = tr.get_input_session(
# NOTE(review): tail of a frame-parsing routine whose signature is outside this excerpt; header,
# timestamp and payload are defined above the visible lines. The original indentation has been lost.
# Decode the fixed header fields (CRC excluded, per the name of the struct format) in one call.
version, int_priority, src_nid, dst_nid, int_data_spec, dt_hash, transfer_id, index_eot = \
_HEADER_WITHOUT_CRC_FORMAT.unpack_from(header)
# A frame with a different version is not parsed; None is returned instead.
if version != _VERSION:
return None
# The reserved anonymous node-ID value is represented as None on either side.
src_nid = None if src_nid == _ANONYMOUS_NODE_ID else src_nid
dst_nid = None if dst_nid == _ANONYMOUS_NODE_ID else dst_nid
data_specifier: pyuavcan.transport.DataSpecifier
# Bit 15 of the data specifier field distinguishes messages (clear) from services (set).
if int_data_spec & (1 << 15) == 0:
data_specifier = pyuavcan.transport.MessageDataSpecifier(int_data_spec)
else:
# For services, bit 14 selects the role: set means RESPONSE, clear means REQUEST.
if int_data_spec & (1 << 14):
role = pyuavcan.transport.ServiceDataSpecifier.Role.RESPONSE
else:
role = pyuavcan.transport.ServiceDataSpecifier.Role.REQUEST
# The service-ID is whatever remains after applying SERVICE_ID_MASK.
service_id = int_data_spec & pyuavcan.transport.ServiceDataSpecifier.SERVICE_ID_MASK
data_specifier = pyuavcan.transport.ServiceDataSpecifier(service_id, role)
# index_eot packs two fields: bit 31 is the end-of-transfer flag, INDEX_MASK extracts the frame index.
# A ValueError raised while building the frame (including by Priority(int_priority)) yields None.
try:
return SerialFrame(timestamp=timestamp,
priority=pyuavcan.transport.Priority(int_priority),
source_node_id=src_nid,
destination_node_id=dst_nid,
data_specifier=data_specifier,
data_type_hash=dt_hash,
transfer_id=transfer_id,
index=index_eot & SerialFrame.INDEX_MASK,
end_of_transfer=index_eot & (1 << 31) != 0,
payload=payload)
except ValueError:
return None
The client instance will be closed automatically from its finalizer when garbage
collected if the user did not bother to do that manually.
This logic follows the RAII pattern.
See :class:`Client` for further information about clients.
"""
# NOTE(review): body excerpt of a client-factory method; its signature (providing self, dtype,
# service_id and server_node_id) is outside the visible lines and the original indentation is lost.
# Only service types are accepted; anything else is a TypeError.
if not issubclass(dtype, pyuavcan.dsdl.ServiceObject):
raise TypeError(f'Not a service type: {dtype}')
self._raise_if_closed()
# Reads the transfer-ID modulo from the transport each time it is called rather than capturing it once.
def transfer_id_modulo_factory() -> int:
return self._transport.protocol_parameters.transfer_id_modulo
# The client listens for RESPONSE transfers from server_node_id and emits REQUEST transfers to it.
input_session_specifier = pyuavcan.transport.InputSessionSpecifier(
pyuavcan.transport.ServiceDataSpecifier(service_id, pyuavcan.transport.ServiceDataSpecifier.Role.RESPONSE),
server_node_id
)
output_session_specifier = pyuavcan.transport.OutputSessionSpecifier(
pyuavcan.transport.ServiceDataSpecifier(service_id, pyuavcan.transport.ServiceDataSpecifier.Role.REQUEST),
server_node_id
)
# Look for an existing implementation in the registry under the key (Client, input_session_specifier).
try:
impl = self._registry[Client, input_session_specifier]
assert isinstance(impl, ClientImpl)
except LookupError:
# Nothing registered yet: obtain both transport sessions and get-or-create the outgoing transfer-ID
# counter for this output specifier (setdefault keeps an existing counter if there is one).
# NOTE(review): the remainder of this branch is not visible in the excerpt.
output_transport_session = self._transport.get_output_session(output_session_specifier,
self._make_payload_metadata(dtype.Request))
input_transport_session = self._transport.get_input_session(input_session_specifier,
self._make_payload_metadata(dtype.Response))
transfer_id_counter = self._output_transfer_id_map.setdefault(output_session_specifier,
OutgoingTransferIDCounter())
async def do_send(frames: typing.Iterable[SerialFrame], monotonic_deadline: float) -> typing.Optional[Timestamp]:
nonlocal last_sent_frames
nonlocal last_monotonic_deadline
last_sent_frames = list(frames)
last_monotonic_deadline = monotonic_deadline
if tx_exception:
raise tx_exception
return tx_timestamp
def do_finalize() -> None:
nonlocal finalized
finalized = True
# NOTE(review): excerpt of a larger test with the original indentation lost; raises, SessionSpecifier,
# SerialOutputSession and PayloadMetadata are not defined within the visible lines.
# A RESPONSE-role service specifier with a None remote node must be rejected with
# UnsupportedSessionConfigurationError.
with raises(pyuavcan.transport.UnsupportedSessionConfigurationError):
_ = SerialOutputSession(
specifier=SessionSpecifier(ServiceDataSpecifier(321, ServiceDataSpecifier.Role.RESPONSE), None),
payload_metadata=PayloadMetadata(0xdeadbeefbadc0ffe, 1024),
sft_payload_capacity_bytes=10,
local_node_id_accessor=lambda: 1234, # pragma: no cover
send_handler=do_send,
finalizer=do_finalize,
)
# The same construction with a REQUEST-role specifier addressed to node 1111 is made outside the
# raises() block, i.e. it is expected to succeed. Here the local node-ID accessor returns None.
sos = SerialOutputSession(
specifier=SessionSpecifier(ServiceDataSpecifier(321, ServiceDataSpecifier.Role.REQUEST), 1111),
payload_metadata=PayloadMetadata(0xdeadbeefbadc0ffe, 1024),
sft_payload_capacity_bytes=10,
local_node_id_accessor=lambda: None, # pragma: no cover
send_handler=do_send,
finalizer=do_finalize,
)
# NOTE(review): excerpt of a frame-processing routine cut at both ends; canid, frame and
# source_node_id are set above the visible lines and the original indentation has been lost.
if source_node_id is None:
# Anonymous transfer - no reconstruction needed
# The single frame is turned into a transfer directly: the counters are updated and the frame's
# padded payload becomes the only fragment of the returned TransferFrom.
self._statistics.transfers += 1
self._statistics.payload_bytes += len(frame.padded_payload)
out = pyuavcan.transport.TransferFrom(timestamp=frame.timestamp,
priority=canid.priority,
transfer_id=frame.transfer_id,
fragmented_payload=[frame.padded_payload],
source_node_id=None)
_logger.debug('%s: Received anonymous transfer: %s; current stats: %s', self, out, self._statistics)
return out
# NOTE(review): this elif presumably pairs with an isinstance() check on canid located above the
# excerpt rather than with the "source_node_id is None" test - cannot be confirmed without indentation.
elif isinstance(canid, _identifier.ServiceCANID):
# Sanity checks: the frame's service-ID and request/response direction must agree with the
# data specifier this session was created for.
assert isinstance(self._specifier.data_specifier, pyuavcan.transport.ServiceDataSpecifier)
assert self._specifier.data_specifier.service_id == canid.service_id
assert (self._specifier.data_specifier.role == pyuavcan.transport.ServiceDataSpecifier.Role.REQUEST) \
== canid.request_not_response
source_node_id = canid.source_node_id
else:
# Any other kind of CAN ID is treated as a programming error.
assert False
# Hand the frame to the receiver kept for this source node, passing the transfer-ID timeout.
receiver = self._receivers[source_node_id]
result = receiver.process_frame(canid.priority, frame, self._transfer_id_timeout_ns)
# An error ID result is counted both in the total and per error kind, then logged at debug level.
if isinstance(result, _transfer_reassembler.TransferReassemblyErrorID):
self._statistics.errors += 1
self._statistics.reception_error_counters[result] += 1
_logger.debug('%s: Rejecting CAN frame %s because %s; current stats: %s',
self, frame, result, self._statistics)
# A TransferFrom result is counted as one transfer; payload_bytes grows by the summed fragment lengths.
elif isinstance(result, pyuavcan.transport.TransferFrom):
self._statistics.transfers += 1
self._statistics.payload_bytes += sum(map(len, result.fragmented_payload))