Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def _unittest_loopback_transport_service() -> None:
from pyuavcan.transport import ServiceDataSpecifier, InputSessionSpecifier, OutputSessionSpecifier
payload_metadata = pyuavcan.transport.PayloadMetadata(0xdeadbeef0ddf00d, 1234)
tr = pyuavcan.transport.loopback.LoopbackTransport(1234)
inp = tr.get_input_session(InputSessionSpecifier(ServiceDataSpecifier(123, ServiceDataSpecifier.Role.REQUEST),
1234),
payload_metadata)
out = tr.get_output_session(OutputSessionSpecifier(ServiceDataSpecifier(123, ServiceDataSpecifier.Role.REQUEST),
1234),
payload_metadata)
assert await out.send_until(pyuavcan.transport.Transfer(
timestamp=pyuavcan.transport.Timestamp.now(),
priority=pyuavcan.transport.Priority.IMMEDIATE,
transfer_id=123, # mod 32 = 27
fragmented_payload=[memoryview(b'Hello world!')],
tr = pyuavcan.transport.loopback.LoopbackTransport(None)
protocol_params = pyuavcan.transport.ProtocolParameters(
transfer_id_modulo=32,
max_nodes=2 ** 64,
mtu=2 ** 64 - 1,
)
tr.protocol_parameters = protocol_params
assert tr.protocol_parameters == protocol_params
assert tr.loop is asyncio.get_event_loop()
assert tr.local_node_id is None
tr = pyuavcan.transport.loopback.LoopbackTransport(42)
tr.protocol_parameters = protocol_params
assert 42 == tr.local_node_id
payload_metadata = pyuavcan.transport.PayloadMetadata(0xdeadbeef0ddf00d, 1234)
message_spec_123_in = pyuavcan.transport.InputSessionSpecifier(pyuavcan.transport.MessageDataSpecifier(123), 123)
message_spec_123_out = pyuavcan.transport.OutputSessionSpecifier(pyuavcan.transport.MessageDataSpecifier(123), 123)
message_spec_42_in = pyuavcan.transport.InputSessionSpecifier(pyuavcan.transport.MessageDataSpecifier(123), 42)
message_spec_any_out = pyuavcan.transport.OutputSessionSpecifier(pyuavcan.transport.MessageDataSpecifier(123), None)
out_123 = tr.get_output_session(specifier=message_spec_123_out, payload_metadata=payload_metadata)
assert out_123 is tr.get_output_session(specifier=message_spec_123_out, payload_metadata=payload_metadata)
last_feedback: typing.Optional[pyuavcan.transport.Feedback] = None
def on_feedback(fb: pyuavcan.transport.Feedback) -> None:
nonlocal last_feedback
last_feedback = fb
out_123.enable_feedback(on_feedback)
ServiceDataSpecifier(-1, ServiceDataSpecifier.Role.REQUEST)
with raises(ValueError):
InputSessionSpecifier(MessageDataSpecifier(123), -1)
with raises(ValueError):
OutputSessionSpecifier(ServiceDataSpecifier(100, ServiceDataSpecifier.Role.RESPONSE), None)
with raises(ValueError):
PayloadMetadata(-1, 0)
with raises(ValueError):
PayloadMetadata(2 ** 64, 0)
with raises(ValueError):
PayloadMetadata(0, -1)
assert tr_a.local_node_id is None
assert tr_a.protocol_parameters == ProtocolParameters(
transfer_id_modulo=0,
max_nodes=0,
mtu=0,
)
assert tr_a.descriptor == '' # Empty, no inferiors.
assert tr_a.input_sessions == []
assert tr_a.output_sessions == []
assert tr_a.loop == tr_b.loop
#
# Instantiate session objects.
#
meta = PayloadMetadata(0xdeadbeef_deadbeef, 10_240)
pub_a = tr_a.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
sub_any_a = tr_a.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert pub_a is tr_a.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert set(tr_a.input_sessions) == {sub_any_a}
assert set(tr_a.output_sessions) == {pub_a}
assert tr_a.sample_statistics() == RedundantTransportStatistics()
pub_b = tr_b.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
sub_any_b = tr_b.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
sub_sel_b = tr_b.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 3210), meta)
assert sub_sel_b is tr_b.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 3210), meta)
assert set(tr_b.input_sessions) == {sub_any_b, sub_sel_b}
assert set(tr_b.output_sessions) == {pub_b}
assert tr_b.sample_statistics() == RedundantTransportStatistics()
destination_endpoint = '127.100.0.1', 25406
sock_rx = socket_.socket(socket_.AF_INET, socket_.SOCK_DGRAM)
sock_rx.bind(destination_endpoint)
sock_rx.settimeout(1.0)
def make_sock() -> socket_.socket:
sock = socket_.socket(socket_.AF_INET, socket_.SOCK_DGRAM)
sock.bind(('127.100.0.2', 0))
sock.connect(destination_endpoint)
sock.setblocking(False)
return sock
sos = UDPOutputSession(
specifier=OutputSessionSpecifier(MessageDataSpecifier(3210), None),
payload_metadata=PayloadMetadata(0xdead_beef_badc0ffe, 1024),
mtu=11,
multiplier=1,
sock=make_sock(),
loop=asyncio.get_event_loop(),
finalizer=do_finalize,
)
assert sos.specifier == OutputSessionSpecifier(MessageDataSpecifier(3210), None)
assert sos.destination_node_id is None
assert sos.payload_metadata == PayloadMetadata(0xdead_beef_badc0ffe, 1024)
assert sos.sample_statistics() == SessionStatistics()
assert run_until_complete(sos.send_until(
Transfer(timestamp=ts,
priority=Priority.NOMINAL,
transfer_id=12340,
Do not call this directly. Instead, use the factory method.
Instances take ownership of the socket.
"""
self._closed = False
self._specifier = specifier
self._payload_metadata = payload_metadata
self._mtu = int(mtu)
self._multiplier = int(multiplier)
self._sock = sock
self._loop = loop
self._finalizer = finalizer
self._feedback_handler: typing.Optional[typing.Callable[[pyuavcan.transport.Feedback], None]] = None
self._statistics = pyuavcan.transport.SessionStatistics()
if not isinstance(self._specifier, pyuavcan.transport.OutputSessionSpecifier) or \
not isinstance(self._payload_metadata, pyuavcan.transport.PayloadMetadata): # pragma: no cover
raise TypeError('Invalid parameters')
if self._multiplier < 1: # pragma: no cover
raise ValueError(f'Invalid transfer multiplier: {self._multiplier}')
assert specifier.remote_node_id is not None \
if isinstance(specifier.data_specifier, pyuavcan.transport.ServiceDataSpecifier) else True, \
'Internal protocol violation: cannot broadcast a service transfer'
prio = Priority.SLOW
dst_nid = 1234
run_until_complete = asyncio.get_event_loop().run_until_complete
get_monotonic = asyncio.get_event_loop().time
nihil_supernum = b'nihil supernum'
finalized = False
def do_finalize() -> None:
nonlocal finalized
finalized = True
session_spec = InputSessionSpecifier(MessageDataSpecifier(12345), None)
payload_meta = PayloadMetadata(0xdead_beef_bad_c0ffe, 100)
sis = SerialInputSession(specifier=session_spec,
payload_metadata=payload_meta,
loop=asyncio.get_event_loop(),
finalizer=do_finalize)
assert sis.specifier == session_spec
assert sis.payload_metadata == payload_meta
assert sis.sample_statistics() == SerialInputSessionStatistics()
assert sis.transfer_id_timeout == approx(SerialInputSession.DEFAULT_TRANSFER_ID_TIMEOUT)
sis.transfer_id_timeout = 1.0
with raises(ValueError):
sis.transfer_id_timeout = 0.0
assert sis.transfer_id_timeout == approx(1.0)
assert run_until_complete(sis.receive_until(get_monotonic() + 0.1)) is None
def _unittest_redundant_input_monotonic() -> None:
import pytest
from pyuavcan.transport import Transfer, Timestamp, Priority
from pyuavcan.transport.loopback import LoopbackTransport
loop = asyncio.get_event_loop()
await_ = loop.run_until_complete
spec = pyuavcan.transport.InputSessionSpecifier(pyuavcan.transport.MessageDataSpecifier(4321), None)
spec_tx = pyuavcan.transport.OutputSessionSpecifier(spec.data_specifier, None)
meta = pyuavcan.transport.PayloadMetadata(0x_deadbeef_deadbeef, 30)
ts = Timestamp.now()
tr_a = LoopbackTransport(111)
tr_b = LoopbackTransport(111)
tx_a = tr_a.get_output_session(spec_tx, meta)
tx_b = tr_b.get_output_session(spec_tx, meta)
inf_a = tr_a.get_input_session(spec, meta)
inf_b = tr_b.get_input_session(spec, meta)
inf_a.transfer_id_timeout = 1.1 # This is used to ensure that the transfer-ID timeout is handled correctly.
ses = RedundantInputSession(spec, meta,
tid_modulo_provider=lambda: None, # Like UDP or serial - infinite modulo.
loop=loop,
finalizer=lambda: None)
fragmented_payload=[]),
123456.789
))
sos = SerialOutputSession(
specifier=SessionSpecifier(MessageDataSpecifier(3210), None),
payload_metadata=PayloadMetadata(0xdead_beef_badc0ffe, 1024),
sft_payload_capacity_bytes=11,
local_node_id_accessor=lambda: None,
send_handler=do_send,
finalizer=do_finalize,
)
assert sos.specifier == SessionSpecifier(MessageDataSpecifier(3210), None)
assert sos.destination_node_id is None
assert sos.payload_metadata == PayloadMetadata(0xdead_beef_badc0ffe, 1024)
assert sos.sample_statistics() == SessionStatistics()
assert run_until_complete(sos.send_until(
Transfer(timestamp=ts,
priority=Priority.NOMINAL,
transfer_id=12340,
fragmented_payload=[memoryview(b'one'), memoryview(b'two'), memoryview(b'three')]),
123456.789
))
assert last_monotonic_deadline == approx(123456.789)
assert len(last_sent_frames) == 1
with raises(pyuavcan.transport.OperationNotDefinedForAnonymousNodeError):
run_until_complete(sos.send_until(
Transfer(timestamp=ts,
priority=Priority.NOMINAL,
def __init__(self,
specifier: pyuavcan.transport.InputSessionSpecifier,
payload_metadata: pyuavcan.transport.PayloadMetadata,
loop: asyncio.AbstractEventLoop,
finalizer: typing.Callable[[], None]):
"""
Do not call this directly.
Instead, use the factory method :meth:`pyuavcan.transport.serial.SerialTransport.get_input_session`.
"""
self._specifier = specifier
self._payload_metadata = payload_metadata
self._loop = loop
assert self._loop is not None
if not isinstance(self._specifier, pyuavcan.transport.InputSessionSpecifier) or \
not isinstance(self._payload_metadata, pyuavcan.transport.PayloadMetadata): # pragma: no cover
raise TypeError('Invalid parameters')
self._statistics = SerialInputSessionStatistics()
self._transfer_id_timeout = self.DEFAULT_TRANSFER_ID_TIMEOUT
self._queue: asyncio.Queue[pyuavcan.transport.TransferFrom] = asyncio.Queue()
self._reassemblers: typing.Dict[int, TransferReassembler] = {}
super(SerialInputSession, self).__init__(finalizer)