Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Validate the received transfer: it must carry transfer-ID 77777 and exactly
# one payload fragment equal to the concatenation of payload_single.
assert rx_transfer.transfer_id == 77777
assert rx_transfer.fragmented_payload == [b''.join(payload_single)]
# Print the current statistics snapshot so it is visible in the test output.
print(tr.sample_statistics())
# NOTE(review): the lower bound 32 + sft_capacity + 2 is presumably
# header + single-frame payload capacity + framing overhead -- confirm.
assert tr.sample_statistics().in_bytes >= 32 + sft_capacity + 2
# Exactly one frame was received and none of the input was out-of-band.
assert tr.sample_statistics().in_frames == 1
assert tr.sample_statistics().in_out_of_band_bytes == 0
# The output byte counter must equal the input byte counter.
assert tr.sample_statistics().out_bytes == tr.sample_statistics().in_bytes
# One frame and one transfer were sent; nothing was left incomplete.
assert tr.sample_statistics().out_frames == 1
assert tr.sample_statistics().out_transfers == 1
assert tr.sample_statistics().out_incomplete == 0
# The send attempt below is expected to raise
# OperationNotDefinedForAnonymousNodeError.
with pytest.raises(pyuavcan.transport.OperationNotDefinedForAnonymousNodeError):
# Anonymous nodes can't send multiframe transfers.
assert await broadcaster.send_until(
Transfer(timestamp=Timestamp.now(),
priority=Priority.LOW,
transfer_id=77777,
fragmented_payload=payload_x3),
monotonic_deadline=get_monotonic() + 5.0
)
# Each of the four listeners is polled with a deadline 0.1 past the current
# monotonic time and must return None.
assert None is await subscriber_selective.receive_until(get_monotonic() + 0.1)
assert None is await subscriber_promiscuous.receive_until(get_monotonic() + 0.1)
assert None is await server_listener.receive_until(get_monotonic() + 0.1)
assert None is await client_listener.receive_until(get_monotonic() + 0.1)
#
# Service exchange test.
#
with pytest.raises(pyuavcan.transport.OperationNotDefinedForAnonymousNodeError):
# Anonymous nodes can't emit service transfers.
# Send three frames through media_a with a deadline 1.0 past the current
# event loop time: two are flagged loopback=True, one loopback=False.
await media_a.send_until([
DataFrame(identifier=0xbadc0fe,
data=bytearray(range(8)),
format=FrameFormat.EXTENDED,
loopback=True),
DataFrame(identifier=0x12345678,
data=bytearray(range(0)),
format=FrameFormat.EXTENDED,
loopback=False),
DataFrame(identifier=0x123,
data=bytearray(range(6)),
format=FrameFormat.BASE,
loopback=True),
], asyncio.get_event_loop().time() + 1.0)
# NOTE(review): the pause presumably lets the frames arrive in rx_a before
# the end timestamp is sampled -- confirm against the receive handler.
await asyncio.sleep(0.1)
ts_end = Timestamp.now()
print('rx_a:', rx_a)
# Three sent back from the other end, two loopback
assert len(rx_a) == 5
# Every received frame must be timestamped within [ts_begin, ts_end] on both
# the monotonic and the system clock.
for f in rx_a:
assert ts_begin.monotonic_ns <= f.timestamp.monotonic_ns <= ts_end.monotonic_ns
assert ts_begin.system_ns <= f.timestamp.system_ns <= ts_end.system_ns
# Split the received frames by their loopback flag.
rx_loopback = list(filter(lambda x: x.loopback, rx_a))
rx_external = list(filter(lambda x: not x.loopback, rx_a))
assert len(rx_loopback) == 2 and len(rx_external) == 3
# The first loopback frame must match the first frame that was sent above.
assert rx_loopback[0].identifier == 0xbadc0fe
assert rx_loopback[0].data == bytearray(range(8))
assert rx_loopback[0].format == FrameFormat.EXTENDED
raise NotImplementedError
# Build the session under test; do_close and do_route are callbacks defined
# outside the visible fragment.
ses = LoopbackOutputSession(specifier=specifier,
payload_metadata=payload_metadata,
loop=asyncio.get_event_loop(),
closer=do_close,
router=do_route)
# The session must expose the constructor arguments through its properties.
assert specifier == ses.specifier
assert payload_metadata == ses.payload_metadata
# The `closed` flag must be falsy before close() and truthy after it.
# NOTE(review): presumably do_close sets the flag -- confirm where it is defined.
assert not closed
ses.close()
assert closed
# A feedback object built from a single timestamp must report that timestamp
# both as the first frame transmission time and as the original transfer time.
ts = pyuavcan.transport.Timestamp.now()
fb = LoopbackFeedback(ts)
assert fb.first_frame_transmission_timestamp == ts
assert fb.original_transfer_timestamp == ts
def check_timestamp(t: pyuavcan.transport.Timestamp) -> bool:
    """
    Check that ``t`` lies between the reference timestamp ``ts`` and now.

    ``ts`` is taken from the enclosing scope. The system and the monotonic
    components are validated independently, each against bounds taken from
    the same clock.

    :param t: The timestamp to validate.
    :return: True if both components of ``t`` are within ``[ts, now]``.
    """
    now = pyuavcan.transport.Timestamp.now()
    system_ok = ts.system_ns <= t.system_ns <= now.system_ns
    # The upper bound has to come from the monotonic clock as well; bounding a
    # monotonic reading by now.system_ns compares two unrelated clocks.
    monotonic_ok = ts.monotonic_ns <= t.monotonic_ns <= now.monotonic_ns
    return system_ok and monotonic_ok
