Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Send one transfer through out_123 and keep the timestamp so that the feedback can be checked against it.
ts = pyuavcan.transport.Timestamp.now()
assert await out_123.send_until(pyuavcan.transport.Transfer(
timestamp=ts,
priority=pyuavcan.transport.Priority.IMMEDIATE,
transfer_id=123, # mod 32 = 27
fragmented_payload=[memoryview(b'Hello world!')],
), tr.loop.time() + 1.0)
# NOTE(review): last_feedback is populated outside of this fragment, presumably by a feedback
# handler enabled on out_123 earlier -- confirm against the full test.
out_123.disable_feedback()
assert last_feedback is not None
# Both timestamps reported by the feedback are expected to equal the timestamp of the sent transfer.
assert last_feedback.original_transfer_timestamp == ts
assert last_feedback.first_frame_transmission_timestamp == ts
del ts
# Exactly one transfer carried in one frame; the payload byte count equals the message length.
assert out_123.sample_statistics() == pyuavcan.transport.SessionStatistics(
transfers=1,
frames=1,
payload_bytes=len('Hello world!'),
)
# Closing the session and requesting it again with the same specifier must yield a new instance.
old_out = out_123
out_123.close()
out_123.close() # Double close handled properly
out_123 = tr.get_output_session(specifier=message_spec_123_out, payload_metadata=payload_metadata)
assert out_123 is not old_out
del old_out
# Requesting an input session twice with the same specifier returns the very same object.
inp_123 = tr.get_input_session(specifier=message_spec_123_in, payload_metadata=payload_metadata)
assert inp_123 is tr.get_input_session(specifier=message_spec_123_in, payload_metadata=payload_metadata)
old_inp = inp_123
transfer_id=123, # mod 32 = 27
fragmented_payload=[memoryview(b'Hello world!')],
), tr.loop.time() + 1.0)
# inp_123 is expected to yield nothing, both for a deadline of 0 (presumably already expired,
# i.e. a non-blocking poll -- confirm) and after waiting for up to one second.
assert None is await inp_123.receive_until(0)
assert None is await inp_123.receive_until(tr.loop.time() + 1.0)
# inp_42, on the other hand, must already hold a transfer.
rx = await inp_42.receive_until(0)
assert rx is not None
# The reception timestamp cannot lie in the future on either clock.
assert rx.timestamp.monotonic <= time.monotonic()
assert rx.timestamp.system <= time.time()
assert rx.priority == pyuavcan.transport.Priority.IMMEDIATE
# 27 is the sent transfer-ID 123 taken modulo 32 (see the comment on the send call below).
assert rx.transfer_id == 27
assert rx.fragmented_payload == [memoryview(b'Hello world!')]
assert rx.source_node_id == tr.local_node_id
assert inp_42.sample_statistics() == pyuavcan.transport.SessionStatistics(
transfers=1,
frames=1,
payload_bytes=len('Hello world!'),
)
# NOTE(review): out_bc appears to support exception injection through its `exception` attribute;
# the attribute is defined outside of this fragment -- confirm its exact contract.
out_bc.exception = RuntimeError('EXCEPTION SUKA')
# Assigning something that is not an exception is expected to be rejected with ValueError.
with pytest.raises(ValueError):
# noinspection PyTypeHints
out_bc.exception = 123 # type: ignore
# With the exception installed, sending is expected to raise it.
with pytest.raises(RuntimeError, match='EXCEPTION SUKA'):
assert await out_bc.send_until(pyuavcan.transport.Transfer(
timestamp=pyuavcan.transport.Timestamp.now(),
priority=pyuavcan.transport.Priority.IMMEDIATE,
transfer_id=123, # mod 32 = 27
fragmented_payload=[memoryview(b'Hello world!')],
), tr.loop.time() + 1.0)
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.REQUEST), None), meta)
# Response input sessions for service 333 on tr2.
# NOTE(review): the second InputSessionSpecifier argument appears to be the remote node-ID filter
# (5, 9, or None for promiscuous), as suggested by the variable names -- confirm.
selective_client_s333_5 = tr2.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.RESPONSE), 5), meta)
selective_client_s333_9 = tr2.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.RESPONSE), 9), meta)
promiscuous_client_s333 = tr2.get_input_session(
InputSessionSpecifier(ServiceDataSpecifier(333, ServiceDataSpecifier.Role.RESPONSE), None), meta)
# Send a transfer with an empty payload through client_requester; ts is defined outside of this fragment.
assert await client_requester.send_until(Transfer(
timestamp=ts,
priority=Priority.FAST,
transfer_id=11,
fragmented_payload=[]
), tr.loop.time() + 1.0)
# An empty transfer still counts as one transfer and one frame with zero payload bytes.
assert client_requester.sample_statistics() == SessionStatistics(transfers=1, frames=1, payload_bytes=0)
# The selective server session for 5 must receive the transfer with its metadata intact.
received = await selective_server_s333_5.receive_until(tr.loop.time() + 1.0) # Same thing here
assert received is not None
assert received.transfer_id == 11
assert received.priority == Priority.FAST
validate_timestamp(received.timestamp)
# The empty payload is expected to arrive as a single empty fragment.
assert list(map(bytes, received.fragmented_payload)) == [b'']
# The selective server session for 9 must see nothing within _RX_TIMEOUT.
assert (await selective_server_s333_9.receive_until(tr.loop.time() + _RX_TIMEOUT)) is None
# The promiscuous server session must receive the same transfer as well.
received = await promiscuous_server_s333.receive_until(tr.loop.time() + 1.0) # Same thing here
assert received is not None
assert received.transfer_id == 11
assert received.priority == Priority.FAST
validate_timestamp(received.timestamp)
assert list(map(bytes, received.fragmented_payload)) == [b'']
import typing
import logging
import dataclasses
import pyuavcan.transport
_logger = logging.getLogger(__name__)
@dataclasses.dataclass
class RedundantSessionStatistics(pyuavcan.transport.SessionStatistics):
    """
    Statistics of a redundant session: the aggregate over all inferior sessions of the
    redundant group, plus the individual sample of every inferior.

    An instance is an atomic sample and is meant to be treated as immutable;
    it is not updated after construction.
    """

    #: Samples of the individual inferior sessions; empty by default.
    #: The ordering is guaranteed to match that of :attr:`RedundantSession.inferiors`.
    inferiors: typing.List[pyuavcan.transport.SessionStatistics] = dataclasses.field(
        default_factory=list,
    )
