Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
sft_capacity = 1024
payload_single = [_mem('qwertyui'), _mem('01234567')] * (sft_capacity // 16)
assert sum(map(len, payload_single)) == sft_capacity
payload_x3 = (payload_single * 3)[:-1]
payload_x3_size_bytes = sft_capacity * 3 - 8
assert sum(map(len, payload_x3)) == payload_x3_size_bytes
#
# Instantiate session objects.
#
meta = PayloadMetadata(0x_bad_c0ffee_0dd_f00d, 10000)
broadcaster = tr.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert broadcaster is tr.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
subscriber_promiscuous = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert subscriber_promiscuous is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), None),
meta)
subscriber_selective = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 3210), meta)
assert subscriber_selective is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 3210), meta)
server_listener = tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), None), meta)
assert server_listener is tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), None), meta)
client_requester = tr.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), 3210), meta)
class Allocatee:
"""
Plug-and-play node-ID protocol client.
This class represents a node that requires an allocated node-ID.
Once started, the client will keep issuing node-ID allocation requests until either a node-ID is granted
or until the node-ID of the underlying transport instance ceases to be anonymous (that could happen if the
transport is re-configured externally).
The status (whether the allocation is finished or still in progress) is to be queried periodically
via the method :meth:`get_result`.
Uses v1 allocation messages if the transport MTU is small (like if the transport is Classic CAN).
Switches between v1 and v2 as necessary on the fly if the transport is reconfigured at runtime.
"""
DEFAULT_PRIORITY = pyuavcan.transport.Priority.SLOW
_MTU_THRESHOLD = max(pyuavcan.dsdl.get_model(NodeIDAllocationData_2).bit_length_set) // 8
def __init__(self,
presentation: pyuavcan.presentation.Presentation,
local_unique_id: bytes,
preferred_node_id: typing.Optional[int] = None):
"""
:param presentation: The presentation instance to use. If the underlying transport is not anonymous
(i.e., a node-ID is already set), the allocatee will simply return the existing node-ID and do nothing.
:param local_unique_id: The 128-bit globally unique-ID of the local node; the same value is also contained
in the ``uavcan.node.GetInfo.Response``. Beware that random generation of the unique-ID at every launch
is a bad idea because it will exhaust the allocation table quickly. Refer to the Specification for details.
:param preferred_node_id: If the application prefers to obtain a particular node-ID, it can specify it here.
def __init__(self) -> None:
self._item: typing.Optional[pyuavcan.transport.Feedback] = None
assert stat.messages == 1
await pub_heart.publish(heart)
rx = (await sub_heart.receive())[0]
assert repr(rx) == repr(heart)
await pub_heart.publish(heart)
rx = (await sub_heart.receive_until(asyncio.get_event_loop().time() + _RX_TIMEOUT))[0] # type: ignore
assert repr(rx) == repr(heart)
rx = await sub_heart.receive_for(_RX_TIMEOUT)
assert rx is None
sub_heart.close()
sub_heart.close() # Shall not raise.
record_handler_output: typing.List[typing.Tuple[uavcan.diagnostic.Record_1_0, pyuavcan.transport.TransferFrom]] = []
async def record_handler(message: uavcan.diagnostic.Record_1_0,
cb_transfer: pyuavcan.transport.TransferFrom) -> None:
print('RECORD HANDLER:', message, cb_transfer)
record_handler_output.append((message, cb_transfer))
sub_record2.receive_in_background(record_handler)
record = uavcan.diagnostic.Record_1_0(timestamp=uavcan.time.SynchronizedTimestamp_1_0(1234567890),
severity=uavcan.diagnostic.Severity_1_0(uavcan.diagnostic.Severity_1_0.ALERT),
text='Hello world!')
assert pub_record.priority == pyuavcan.presentation.DEFAULT_PRIORITY
pub_record.priority = Priority.NOMINAL
assert pub_record.priority == Priority.NOMINAL
with pytest.raises(TypeError, match='.*Heartbeat.*'):
# noinspection PyTypeChecker
def _unittest_transport_primitives() -> None:
from pytest import raises
from pyuavcan.transport import InputSessionSpecifier, OutputSessionSpecifier
from pyuavcan.transport import MessageDataSpecifier, ServiceDataSpecifier, PayloadMetadata
with raises(ValueError):
MessageDataSpecifier(-1)
with raises(ValueError):
MessageDataSpecifier(32768)
with raises(ValueError):
ServiceDataSpecifier(-1, ServiceDataSpecifier.Role.REQUEST)
with raises(ValueError):
InputSessionSpecifier(MessageDataSpecifier(123), -1)
with raises(ValueError):
OutputSessionSpecifier(ServiceDataSpecifier(100, ServiceDataSpecifier.Role.RESPONSE), None)
with raises(ValueError):
PayloadMetadata(-1, 0)
with raises(ValueError):
PayloadMetadata(2 ** 64, 0)
with raises(ValueError):
PayloadMetadata(0, -1)
print('PROMISCUOUS SUBSCRIBER TRANSFER:', rx_transfer)
assert isinstance(rx_transfer, TransferFrom)
assert rx_transfer.priority == Priority.LOW
assert rx_transfer.transfer_id == 77777
assert rx_transfer.fragmented_payload == [b''.join(payload_single)]
print('tr :', tr.sample_statistics())
assert tr.sample_statistics().demultiplexer[
MessageDataSpecifier(12345)
].accepted_datagrams == {222: 1}
assert tr.sample_statistics().demultiplexer[
ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST)
].accepted_datagrams == {}
print('tr2:', tr2.sample_statistics())
assert tr2.sample_statistics().demultiplexer[
ServiceDataSpecifier(444, ServiceDataSpecifier.Role.RESPONSE)
].accepted_datagrams == {}
assert None is await subscriber_selective.receive_until(get_monotonic() + 0.1)
assert None is await subscriber_promiscuous.receive_until(get_monotonic() + 0.1)
assert None is await server_listener.receive_until(get_monotonic() + 0.1)
assert None is await client_listener.receive_until(get_monotonic() + 0.1)
#
# Service exchange test.
#
assert await client_requester.send_until(
Transfer(timestamp=Timestamp.now(),
priority=Priority.HIGH,
transfer_id=88888,
fragmented_payload=payload_x3),
monotonic_deadline=get_monotonic() + 5.0
# Instantiate session objects.
#
meta = PayloadMetadata(0x_bad_c0ffee_0dd_f00d, 10000)
broadcaster = tr2.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert broadcaster is tr2.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
subscriber_promiscuous = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert subscriber_promiscuous is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), None),
meta)
subscriber_selective = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 123), meta)
assert subscriber_selective is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 123), meta)
server_listener = tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST), None), meta)
assert server_listener is tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST), None), meta)
server_responder = tr.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.RESPONSE), 222), meta)
assert server_responder is tr.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.RESPONSE), 222), meta)
client_requester = tr2.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST), 111), meta)
assert client_requester is tr2.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST), 111), meta)
client_listener = tr2.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.RESPONSE), 111), meta)
assert client_listener is tr2.get_input_session(
# Instantiate session objects.
#
meta = PayloadMetadata(0x_bad_c0ffee_0dd_f00d, 10000)
broadcaster = tr.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert broadcaster is tr.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
subscriber_promiscuous = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert subscriber_promiscuous is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), None),
meta)
subscriber_selective = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 3210), meta)
assert subscriber_selective is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 3210), meta)
server_listener = tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), None), meta)
assert server_listener is tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), None), meta)
client_requester = tr.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), 3210), meta)
assert client_requester is tr.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), 3210), meta)
client_listener = tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.RESPONSE), 3210), meta)
assert client_listener is tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.RESPONSE), 3210), meta)
print('INPUTS:', tr.input_sessions)
print('OUTPUTS:', tr.output_sessions)
assert set(tr.input_sessions) == {subscriber_promiscuous, subscriber_selective, server_listener, client_listener}
assert rx_transfer.transfer_id == 77777
assert rx_transfer.fragmented_payload == [b''.join(payload_single)]
print(tr.sample_statistics())
assert tr.sample_statistics().in_bytes >= 32 + sft_capacity + 2
assert tr.sample_statistics().in_frames == 1
assert tr.sample_statistics().in_out_of_band_bytes == 0
assert tr.sample_statistics().out_bytes == tr.sample_statistics().in_bytes
assert tr.sample_statistics().out_frames == 1
assert tr.sample_statistics().out_transfers == 1
assert tr.sample_statistics().out_incomplete == 0
with pytest.raises(pyuavcan.transport.OperationNotDefinedForAnonymousNodeError):
# Anonymous nodes can't send multiframe transfers.
assert await broadcaster.send_until(
Transfer(timestamp=Timestamp.now(),
priority=Priority.LOW,
transfer_id=77777,
fragmented_payload=payload_x3),
monotonic_deadline=get_monotonic() + 5.0
)
assert None is await subscriber_selective.receive_until(get_monotonic() + 0.1)
assert None is await subscriber_promiscuous.receive_until(get_monotonic() + 0.1)
assert None is await server_listener.receive_until(get_monotonic() + 0.1)
assert None is await client_listener.receive_until(get_monotonic() + 0.1)
#
# Service exchange test.
#
with pytest.raises(pyuavcan.transport.OperationNotDefinedForAnonymousNodeError):
# Anonymous nodes can't emit service transfers.
await media_a.send_until([
DataFrame(identifier=0xbadc0fe,
data=bytearray(range(8)),
format=FrameFormat.EXTENDED,
loopback=True),
DataFrame(identifier=0x12345678,
data=bytearray(range(0)),
format=FrameFormat.EXTENDED,
loopback=False),
DataFrame(identifier=0x123,
data=bytearray(range(6)),
format=FrameFormat.BASE,
loopback=True),
], asyncio.get_event_loop().time() + 1.0)
await asyncio.sleep(0.1)
ts_end = Timestamp.now()
print('rx_a:', rx_a)
# Three sent back from the other end, two loopback
assert len(rx_a) == 5
for f in rx_a:
assert ts_begin.monotonic_ns <= f.timestamp.monotonic_ns <= ts_end.monotonic_ns
assert ts_begin.system_ns <= f.timestamp.system_ns <= ts_end.system_ns
rx_loopback = list(filter(lambda x: x.loopback, rx_a))
rx_external = list(filter(lambda x: not x.loopback, rx_a))
assert len(rx_loopback) == 2 and len(rx_external) == 3
assert rx_loopback[0].identifier == 0xbadc0fe
assert rx_loopback[0].data == bytearray(range(8))
assert rx_loopback[0].format == FrameFormat.EXTENDED