Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
sft_capacity = 1024
payload_single = [_mem('qwertyui'), _mem('01234567')] * (sft_capacity // 16)
assert sum(map(len, payload_single)) == sft_capacity
payload_x3 = (payload_single * 3)[:-1]
payload_x3_size_bytes = sft_capacity * 3 - 8
assert sum(map(len, payload_x3)) == payload_x3_size_bytes
#
# Instantiate session objects.
#
meta = PayloadMetadata(0x_bad_c0ffee_0dd_f00d, 10000)
broadcaster = tr.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert broadcaster is tr.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
subscriber_promiscuous = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert subscriber_promiscuous is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), None),
meta)
subscriber_selective = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 3210), meta)
assert subscriber_selective is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 3210), meta)
server_listener = tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), None), meta)
assert server_listener is tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), None), meta)
client_requester = tr.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), 3210), meta)
subscriber_promiscuous = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), None), meta)
assert subscriber_promiscuous is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), None),
meta)
subscriber_selective = tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 123), meta)
assert subscriber_selective is tr.get_input_session(InputSessionSpecifier(MessageDataSpecifier(12345), 123), meta)
server_listener = tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST), None), meta)
assert server_listener is tr.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST), None), meta)
server_responder = tr.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.RESPONSE), 222), meta)
assert server_responder is tr.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.RESPONSE), 222), meta)
client_requester = tr2.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST), 111), meta)
assert client_requester is tr2.get_output_session(
OutputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST), 111), meta)
client_listener = tr2.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.RESPONSE), 111), meta)
assert client_listener is tr2.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.RESPONSE), 111), meta)
print('tr :', tr.input_sessions, tr.output_sessions)
assert set(tr.input_sessions) == {subscriber_promiscuous, subscriber_selective, server_listener}
assert set(tr.output_sessions) == {server_responder}
print('tr2:', tr2.input_sessions, tr2.output_sessions)
transfer_id=5,
fragmented_payload=[memoryview(b'uio')]),
monotonic_deadline=loop.time() + 1.0
)
rx = await sub_any_b.receive_until(loop.time() + 1.0)
assert rx is not None
assert rx.fragmented_payload == [memoryview(b'uio')]
assert rx.transfer_id == 5
assert not await sub_any_a.receive_until(loop.time() + 0.1)
assert not await sub_any_b.receive_until(loop.time() + 0.1)
assert not await sub_sel_b.receive_until(loop.time() + 0.1)
#
# Construct new session with the transports configured.
#
pub_a_new = tr_a.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), 222), meta)
assert pub_a_new is tr_a.get_output_session(OutputSessionSpecifier(MessageDataSpecifier(12345), 222), meta)
assert set(tr_a.output_sessions) == {pub_a, pub_a_new}
assert await pub_a_new.send_until(
Transfer(timestamp=Timestamp.now(),
priority=Priority.LOW,
transfer_id=6,
fragmented_payload=[memoryview(b'asd')]),
monotonic_deadline=loop.time() + 1.0
)
rx = await sub_any_b.receive_until(loop.time() + 1.0)
assert rx is not None
assert rx.fragmented_payload == [memoryview(b'asd')]
assert rx.transfer_id == 6
#
def __init__(self,
specifier: pyuavcan.transport.OutputSessionSpecifier,
payload_metadata: pyuavcan.transport.PayloadMetadata,
loop: asyncio.AbstractEventLoop,
finalizer: typing.Callable[[], None]):
"""
Do not call this directly! Use the factory method instead.
"""
self._specifier = specifier
self._payload_metadata = payload_metadata
self._loop = loop
self._finalizer: typing.Optional[typing.Callable[[], None]] = finalizer
assert isinstance(self._specifier, pyuavcan.transport.OutputSessionSpecifier)
assert isinstance(self._payload_metadata, pyuavcan.transport.PayloadMetadata)
assert isinstance(self._loop, asyncio.AbstractEventLoop)
assert callable(self._finalizer)
self._inferiors: typing.List[pyuavcan.transport.OutputSession] = []
self._feedback_handler: typing.Optional[typing.Callable[[RedundantFeedback], None]] = None
self._idle_send_future: typing.Optional[asyncio.Future[None]] = None
self._lock = asyncio.Lock(loop=self._loop)
self._stat_transfers = 0
self._stat_payload_bytes = 0
self._stat_errors = 0
self._stat_drops = 0
# At first I tried using serial.is_open, but unfortunately that doesn't work reliably because the close()
# method on most serial port classes is non-atomic, which causes all sorts of weird race conditions
# and spurious errors in the reader thread (at least). A simple explicit flag is reliable.
self._closed = False
# For serial port write serialization. Read operations are performed concurrently (no sync) in separate thread.
self._port_lock = asyncio.Lock(loop=loop)
# The serialization buffer is re-used for performance reasons; it is needed to store frame contents before
# they are emitted into the serial port. It may grow as necessary at runtime; the initial size is a guess.
# Access must be protected with the port lock!
self._serialization_buffer = bytearray(b'\x00' * 1024)
self._input_registry: typing.Dict[pyuavcan.transport.InputSessionSpecifier, SerialInputSession] = {}
self._output_registry: typing.Dict[pyuavcan.transport.OutputSessionSpecifier, SerialOutputSession] = {}
self._statistics = SerialTransportStatistics()
if not isinstance(serial_port, serial.SerialBase):
serial_port = serial.serial_for_url(serial_port)
assert isinstance(serial_port, serial.SerialBase)
if not serial_port.is_open:
raise pyuavcan.transport.InvalidMediaConfigurationError('The serial port instance is not open')
serial_port.timeout = _SERIAL_PORT_READ_TIMEOUT
self._serial_port = serial_port
if baudrate is not None:
self._serial_port.baudrate = int(baudrate)
self._background_executor = concurrent.futures.ThreadPoolExecutor()
self._reader_thread = threading.Thread(target=self._reader_thread_func, daemon=True)
def __init__(self,
local_node_id: typing.Optional[int],
loop: typing.Optional[asyncio.AbstractEventLoop] = None):
self._loop = loop if loop is not None else asyncio.get_event_loop()
self._local_node_id = int(local_node_id) if local_node_id is not None else None
self._input_sessions: typing.Dict[pyuavcan.transport.InputSessionSpecifier, LoopbackInputSession] = {}
self._output_sessions: typing.Dict[pyuavcan.transport.OutputSessionSpecifier, LoopbackOutputSession] = {}
# Unlimited protocol capabilities by default.
self._protocol_parameters = pyuavcan.transport.ProtocolParameters(
transfer_id_modulo=2 ** 64,
max_nodes=2 ** 64,
mtu=2 ** 64 - 1,
)
See :class:`Client` for further information about clients.
"""
if not issubclass(dtype, pyuavcan.dsdl.ServiceObject):
raise TypeError(f'Not a service type: {dtype}')
self._raise_if_closed()
def transfer_id_modulo_factory() -> int:
return self._transport.protocol_parameters.transfer_id_modulo
input_session_specifier = pyuavcan.transport.InputSessionSpecifier(
pyuavcan.transport.ServiceDataSpecifier(service_id, pyuavcan.transport.ServiceDataSpecifier.Role.RESPONSE),
server_node_id
)
output_session_specifier = pyuavcan.transport.OutputSessionSpecifier(
pyuavcan.transport.ServiceDataSpecifier(service_id, pyuavcan.transport.ServiceDataSpecifier.Role.REQUEST),
server_node_id
)
try:
impl = self._registry[Client, input_session_specifier]
assert isinstance(impl, ClientImpl)
except LookupError:
output_transport_session = self._transport.get_output_session(output_session_specifier,
self._make_payload_metadata(dtype.Request))
input_transport_session = self._transport.get_input_session(input_session_specifier,
self._make_payload_metadata(dtype.Response))
transfer_id_counter = self._output_transfer_id_map.setdefault(output_session_specifier,
OutgoingTransferIDCounter())
impl = ClientImpl(dtype=dtype,
input_transport_session=input_transport_session,
output_transport_session=output_transport_session,
def _unittest_redundant_input_monotonic() -> None:
import pytest
from pyuavcan.transport import Transfer, Timestamp, Priority
from pyuavcan.transport.loopback import LoopbackTransport
loop = asyncio.get_event_loop()
await_ = loop.run_until_complete
spec = pyuavcan.transport.InputSessionSpecifier(pyuavcan.transport.MessageDataSpecifier(4321), None)
spec_tx = pyuavcan.transport.OutputSessionSpecifier(spec.data_specifier, None)
meta = pyuavcan.transport.PayloadMetadata(0x_deadbeef_deadbeef, 30)
ts = Timestamp.now()
tr_a = LoopbackTransport(111)
tr_b = LoopbackTransport(111)
tx_a = tr_a.get_output_session(spec_tx, meta)
tx_b = tr_b.get_output_session(spec_tx, meta)
inf_a = tr_a.get_input_session(spec, meta)
inf_b = tr_b.get_input_session(spec, meta)
inf_a.transfer_id_timeout = 1.1 # This is used to ensure that the transfer-ID timeout is handled correctly.
ses = RedundantInputSession(spec, meta,
tid_modulo_provider=lambda: None, # Like UDP or serial - infinite modulo.
loop=loop,
self._loop = loop if loop is not None else asyncio.get_event_loop()
low, high = self.VALID_SERVICE_TRANSFER_MULTIPLIER_RANGE
if not (low <= self._srv_multiplier <= high):
raise ValueError(f'Invalid service transfer multiplier: {self._srv_multiplier}')
low, high = self.VALID_MTU_RANGE
if not (low <= self._mtu <= high):
raise ValueError(f'Invalid MTU: {self._mtu} bytes')
_logger.debug(f'IP: {self._network_map}; max nodes: {self._network_map.max_nodes}; '
f'local node-ID: {self.local_node_id}')
self._demultiplexer_registry: typing.Dict[pyuavcan.transport.DataSpecifier, UDPDemultiplexer] = {}
self._input_registry: typing.Dict[pyuavcan.transport.InputSessionSpecifier, UDPInputSession] = {}
self._output_registry: typing.Dict[pyuavcan.transport.OutputSessionSpecifier, UDPOutputSession] = {}
self._closed = False
self._statistics = UDPTransportStatistics()
def _handle_loopback_frame(self, can_id: CANID, frame: TimestampedUAVCANFrame) -> None:
assert frame.loopback
ss = pyuavcan.transport.OutputSessionSpecifier(can_id.data_specifier, can_id.get_destination_node_id())
try:
session = self._output_registry[ss]
except KeyError:
_logger.info('No matching output session for loopback frame: %s; parsed CAN ID: %s; session specifier: %s. '
'Either the session has just been closed or the media driver is misbehaving.',
frame, can_id, ss)
else:
# noinspection PyProtectedMember
session._handle_loopback_frame(frame)