How to use the pyuavcan.transport module in pyuavcan

To help you get started, we’ve selected a few pyuavcan examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github UAVCAN / pyuavcan / pyuavcan / application / plug_and_play.py View on Github external
class Allocatee:
    """
    Plug-and-play node-ID protocol client.

    This class represents a node that requires an allocated node-ID.
    Once started, the client will keep issuing node-ID allocation requests until either a node-ID is granted
    or until the node-ID of the underlying transport instance ceases to be anonymous (that could happen if the
    transport is re-configured externally).
    The status (whether the allocation is finished or still in progress) is to be queried periodically
    via the method :meth:`get_result`.

    Uses v1 allocation messages if the transport MTU is small (like if the transport is Classic CAN).
    Switches between v1 and v2 as necessary on the fly if the transport is reconfigured at runtime.
    """

    DEFAULT_PRIORITY = pyuavcan.transport.Priority.SLOW

    _MTU_THRESHOLD = max(pyuavcan.dsdl.get_model(NodeIDAllocationData_2).bit_length_set) // 8

    def __init__(self,
                 presentation:      pyuavcan.presentation.Presentation,
                 local_unique_id:   bytes,
                 preferred_node_id: typing.Optional[int] = None):
        """
        :param presentation: The presentation instance to use. If the underlying transport is not anonymous
            (i.e., a node-ID is already set), the allocatee will simply return the existing node-ID and do nothing.

        :param local_unique_id: The 128-bit globally unique-ID of the local node; the same value is also contained
            in the ``uavcan.node.GetInfo.Response``. Beware that random generation of the unique-ID at every launch
            is a bad idea because it will exhaust the allocation table quickly. Refer to the Specification for details.

        :param preferred_node_id: If the application prefers to obtain a particular node-ID, it can specify it here.
github UAVCAN / pyuavcan / tests / transport / can / _can.py View on Github external
def __init__(self) -> None:
        """
        Initialize the holder with no feedback stored.
        """
        # None until a Feedback instance is stored here.
        # NOTE(review): presumably assigned by a feedback handler defined outside this excerpt -- confirm.
        self._item: typing.Optional[pyuavcan.transport.Feedback] = None
github UAVCAN / pyuavcan / pyuavcan / transport / can / _can.py View on Github external
# Lookup performance for the output registry is not important because it's only used for loopback frames.
        # Hence we don't trade-off memory for speed here.
        self._output_registry: typing.Dict[pyuavcan.transport.OutputSessionSpecifier, CANOutputSession] = {}

        # Input lookup must be fast, so we use constant-complexity static lookup table.
        self._input_dispatch_table = InputDispatchTable()

        self._last_filter_configuration_set: typing.Optional[typing.Sequence[FilterConfiguration]] = None

        self._frame_stats = CANTransportStatistics()

        if self._local_node_id is not None and not (0 <= self._local_node_id <= CANID.NODE_ID_MASK):
            raise ValueError(f'Invalid node ID for CAN: {self._local_node_id}')

        if media.mtu not in Media.VALID_MTU_SET:
            raise pyuavcan.transport.InvalidMediaConfigurationError(
                f'The MTU value {media.mtu} is not a member of {Media.VALID_MTU_SET}')
        self._mtu = media.mtu - 1
        assert self._mtu > 0

        if media.number_of_acceptance_filters < 1:
            raise pyuavcan.transport.InvalidMediaConfigurationError(
                f'The number of acceptance filters is too low: {media.number_of_acceptance_filters}')

        if media.loop is not self._loop:
            raise pyuavcan.transport.InvalidMediaConfigurationError(
                f'The media instance cannot use a different event loop: {media.loop} is not {self._loop}')

        media_name = type(media).__name__.lower()[:-len('Media')]
        self._descriptor = \
            f'<{media_name} mtu="{media.mtu}">{media.interface_name}'
github UAVCAN / pyuavcan / pyuavcan / transport / redundant / _session / _output.py View on Github external
def __init__(self,
                 specifier:        pyuavcan.transport.OutputSessionSpecifier,
                 payload_metadata: pyuavcan.transport.PayloadMetadata,
                 loop:             asyncio.AbstractEventLoop,
                 finalizer:        typing.Callable[[], None]):
        """
        Do not call this directly! Use the factory method instead.

        :param specifier: The output session specifier; stored in ``_specifier``.

        :param payload_metadata: The payload metadata; stored in ``_payload_metadata``.

        :param loop: The event loop; stored in ``_loop``. On Python older than 3.10 it is also handed to the
            internal :class:`asyncio.Lock`.

        :param finalizer: A callable taking no arguments; stored in ``_finalizer``.
            NOTE(review): presumably invoked when the session is closed -- the call site is not in this excerpt.
        """
        # Function-scope import so that this block does not depend on the module's import list.
        import sys

        self._specifier = specifier
        self._payload_metadata = payload_metadata
        self._loop = loop
        # Annotated as Optional although a callable is required at construction time.
        self._finalizer: typing.Optional[typing.Callable[[], None]] = finalizer
        # Sanity checks of the argument types; they are stripped when Python runs with -O.
        assert isinstance(self._specifier, pyuavcan.transport.OutputSessionSpecifier)
        assert isinstance(self._payload_metadata, pyuavcan.transport.PayloadMetadata)
        assert isinstance(self._loop, asyncio.AbstractEventLoop)
        assert callable(self._finalizer)

        self._inferiors: typing.List[pyuavcan.transport.OutputSession] = []
        self._feedback_handler: typing.Optional[typing.Callable[[RedundantFeedback], None]] = None
        self._idle_send_future: typing.Optional[asyncio.Future[None]] = None

        # The "loop" argument of asyncio.Lock was deprecated in Python 3.8 and removed in 3.10, where passing it
        # raises TypeError. Keep the explicit binding on older interpreters so that their behavior is unchanged.
        if sys.version_info >= (3, 10):
            self._lock = asyncio.Lock()
        else:
            self._lock = asyncio.Lock(loop=self._loop)

        # Statistics counters, all starting from zero.
        self._stat_transfers = 0
        self._stat_payload_bytes = 0
        self._stat_errors = 0
        self._stat_drops = 0
github UAVCAN / pyuavcan / pyuavcan / transport / udp / _session / _input.py View on Github external
def __init__(self,
                 specifier:        pyuavcan.transport.InputSessionSpecifier,
                 payload_metadata: pyuavcan.transport.PayloadMetadata,
                 loop:             asyncio.AbstractEventLoop,
                 finalizer:        typing.Callable[[], None]):
        """
        Store the session parameters and create the queue of received transfers.

        :param specifier: The input session specifier; stored in ``_specifier``.

        :param payload_metadata: The payload metadata; stored in ``_payload_metadata``.

        :param loop: The event loop; stored in ``_loop``.

        :param finalizer: A callable taking no arguments; stored in ``_maybe_finalizer``.
            NOTE(review): presumably invoked when the session is closed -- the call site is not in this excerpt.
        """
        self._specifier = specifier
        self._payload_metadata = payload_metadata
        self._loop = loop
        # Annotated as Optional although a callable is required at construction time.
        self._maybe_finalizer: typing.Optional[typing.Callable[[], None]] = finalizer
        # Sanity checks of the argument types; they are stripped when Python runs with -O.
        assert isinstance(self._specifier, pyuavcan.transport.InputSessionSpecifier)
        assert isinstance(self._payload_metadata, pyuavcan.transport.PayloadMetadata)
        assert isinstance(self._loop, asyncio.AbstractEventLoop)
        assert callable(self._maybe_finalizer)

        # Start from the default value looked up via self (defined outside this excerpt).
        self._transfer_id_timeout = self.DEFAULT_TRANSFER_ID_TIMEOUT
        # Holds TransferFrom items per the annotation; no maxsize is given, so the queue is unbounded.
        self._queue: asyncio.Queue[pyuavcan.transport.TransferFrom] = asyncio.Queue()
github UAVCAN / pyuavcan / pyuavcan / _cli / commands / _subsystems / transport.py View on Github external
def _make_evaluation_context() -> typing.Dict[str, typing.Any]:
    """
    Build the name-to-object mapping made available to the transport expression.

    The mapping contains, in this insertion order:

    - ``pyuavcan`` itself;
    - every submodule of :mod:`pyuavcan.transport` whose name does not start with an underscore,
      keyed by its name;
    - every descendant of :class:`pyuavcan.transport.Transport` whose name does not start with an underscore,
      keyed by the part of its class name that precedes the name of the base class.

    :return: A new dictionary populated as described above.
    """
    # Importing all transport submodules is very slow, which is why it is deferred until this function is called
    # rather than being done while the command-line arguments are generated.
    # noinspection PyTypeChecker
    pyuavcan.util.import_submodules(pyuavcan.transport)

    context: typing.Dict[str, typing.Any] = {'pyuavcan': pyuavcan}

    # Expose the public transport submodules under their own names.
    context.update(
        (module_name, module)
        for module_name, module in inspect.getmembers(pyuavcan.transport, inspect.ismodule)
        if not module_name.startswith('_')
    )

    # Expose the public transport classes under abbreviated names.
    base = pyuavcan.transport.Transport
    base_name = base.__name__
    # Suppressing MyPy false positive: https://github.com/python/mypy/issues/5374
    for transport_type in pyuavcan.util.iter_descendants(base):  # type: ignore
        type_name = transport_type.__name__
        if type_name.startswith('_') or transport_type is base:
            continue
        # Keep only what precedes the base class name; an empty result means the class is not named as expected.
        alias = type_name.rpartition(base_name)[0]
        assert alias
        context[alias] = transport_type

    return context