Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
pub_record = pres_b.make_publisher_with_fixed_subject_id(uavcan.diagnostic.Record_1_0)
sub_record = pres_a.make_subscriber_with_fixed_subject_id(uavcan.diagnostic.Record_1_0)
sub_record2 = pres_a.make_subscriber_with_fixed_subject_id(uavcan.diagnostic.Record_1_0)
heart = uavcan.node.Heartbeat_1_0(uptime=123456,
health=uavcan.node.Heartbeat_1_0.HEALTH_CAUTION,
mode=uavcan.node.Heartbeat_1_0.MODE_OPERATIONAL,
vendor_specific_status_code=0xc0fe)
pub_heart.transfer_id_counter.override(23)
await pub_heart.publish(heart)
rx, transfer = await sub_heart.receive() # type: typing.Any, pyuavcan.transport.TransferFrom
assert repr(rx) == repr(heart)
assert transfer.source_node_id == 123
assert transfer.priority == Priority.NOMINAL
assert transfer.transfer_id == 23
stat = sub_heart.sample_statistics()
assert stat.transport_session.transfers == 1
assert stat.transport_session.frames >= 1 # 'greater' is needed to accommodate redundant transports.
assert stat.transport_session.drops == 0
assert stat.deserialization_failures == 0
assert stat.messages == 1
await pub_heart.publish(heart)
rx = (await sub_heart.receive())[0]
assert repr(rx) == repr(heart)
await pub_heart.publish(heart)
rx = (await sub_heart.receive_until(asyncio.get_event_loop().time() + _RX_TIMEOUT))[0] # type: ignore
assert repr(rx) == repr(heart)
#
# Message exchange test.
#
assert await broadcaster.send_until(
Transfer(timestamp=Timestamp.now(),
priority=Priority.LOW,
transfer_id=77777,
fragmented_payload=payload_single),
monotonic_deadline=get_monotonic() + 5.0
)
rx_transfer = await subscriber_promiscuous.receive_until(get_monotonic() + 5.0)
print('PROMISCUOUS SUBSCRIBER TRANSFER:', rx_transfer)
assert isinstance(rx_transfer, TransferFrom)
assert rx_transfer.priority == Priority.LOW
assert rx_transfer.transfer_id == 77777
assert rx_transfer.fragmented_payload == [b''.join(payload_single)]
print(tr.sample_statistics())
assert tr.sample_statistics().in_bytes >= 32 + sft_capacity + 2
assert tr.sample_statistics().in_frames == 1
assert tr.sample_statistics().in_out_of_band_bytes == 0
assert tr.sample_statistics().out_bytes == tr.sample_statistics().in_bytes
assert tr.sample_statistics().out_frames == 1
assert tr.sample_statistics().out_transfers == 1
assert tr.sample_statistics().out_incomplete == 0
with pytest.raises(pyuavcan.transport.OperationNotDefinedForAnonymousNodeError):
# Anonymous nodes can't send multiframe transfers.
assert await broadcaster.send_until(
Transfer(timestamp=Timestamp.now(),
identifier=ServiceCANID(priority=Priority.FAST,
source_node_id=5,
destination_node_id=123,
service_id=333,
request_not_response=True).compile([_mem('Ignored')]),
padded_payload=_mem('Ignored'),
start_of_transfer=True,
end_of_transfer=False,
toggle_bit=True,
transfer_id=12,
loopback=True).compile()
])
client_requester.close()
with pytest.raises(ResourceClosedError):
assert await client_requester.send_until(Transfer(timestamp=ts, priority=Priority.LOW, transfer_id=4,
fragmented_payload=[]),
tr.loop.time() + 1.0)
fb = feedback_collector.take()
assert fb.original_transfer_timestamp == ts
validate_timestamp(fb.first_frame_transmission_timestamp)
received = await promiscuous_server_s333.receive_until(tr.loop.time() + 1.0)
assert received is not None
assert isinstance(received, TransferFrom)
assert received.source_node_id == 5
assert received.transfer_id == 12
assert received.priority == Priority.FAST
validate_timestamp(received.timestamp)
assert len(received.fragmented_payload) == 7 # Equals the number of frames
assert sum(map(len, received.fragmented_payload)) == 438 + 1 # Padding also included
)
rx = await sub_any_a.receive_until(loop.time() + 1.0)
assert rx is not None
assert rx.fragmented_payload == [memoryview(b'def')]
assert rx.transfer_id == 2
assert not await sub_any_b.receive_until(loop.time() + 0.1)
#
# Incapacitate one inferior, ensure things are still OK.
#
for s in lo_mono_0.output_sessions:
s.exception = RuntimeError('EXCEPTION BLIN')
assert await pub_a.send_until(
Transfer(timestamp=Timestamp.now(),
priority=Priority.LOW,
transfer_id=3,
fragmented_payload=[memoryview(b'qwe')]),
monotonic_deadline=loop.time() + 1.0
)
rx = await sub_any_a.receive_until(loop.time() + 1.0)
assert rx is not None
assert rx.fragmented_payload == [memoryview(b'qwe')]
assert rx.transfer_id == 3
#
# Remove old loopback transports. Configure new ones with cyclic TID.
#
lo_cyc_0 = LoopbackTransport(111)
lo_cyc_1 = LoopbackTransport(111)
cyc_proto_params = ProtocolParameters(
transfer_id_modulo=32, # Like CAN
#
from __future__ import annotations
import random
import typing
import dataclasses
import pyuavcan.transport
from . import _filter
@dataclasses.dataclass(frozen=True)
class CANIdentifier:
PRIORITY_MASK = 7
NODE_ID_MASK = 127
priority: pyuavcan.transport.Priority
def compile(self) -> int:
raise NotImplementedError
@staticmethod
def parse(identifier: int) -> CANIdentifier:
_validate_unsigned_range(identifier, 2 ** 29 - 1)
priority = pyuavcan.transport.Priority(identifier >> 26)
source_node_id = (identifier >> 1) & CANIdentifier.NODE_ID_MASK
service_not_message = identifier & (1 << 25) != 0
if service_not_message:
spec: CANIdentifier = ServiceCANIdentifier(
priority=priority,
service_id=(identifier >> 15) & ServiceCANIdentifier.SERVICE_ID_MASK,
request_not_response=identifier & (1 << 24) != 0,
source_node_id=source_node_id,
source_node_id=SerialFrame.FRAME_DELIMITER_BYTE,
destination_node_id=SerialFrame.ESCAPE_PREFIX_BYTE,
data_specifier=MessageDataSpecifier(12345),
data_type_hash=0xdead_beef_bad_c0ffe,
transfer_id=1234567890123456789,
index=1234567,
end_of_transfer=True,
payload=memoryview(b'ab\x9E\x8E')) # 4 bytes of payload.
result = proc(f1.compile_into(bytearray(100)))
assert len(result) == 1
assert isinstance(result[0], SerialFrame)
assert SerialFrame.__eq__(f1, result)
# Second valid frame is too long.
f2 = SerialFrame(timestamp=ts,
priority=Priority.HIGH,
source_node_id=SerialFrame.FRAME_DELIMITER_BYTE,
destination_node_id=SerialFrame.ESCAPE_PREFIX_BYTE,
data_specifier=MessageDataSpecifier(12345),
data_type_hash=0xdead_beef_bad_c0ffe,
transfer_id=1234567890123456789,
index=1234567,
end_of_transfer=True,
payload=f1.compile_into(bytearray(1000)))
assert len(f2.payload) == 46 + 2 # The extra two are escapes.
result = proc(f2.compile_into(bytearray(1000)))
assert len(result) == 1
assert isinstance(result[0], memoryview)
# Create new instance with much larger frame size limit; feed both frames but let the first one be incomplete.
sp = StreamParser(outputs.append, 10**6)
assert [] == proc(f1.compile_into(bytearray(100))[:-2]) # First one is ended abruptly.
def priority(self, value: pyuavcan.transport.Priority) -> None:
assert value in pyuavcan.transport.Priority
self._priority = value
from pyuavcan.transport import Priority, SessionSpecifier, MessageDataSpecifier, ServiceDataSpecifier
assert [
Frame(
priority=Priority.OPTIONAL,
source_node_id=1234,
destination_node_id=None,
data_specifier=MessageDataSpecifier(4321),
data_type_hash=0xdead_beef_0dd_c0ffe,
transfer_id=12345678901234567890,
frame_index=0,
end_of_transfer=True,
payload=memoryview(b'hello world'),
),
] == list(serialize_transfer(
priority=Priority.OPTIONAL,
local_node_id=1234,
session_specifier=SessionSpecifier(MessageDataSpecifier(4321), None),
data_type_hash=0xdead_beef_0dd_c0ffe,
transfer_id=12345678901234567890,
fragmented_payload=[memoryview(b'hello'), memoryview(b' '), memoryview(b'world')],
max_frame_payload_bytes=100,
))
assert [
Frame(
priority=Priority.OPTIONAL,
source_node_id=1234,
destination_node_id=None,
data_specifier=MessageDataSpecifier(4321),
data_type_hash=0xdead_beef_0dd_c0ffe,
transfer_id=12345678901234567890,
def parse(identifier: int) -> CANIdentifier:
_validate_unsigned_range(identifier, 2 ** 29 - 1)
priority = pyuavcan.transport.Priority(identifier >> 26)
source_node_id = (identifier >> 1) & CANIdentifier.NODE_ID_MASK
service_not_message = identifier & (1 << 25) != 0
if service_not_message:
spec: CANIdentifier = ServiceCANIdentifier(
priority=priority,
service_id=(identifier >> 15) & ServiceCANIdentifier.SERVICE_ID_MASK,
request_not_response=identifier & (1 << 24) != 0,
source_node_id=source_node_id,
destination_node_id=(identifier >> 8) & CANIdentifier.NODE_ID_MASK
)
else:
anonymous = identifier & (1 << 24) != 0
spec = MessageCANIdentifier(
priority=priority,
subject_id=(identifier >> 8) & MessageCANIdentifier.SUBJECT_ID_MASK,
source_node_id=None if anonymous else source_node_id
_CANID_EXT_MASK = 2 ** 29 - 1
_BIT_SRV_NOT_MSG = 1 << 25
_BIT_MSG_ANON = 1 << 24
_BIT_SRV_REQ = 1 << 24
_BIT_R23 = 1 << 23
_BIT_MSG_R7 = 1 << 7
@dataclasses.dataclass(frozen=True)
class CANID:
PRIORITY_MASK = 7
NODE_ID_MASK = 127
priority: pyuavcan.transport.Priority
source_node_id: typing.Optional[int] # None if anonymous; may be non-optional in derived classes
def __post_init__(self) -> None:
assert isinstance(self.priority, pyuavcan.transport.Priority)
def compile(self, fragmented_transfer_payload: typing.Iterable[memoryview]) -> int:
# You might be wondering, why the hell would a CAN ID abstraction depend on the payload of the transfer?
# This is to accommodate the special case of anonymous message transfers. We need to know the payload to
# compute the pseudo node ID when emitting anonymous messages. We could use just random numbers from the
# standard library, but that would make the code hard to test.
raise NotImplementedError
@property
def data_specifier(self) -> pyuavcan.transport.DataSpecifier:
raise NotImplementedError