Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
assert tr.sample_statistics().demultiplexer[
MessageDataSpecifier(12345)
].accepted_datagrams == {}
assert tr.sample_statistics().demultiplexer[
ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST)
].accepted_datagrams == {}
assert tr2.sample_statistics().demultiplexer[
ServiceDataSpecifier(444, ServiceDataSpecifier.Role.RESPONSE)
].accepted_datagrams == {}
#
# Message exchange test.
#
assert await broadcaster.send_until(
Transfer(timestamp=Timestamp.now(),
priority=Priority.LOW,
transfer_id=77777,
fragmented_payload=payload_single),
monotonic_deadline=get_monotonic() + 5.0
)
rx_transfer = await subscriber_promiscuous.receive_until(get_monotonic() + 5.0)
print('PROMISCUOUS SUBSCRIBER TRANSFER:', rx_transfer)
assert isinstance(rx_transfer, TransferFrom)
assert rx_transfer.priority == Priority.LOW
assert rx_transfer.transfer_id == 77777
assert rx_transfer.fragmented_payload == [b''.join(payload_single)]
print('tr :', tr.sample_statistics())
assert tr.sample_statistics().demultiplexer[
MessageDataSpecifier(12345)
rx, transfer = await sub_record.receive()
assert repr(rx) == repr(record)
assert transfer.source_node_id == 42
assert transfer.priority == Priority.NOMINAL
assert transfer.transfer_id == 0
# Broken transfer
stat = sub_record.sample_statistics()
assert stat.transport_session.transfers == 1
assert stat.transport_session.frames >= 1 # 'greater' is needed to accommodate redundant transports.
assert stat.transport_session.drops == 0
assert stat.deserialization_failures == 0
assert stat.messages == 1
await pub_record.transport_session.send_until(pyuavcan.transport.Transfer(
timestamp=pyuavcan.transport.Timestamp.now(),
priority=Priority.NOMINAL,
transfer_id=12,
fragmented_payload=[memoryview(b'\xFF' * 15)], # Array length prefix is too long
), tran_a.loop.time() + 1.0)
assert (await sub_record.receive_until(asyncio.get_event_loop().time() + _RX_TIMEOUT)) is None
stat = sub_record.sample_statistics()
assert stat.transport_session.transfers == 2
assert stat.transport_session.frames >= 2 # 'greater' is needed to accommodate redundant transports.
assert stat.transport_session.drops == 0
assert stat.deserialization_failures == 1
assert stat.messages == 1
# Close the objects explicitly and ensure that they are finalized. This also removes the warnings that some tasks
# have been removed while pending.
pub_heart.close()
# Author: Pavel Kirienko
#
from __future__ import annotations
import dataclasses
import pyuavcan
@dataclasses.dataclass(frozen=True)
class Frame:
"""
The base class of a high-overhead-transport frame.
It is used with the common transport algorithms defined in this module.
Concrete transport implementations should make their transport-specific frame dataclasses inherit from this class.
"""
timestamp: pyuavcan.transport.Timestamp
"""
For outgoing frames, this is the timestamp of the transfer instance.
For incoming frames, this is the reception timestamp from the media implementation (hardware or software).
"""
priority: pyuavcan.transport.Priority
"""
Transfer priority should be the same for all frames within the transfer.
"""
transfer_id: int
"""
Transfer-ID is incremented whenever a transfer under a specific session-specifier is emitted.
Always non-negative.
"""
def __post_init__(self) -> None:
if not isinstance(self.timestamp, pyuavcan.transport.Timestamp):
raise TypeError(f'Invalid timestamp: {self.timestamp}')
if not isinstance(self.priority, pyuavcan.transport.Priority):
raise TypeError(f'Invalid priority: {self.priority}')
if self.transfer_id < 0:
raise ValueError(f'Invalid transfer-ID: {self.transfer_id}')
if self.index < 0:
raise ValueError(f'Invalid frame index: {self.index}')
if not isinstance(self.end_of_transfer, bool):
raise TypeError(f'Bad end of transfer flag: {type(self.end_of_transfer).__name__}')
if not isinstance(self.payload, memoryview):
raise TypeError(f'Bad payload type: {type(self.payload).__name__}')
def _unittest_output_session() -> None:
import asyncio
from pytest import raises, approx
from pyuavcan.transport import SessionSpecifier, MessageDataSpecifier, ServiceDataSpecifier, Priority, Transfer
from pyuavcan.transport import PayloadMetadata, SessionStatistics, Timestamp, Feedback
ts = Timestamp.now()
run_until_complete = asyncio.get_event_loop().run_until_complete
tx_timestamp: typing.Optional[Timestamp] = Timestamp.now()
tx_exception: typing.Optional[Exception] = None
last_sent_frames: typing.List[SerialFrame] = []
last_monotonic_deadline = 0.0
finalized = False
async def do_send(frames: typing.Iterable[SerialFrame], monotonic_deadline: float) -> typing.Optional[Timestamp]:
nonlocal last_sent_frames
nonlocal last_monotonic_deadline
last_sent_frames = list(frames)
last_monotonic_deadline = monotonic_deadline
if tx_exception:
raise tx_exception
return tx_timestamp
def do_finalize() -> None:
nonlocal finalized
def _unittest_output_session() -> None:
from pytest import raises
from pyuavcan.transport import OutputSessionSpecifier, MessageDataSpecifier, ServiceDataSpecifier, Priority
from pyuavcan.transport import PayloadMetadata, SessionStatistics, Timestamp, Feedback, Transfer
ts = Timestamp.now()
loop = asyncio.get_event_loop()
run_until_complete = loop.run_until_complete
finalized = False
def do_finalize() -> None:
nonlocal finalized
finalized = True
def check_timestamp(t: pyuavcan.transport.Timestamp) -> bool:
now = pyuavcan.transport.Timestamp.now()
s = ts.system_ns <= t.system_ns <= now.system_ns
m = ts.monotonic_ns <= t.monotonic_ns <= now.system_ns
return s and m
destination_endpoint = '127.100.0.1', 25406
if not iface_switch_allowed and state.iface_index != iface_index:
return False
# TODO: The TID modulo setting is not currently used yet.
# TODO: It may be utilized later to implement faster iface fallback.
# Either we're on the same interface or (the interface is new and the current one seems to be down).
state.iface_index = iface_index
state.last_timestamp = transfer.timestamp
return True
@dataclasses.dataclass
class _RemoteState:
iface_index: int
last_timestamp: pyuavcan.transport.Timestamp
data, ancdata, msg_flags, _addr = self._sock.recvmsg(self._native_frame_size,
self._ancillary_data_buffer_size)
assert msg_flags & socket.MSG_TRUNC == 0, 'The data buffer is not large enough'
assert msg_flags & socket.MSG_CTRUNC == 0, 'The ancillary data buffer is not large enough'
loopback = bool(msg_flags & socket.MSG_CONFIRM)
ts_system_ns = 0
for cmsg_level, cmsg_type, cmsg_data in ancdata:
if cmsg_level == socket.SOL_SOCKET and cmsg_type == _SO_TIMESTAMP:
sec, usec = _TIMEVAL_STRUCT.unpack(cmsg_data)
ts_system_ns = (sec * 1_000_000 + usec) * 1000
else:
assert False, f'Unexpected ancillary data: {cmsg_level}, {cmsg_type}, {cmsg_data!r}'
assert ts_system_ns > 0, 'Missing the timestamp; does the driver support timestamping?'
timestamp = pyuavcan.transport.Timestamp(system_ns=ts_system_ns, monotonic_ns=ts_mono_ns)
out = SocketCANMedia._parse_native_frame(data, loopback=loopback, timestamp=timestamp)
if out is not None:
return out
data = bytearray(self.padded_payload)
data.append(tail)
return _media.DataFrame(identifier=self.identifier,
data=data,
format=_media.FrameFormat.EXTENDED,
loopback=self.loopback)
@staticmethod
def get_required_padding(data_length: int) -> int:
return _media.DataFrame.get_required_padding(data_length + 1) # +1 for the tail byte
@dataclasses.dataclass(frozen=True)
class TimestampedUAVCANFrame(UAVCANFrame):
timestamp: pyuavcan.transport.Timestamp
@staticmethod
def parse(source: _media.TimestampedDataFrame) -> typing.Optional[TimestampedUAVCANFrame]:
if source.format != _media.FrameFormat.EXTENDED:
return None
if len(source.data) < 1:
return None
padded_payload, tail = memoryview(source.data)[:-1], source.data[-1]
transfer_id = tail & (TRANSFER_ID_MODULO - 1)
sot, eot, tog = tuple(tail & (1 << x) != 0 for x in (7, 6, 5))
if sot and not tog:
return None
return TimestampedUAVCANFrame(timestamp=source.timestamp,
ses._add_inferior(inf_a) # No change, added above
assert ses.inferiors == [inf_a]
# noinspection PyProtectedMember
ses._add_inferior(inf_b)
assert ses.inferiors == [inf_a, inf_b]
assert ses.transfer_id_timeout == pytest.approx(1.1)
assert inf_b.transfer_id_timeout == pytest.approx(1.1)
# Redundant reception - new transfers accepted because the iface switch timeout is exceeded.
time.sleep(ses.transfer_id_timeout) # Just to make sure that it is REALLY exceeded.
assert await_(tx_b.send_until(Transfer(timestamp=Timestamp.now(),
priority=Priority.HIGH,
transfer_id=2,
fragmented_payload=[memoryview(b'def')]),
loop.time() + 1.0))
assert await_(tx_b.send_until(Transfer(timestamp=Timestamp.now(),
priority=Priority.HIGH,
transfer_id=3,
fragmented_payload=[memoryview(b'ghi')]),
loop.time() + 1.0))
tr = await_(ses.receive_until(loop.time() + 0.1))
assert isinstance(tr, RedundantTransferFrom)
assert ts.monotonic <= tr.timestamp.monotonic <= (loop.time() + 1e-3)
assert tr.priority == Priority.HIGH
assert tr.transfer_id == 2
assert tr.fragmented_payload == [memoryview(b'def')]
assert tr.inferior_session == inf_b
tr = await_(ses.receive_until(loop.time() + 0.1))
assert isinstance(tr, RedundantTransferFrom)
assert ts.monotonic <= tr.timestamp.monotonic <= (loop.time() + 1e-3)