def _unittest_stream_parser() -> None:
    """
    Exercise the basic behaviour of ``StreamParser``.

    Constructing the parser with 0 as the second argument must raise
    ``ValueError``. With 4 as the second argument, the chunk ``b'abcdef'``
    must be reported to the callback as a single ``memoryview``.
    """
    from pytest import raises
    from pyuavcan.transport import Priority, MessageDataSpecifier
    from ._frame import SerialFrame

    ts = pyuavcan.transport.Timestamp.now()

    # Everything the parser emits is appended here by the callback.
    outputs: typing.List[typing.Union[SerialFrame, memoryview]] = []

    with raises(ValueError):
        sp = StreamParser(outputs.append, 0)

    sp = StreamParser(outputs.append, 4)

    def proc(b: typing.Union[bytes, memoryview]) -> typing.Sequence[typing.Union[SerialFrame, memoryview]]:
        """Feed one chunk to the parser, then return and discard everything collected so far."""
        sp.process_next_chunk(b, ts)
        drained = list(outputs)
        del outputs[:]
        return drained

    assert len(outputs) == 0
    assert proc(b'abcdef') == [memoryview(b'abcdef')]
# Unit test of the input session; only the setup part is visible here.
def _unittest_input_session() -> None:
import asyncio
from pytest import raises, approx
from pyuavcan.transport import InputSessionSpecifier, MessageDataSpecifier, Priority, TransferFrom
from pyuavcan.transport import PayloadMetadata, Timestamp
from pyuavcan.transport.commons.high_overhead_transport import TransferCRC
# Reference timestamp sampled at the start of the test.
ts = Timestamp.now()
prio = Priority.SLOW
dst_nid = 1234
# Shortcuts: run a coroutine to completion, and read the event loop clock.
run_until_complete = asyncio.get_event_loop().run_until_complete
get_monotonic = asyncio.get_event_loop().time
nihil_supernum = b'nihil supernum'
# Flag flipped to True by do_finalize() below.
finalized = False
def do_finalize() -> None:
nonlocal finalized
finalized = True
# NOTE(review): the second argument None presumably means "no remote node-ID
# filter" -- confirm against InputSessionSpecifier.
session_spec = InputSessionSpecifier(MessageDataSpecifier(12345), None)
# NOTE(review): the arguments look like a data type hash and a maximum payload
# size of 100 -- confirm against PayloadMetadata.
payload_meta = PayloadMetadata(0xdead_beef_bad_c0ffe, 100)
def check_timestamp(t: pyuavcan.transport.Timestamp) -> bool:
    """
    Check that ``t`` lies between the reference timestamp ``ts`` and now.

    ``ts`` is taken from the enclosing scope. The system and the monotonic
    components are validated independently, each against bounds taken from
    the same clock.

    :param t: The timestamp to validate.
    :return: True if both components of ``t`` are within ``[ts, now]``.
    """
    now = pyuavcan.transport.Timestamp.now()
    system_ok = ts.system_ns <= t.system_ns <= now.system_ns
    # The upper bound has to come from the monotonic clock as well; bounding a
    # monotonic reading by now.system_ns compares two unrelated clocks.
    monotonic_ok = ts.monotonic_ns <= t.monotonic_ns <= now.monotonic_ns
    return system_ok and monotonic_ok
