Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class Allocatee:
"""
Plug-and-play node-ID protocol client.
This class represents a node that requires an allocated node-ID.
Once started, the client will keep issuing node-ID allocation requests until either a node-ID is granted
or until the node-ID of the underlying transport instance ceases to be anonymous (that could happen if the
transport is re-configured externally).
The status (whether the allocation is finished or still in progress) is to be queried periodically
via the method :meth:`get_result`.
Uses v1 allocation messages if the transport MTU is small (like if the transport is Classic CAN).
Switches between v1 and v2 as necessary on the fly if the transport is reconfigured at runtime.
"""
DEFAULT_PRIORITY = pyuavcan.transport.Priority.SLOW
_MTU_THRESHOLD = max(pyuavcan.dsdl.get_model(NodeIDAllocationData_2).bit_length_set) // 8
def __init__(self,
presentation: pyuavcan.presentation.Presentation,
local_unique_id: bytes,
preferred_node_id: typing.Optional[int] = None):
"""
:param presentation: The presentation instance to use. If the underlying transport is not anonymous
(i.e., a node-ID is already set), the allocatee will simply return the existing node-ID and do nothing.
:param local_unique_id: The 128-bit globally unique-ID of the local node; the same value is also contained
in the ``uavcan.node.GetInfo.Response``. Beware that random generation of the unique-ID at every launch
is a bad idea because it will exhaust the allocation table quickly. Refer to the Specification for details.
:param preferred_node_id: If the application prefers to obtain a particular node-ID, it can specify it here.
def __init__(self) -> None:
self._item: typing.Optional[pyuavcan.transport.Feedback] = None
# Lookup performance for the output registry is not important because it's only used for loopback frames.
# Hence we don't trade-off memory for speed here.
self._output_registry: typing.Dict[pyuavcan.transport.OutputSessionSpecifier, CANOutputSession] = {}
# Input lookup must be fast, so we use constant-complexity static lookup table.
self._input_dispatch_table = InputDispatchTable()
self._last_filter_configuration_set: typing.Optional[typing.Sequence[FilterConfiguration]] = None
self._frame_stats = CANTransportStatistics()
if self._local_node_id is not None and not (0 <= self._local_node_id <= CANID.NODE_ID_MASK):
raise ValueError(f'Invalid node ID for CAN: {self._local_node_id}')
if media.mtu not in Media.VALID_MTU_SET:
raise pyuavcan.transport.InvalidMediaConfigurationError(
f'The MTU value {media.mtu} is not a member of {Media.VALID_MTU_SET}')
self._mtu = media.mtu - 1
assert self._mtu > 0
if media.number_of_acceptance_filters < 1:
raise pyuavcan.transport.InvalidMediaConfigurationError(
f'The number of acceptance filters is too low: {media.number_of_acceptance_filters}')
if media.loop is not self._loop:
raise pyuavcan.transport.InvalidMediaConfigurationError(
f'The media instance cannot use a different event loop: {media.loop} is not {self._loop}')
media_name = type(media).__name__.lower()[:-len('Media')]
self._descriptor = \
f'<{media_name} mtu="{media.mtu}">{media.interface_name}'
def __init__(self,
specifier: pyuavcan.transport.OutputSessionSpecifier,
payload_metadata: pyuavcan.transport.PayloadMetadata,
loop: asyncio.AbstractEventLoop,
finalizer: typing.Callable[[], None]):
"""
Do not call this directly! Use the factory method instead.
"""
self._specifier = specifier
self._payload_metadata = payload_metadata
self._loop = loop
self._finalizer: typing.Optional[typing.Callable[[], None]] = finalizer
assert isinstance(self._specifier, pyuavcan.transport.OutputSessionSpecifier)
assert isinstance(self._payload_metadata, pyuavcan.transport.PayloadMetadata)
assert isinstance(self._loop, asyncio.AbstractEventLoop)
assert callable(self._finalizer)
self._inferiors: typing.List[pyuavcan.transport.OutputSession] = []
self._feedback_handler: typing.Optional[typing.Callable[[RedundantFeedback], None]] = None
self._idle_send_future: typing.Optional[asyncio.Future[None]] = None
self._lock = asyncio.Lock(loop=self._loop)
self._stat_transfers = 0
self._stat_payload_bytes = 0
self._stat_errors = 0
self._stat_drops = 0
def __init__(self,
specifier: pyuavcan.transport.InputSessionSpecifier,
payload_metadata: pyuavcan.transport.PayloadMetadata,
loop: asyncio.AbstractEventLoop,
finalizer: typing.Callable[[], None]):
self._specifier = specifier
self._payload_metadata = payload_metadata
self._loop = loop
self._maybe_finalizer: typing.Optional[typing.Callable[[], None]] = finalizer
assert isinstance(self._specifier, pyuavcan.transport.InputSessionSpecifier)
assert isinstance(self._payload_metadata, pyuavcan.transport.PayloadMetadata)
assert isinstance(self._loop, asyncio.AbstractEventLoop)
assert callable(self._maybe_finalizer)
self._transfer_id_timeout = self.DEFAULT_TRANSFER_ID_TIMEOUT
self._queue: asyncio.Queue[pyuavcan.transport.TransferFrom] = asyncio.Queue()
def _make_evaluation_context() -> typing.Dict[str, typing.Any]:
# This import is super slow, so we do it as late as possible.
# Doing this when generating command-line arguments would be disastrous for performance.
# noinspection PyTypeChecker
pyuavcan.util.import_submodules(pyuavcan.transport)
# Populate the context with all references that may be useful for the transport expression.
context: typing.Dict[str, typing.Any] = {
'pyuavcan': pyuavcan,
}
# Pre-import transport modules for convenience.
for name, module in inspect.getmembers(pyuavcan.transport, inspect.ismodule):
if not name.startswith('_'):
context[name] = module
# Pre-import transport classes for convenience.
transport_base = pyuavcan.transport.Transport
# Suppressing MyPy false positive: https://github.com/python/mypy/issues/5374
for cls in pyuavcan.util.iter_descendants(transport_base): # type: ignore
if not cls.__name__.startswith('_') and cls is not transport_base:
name = cls.__name__.rpartition(transport_base.__name__)[0]
assert name
context[name] = cls
return context