class RedundantSession(abc.ABC):
"""
The base for all redundant session instances.
A redundant session may be constructed even if the redundant transport itself has no inferiors.
When a new inferior transport is attached/detached to/from the redundant group,
dependent session instances are automatically reconfigured, transparently to the user.
The higher layers of the protocol stack are therefore shielded from any changes made to the stack
below the redundant transport instance; existing sessions and other instances are never invalidated.
This guarantee allows one to construct applications whose underlying transport configuration
def __init__(
    self,
    transport: pyuavcan.transport.can.CANTransport,
    send_handler: SendHandler,
    specifier: pyuavcan.transport.SessionSpecifier,
    payload_metadata: pyuavcan.transport.PayloadMetadata,
    finalizer: _base.SessionFinalizer,
):
    """
    Not intended to be invoked directly; use the factory method.

    :param transport: The CAN transport instance stored on the session.
    :param send_handler: The send handler stored on the session.
    :param specifier: The session specifier stored on the session.
    :param payload_metadata: The payload metadata stored on the session.
    :param finalizer: Forwarded as-is to the base class initializer.
    """
    # Session-local state: fresh statistics, no feedback handler, no pending feedback entries.
    self._statistics = pyuavcan.transport.SessionStatistics()
    self._feedback_handler: typing.Optional[
        typing.Callable[[pyuavcan.transport.Feedback], None]
    ] = None
    self._pending_feedback: typing.Dict[
        _PendingFeedbackKey, pyuavcan.transport.Timestamp
    ] = {}

    # Objects supplied by the caller.
    self._payload_metadata = payload_metadata
    self._specifier = specifier
    self._send_handler = send_handler
    self._transport = transport

    # The base class is initialized last, once all attributes above are in place.
    super(CANOutputSession, self).__init__(finalizer=finalizer)
import copy
import typing
import asyncio
import logging
import dataclasses
import pyuavcan.util
import pyuavcan.transport
from .. import _frame, _identifier
from . import _base, _transfer_reassembler
_logger = logging.getLogger(__name__)
@dataclasses.dataclass
class CANInputSessionStatistics(pyuavcan.transport.SessionStatistics):
    """
    Session statistics extended with counters keyed by
    :class:`_transfer_reassembler.TransferReassemblyErrorID`.
    """

    #: One integer counter per element of ``TransferReassemblyErrorID``; every counter starts at zero.
    #: A new dictionary is built for each instance, so samples never share counter storage.
    reception_error_counters: typing.Dict[_transfer_reassembler.TransferReassemblyErrorID, int] = dataclasses.field(
        default_factory=lambda: dict.fromkeys(_transfer_reassembler.TransferReassemblyErrorID, 0),
    )
class CANInputSession(_base.CANSession, pyuavcan.transport.InputSession):
DEFAULT_TRANSFER_ID_TIMEOUT = 2
"""
Per the UAVCAN specification. Units are seconds. Can be overridden after instantiation if needed.
"""
_QueueItem = typing.Tuple[_identifier.CANID, _frame.TimestampedUAVCANFrame]
def __init__(self,
specifier: pyuavcan.transport.InputSessionSpecifier,
payload_metadata: pyuavcan.transport.PayloadMetadata,
loop: asyncio.AbstractEventLoop,
# This software is distributed under the terms of the MIT License.
# Author: Pavel Kirienko
#
import abc
import typing
import logging
import dataclasses
import pyuavcan.transport
_logger = logging.getLogger(__name__)
@dataclasses.dataclass
class RedundantSessionStatistics(pyuavcan.transport.SessionStatistics):
    """
    Statistics of a redundant session: the aggregate over all inferior sessions of the
    redundant group, plus the individual sample of every inferior.

    An instance is an atomic sample and is meant to be treated as immutable;
    it is not updated after construction.
    """

    #: Samples of the individual inferior sessions; empty by default.
    #: The ordering is guaranteed to match that of :attr:`RedundantSession.inferiors`.
    inferiors: typing.List[pyuavcan.transport.SessionStatistics] = dataclasses.field(
        default_factory=list,
    )
class RedundantSession(abc.ABC):
"""
The base for all redundant session instances.
A redundant session may be constructed even if the redundant transport itself has no inferiors.
When a new inferior transport is attached/detached to/from the redundant group,
# Shouldn't be too large as this value defines how quickly the task will detect that the underlying transport is closed.
# NOTE(review): the unit is not stated here; presumably seconds -- confirm against the code that uses it.
_RECEIVE_TIMEOUT = 1
# Module-level logger named after this module.
_logger = logging.getLogger(__name__)
#: Type of the async received message handler callable.
#: It accepts a message object and a :class:`pyuavcan.transport.TransferFrom` instance
#: and returns an awaitable that resolves to None.
ReceivedMessageHandler = typing.Callable[[MessageClass, pyuavcan.transport.TransferFrom], typing.Awaitable[None]]
@dataclasses.dataclass
class SubscriberStatistics:
    """
    A sample of subscriber counters.
    Some values are shared between all subscribers with the same session specifier,
    others are maintained individually per subscriber; see the notes on each field.
    """

    #: Shared per session specifier.
    transport_session: pyuavcan.transport.SessionStatistics

    #: Number of received messages, individual per subscriber.
    messages: int

    #: Number of messages lost to queue overruns; individual per subscriber.
    overruns: int

    #: Number of messages lost to deserialization errors; shared per session specifier.
    deserialization_failures: int