# Write the frames one by one. The loop is abandoned (tx_ts reset to None) as
# soon as the deadline has passed or a frame is not written in full.
for fr in frames:
async with self._port_lock: # TODO: the lock acquisition should be prioritized by frame priority!
# The serialization buffer is reused between frames and is replaced by a
# larger one when it is shorter than three times the frame payload.
# NOTE(review): the factor 3 is presumably the worst-case encoding overhead
# of compile_into() -- confirm.
min_buffer_size = len(fr.payload) * 3
if len(self._serialization_buffer) < min_buffer_size:
_logger.debug('%s: The serialization buffer is being enlarged from %d to %d bytes',
self, len(self._serialization_buffer), min_buffer_size)
# NOTE(review): bytearray(min_buffer_size) yields the same zero-filled buffer
# without iterating a generator.
self._serialization_buffer = bytearray(0 for _ in range(min_buffer_size))
compiled = fr.compile_into(self._serialization_buffer)
# Time left until the deadline; a non-positive value means it has passed.
timeout = monotonic_deadline - self._loop.time()
if timeout > 0:
self._serial_port.write_timeout = timeout
try:
# The port write is executed in the background executor and awaited here.
num_written = await self._loop.run_in_executor(self._background_executor,
self._serial_port.write,
compiled)
# Keep the timestamp taken after the first successful write; later frames
# do not overwrite it.
tx_ts = tx_ts or pyuavcan.transport.Timestamp.now()
except serial.SerialTimeoutException:
num_written = 0
_logger.info('%s: Port write timed out in %.3fs on frame %r', self, timeout, fr)
# `or 0` covers a write() call that returned None.
self._statistics.out_bytes += num_written or 0
else:
tx_ts = None # Timed out
break
# A None result from write() is treated as a complete write.
num_written = len(compiled) if num_written is None else num_written
if num_written < len(compiled):
tx_ts = None # Write failed
break
# Only fully written frames are counted.
self._statistics.out_frames += 1
except Exception as ex:
if self._closed:
def _unittest_redundant_output_exceptions() -> None:
import pytest
from pyuavcan.transport import Transfer, Timestamp, Priority, SessionStatistics
from pyuavcan.transport import TransferFrom
from pyuavcan.transport.loopback import LoopbackTransport
loop = asyncio.get_event_loop()
await_ = loop.run_until_complete
spec = pyuavcan.transport.OutputSessionSpecifier(pyuavcan.transport.MessageDataSpecifier(4321), None)
spec_rx = pyuavcan.transport.InputSessionSpecifier(spec.data_specifier, None)
meta = pyuavcan.transport.PayloadMetadata(0x_deadbeef_deadbeef, 30 * 1024 * 1024)
ts = Timestamp.now()
is_retired = False
def retire() -> None:
nonlocal is_retired
is_retired = True
ses = RedundantOutputSession(spec, meta, loop=loop, finalizer=retire)
assert not is_retired
assert ses.specifier is spec
assert ses.payload_metadata is meta
assert not ses.inferiors
assert ses.sample_statistics() == RedundantSessionStatistics()
tr_a = LoopbackTransport(111)
tr_b = LoopbackTransport(111)