class Subscriber(MessagePort[MessageClass]):
"""
A task should request its own independent subscriber instance from the presentation layer controller.
Do not share the same subscriber instance across different tasks. This class implements the RAII pattern.
Whenever a message is received from a subject, it is deserialized once and the resulting object is
passed by reference into each subscriber instance. If there is more than one subscriber instance for
a subject, accidental mutation of the object by one consumer may affect other consumers. To avoid this,
the application should either avoid mutating received message objects or clone them beforehand.
This class implements the async iterator protocol yielding received messages.
# Shouldn't be too large as this value defines how quickly the task will detect that the underlying transport is closed.
# NOTE(review): the unit is not stated here; presumably seconds -- confirm against the code that uses it.
_RECEIVE_TIMEOUT = 1
# Module-level logger named after this module.
_logger = logging.getLogger(__name__)
@dataclasses.dataclass
class ClientStatistics:
    """
    A sample of client counters.

    The counters are kept by the hidden client instance, which the user cannot access.
    Consequently, all clients that have the same session specifier share one set of counters.
    """

    #: Statistics of the request transport session.
    request_transport_session: pyuavcan.transport.SessionStatistics

    #: Statistics of the response transport session.
    response_transport_session: pyuavcan.transport.SessionStatistics

    #: Number of sent requests.
    sent_requests: int

    #: Response transfers that could not be deserialized into a response object.
    deserialization_failures: int

    #: Response transfers that could not be matched with a request state.
    unexpected_responses: int
class Client(ServicePort[ServiceClass]):
"""
A task should request its own client instance from the presentation layer controller.
Do not share the same client instance across different tasks. This class implements the RAII pattern.
Implementation info: all client instances sharing the same session specifier also share the same
underlying implementation object containing the transport sessions which is reference counted and
destroyed automatically when the last client instance is closed;
the user code cannot access it and generally shouldn't care.
None of the settings of a client instance, such as timeout or priority, can affect other client instances;
this does not apply to the transfer-ID counter objects though because they are transport-layer entities