Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# We've left the optional fields default-initialized here.
)
# The transport layer is ready; next layer up the protocol stack is the presentation layer. Construct it here.
presentation = pyuavcan.presentation.Presentation(transport)
# The application layer is next -- construct the node instance. It will serve GetInfo requests and publish its
# heartbeat automatically (unless it's anonymous). Read the source code of the Node class for more details.
self._node = pyuavcan.application.Node(presentation, node_info)
# Published heartbeat fields can be configured trivially by assigning them on the heartbeat publisher instance.
self._node.heartbeat_publisher.mode = uavcan.node.Heartbeat_1_0.MODE_OPERATIONAL
# In this example we assign the local process's PID to the vendor-specific status code (VSSC), masking it so
# that the valid range is not exceeded.
self._node.heartbeat_publisher.vendor_specific_status_code = \
os.getpid() & (2 ** min(pyuavcan.dsdl.get_model(uavcan.node.Heartbeat_1_0)[
'vendor_specific_status_code'].data_type.bit_length_set) - 1)
# Now we can create our session objects as necessary. They can be created or destroyed later at any point
# after initialization. It's not necessary to set everything up during the initialization.
srv_least_squares = self._node.presentation.get_server(sirius_cyber_corp.PerformLinearLeastSquaresFit_1_0, 123)
# Will run until self._node is close()d:
srv_least_squares.serve_in_background(self._serve_linear_least_squares_request)
# Create another server using shorthand for fixed port ID. We could also use it with an application-specific
# service-ID as well, of course:
# get_server(uavcan.node.ExecuteCommand_1_0, 42).serve_in_background(self._serve_execute_command)
# If the transport does not yet have a node-ID, the server will stay idle until a node-ID is assigned
# because the node won't be able to receive unicast transfers carrying service requests.
self._node.presentation.get_server_with_fixed_service_id(
uavcan.node.ExecuteCommand_1_0
).serve_in_background(self._serve_execute_command)
transport_factory: TransportFactory) -> None:
assert generated_packages
import uavcan.node
import uavcan.diagnostic
from pyuavcan.transport import Priority
tran_a, tran_b, transmits_anon = transport_factory(None, None)
assert tran_a.local_node_id is None
assert tran_b.local_node_id is None
pres_a = pyuavcan.presentation.Presentation(tran_a)
pres_b = pyuavcan.presentation.Presentation(tran_b)
assert pres_a.transport is tran_a
sub_heart = pres_b.make_subscriber_with_fixed_subject_id(uavcan.node.Heartbeat_1_0)
with pytest.raises(TypeError):
# noinspection PyTypeChecker
pres_a.make_client_with_fixed_service_id(uavcan.node.Heartbeat_1_0, 123) # type: ignore
with pytest.raises(TypeError):
# noinspection PyTypeChecker
pres_a.get_server_with_fixed_service_id(uavcan.node.Heartbeat_1_0) # type: ignore
if transmits_anon:
pub_heart = pres_a.make_publisher_with_fixed_subject_id(uavcan.node.Heartbeat_1_0)
else:
with pytest.raises(pyuavcan.transport.OperationNotDefinedForAnonymousNodeError):
pres_a.make_publisher_with_fixed_subject_id(uavcan.node.Heartbeat_1_0)
return # The test ends here.
assert pub_heart._maybe_impl is not None
def _unittest_slow_manual_heartbeat(generated_packages: typing.List[pyuavcan.dsdl.GeneratedPackageInfo]) -> None:
    """
    Checks deserialization of ``uavcan.node.Heartbeat_1_0`` from an empty buffer and from a hand-built bit pattern.
    """
    import uavcan.node

    heartbeat_type = uavcan.node.Heartbeat_1_0

    # Implicit zero extension: an empty buffer must yield the same object as a default-constructed message.
    zero_extended = pyuavcan.dsdl.deserialize(heartbeat_type, [memoryview(b'')])
    assert zero_extended is not None
    assert repr(zero_extended) == repr(heartbeat_type())
    assert zero_extended.uptime == 0
    assert zero_extended.vendor_specific_status_code == 0

    # Hand-assembled serialized representation, field by field.
    serialized = _compile_serialized_representation(
        _bin(0xefbe_adde, 32),      # uptime dead beef in little-endian byte order
        '111',                      # vendor-specific, fragment
        '010',                      # mode maintenance
        '10',                       # health caution
        '11111111''11111111'        # vendor-specific, the remaining 16 bits
    )
    decoded = pyuavcan.dsdl.deserialize(heartbeat_type, serialized)
    assert decoded is not None
def _unittest_slow_manual_heartbeat(generated_packages: typing.List[pyuavcan.dsdl.GeneratedPackageInfo]) -> None:
    """
    Checks deserialization of ``uavcan.node.Heartbeat_1_0`` from an empty buffer and from a hand-built bit pattern,
    then verifies the decoded uptime and health fields.
    """
    import uavcan.node

    heartbeat_type = uavcan.node.Heartbeat_1_0

    # Implicit zero extension: an empty buffer must yield the same object as a default-constructed message.
    zero_extended = pyuavcan.dsdl.deserialize(heartbeat_type, [memoryview(b'')])
    assert zero_extended is not None
    assert repr(zero_extended) == repr(heartbeat_type())
    assert zero_extended.uptime == 0
    assert zero_extended.vendor_specific_status_code == 0

    # Hand-assembled serialized representation, field by field.
    serialized = _compile_serialized_representation(
        _bin(0xefbe_adde, 32),      # uptime dead beef in little-endian byte order
        '111',                      # vendor-specific, fragment
        '010',                      # mode maintenance
        '10',                       # health caution
        '11111111''11111111'        # vendor-specific, the remaining 16 bits
    )
    decoded = pyuavcan.dsdl.deserialize(heartbeat_type, serialized)
    assert decoded is not None
    assert decoded.uptime == 0xdeadbeef
    assert decoded.health == heartbeat_type.HEALTH_CAUTION
def __init__(self, presentation: pyuavcan.presentation.Presentation):
    """
    Sets up the default state and creates the heartbeat publisher and subscriber.

    :param presentation: The presentation-layer instance used to create the heartbeat publisher and subscriber.
    """
    self._presentation = presentation
    self._instantiated_at = time.monotonic()

    # Initial state reported in the heartbeat.
    self._health = Health.NOMINAL
    self._mode = Mode.OPERATIONAL
    self._vendor_specific_status_code = 0

    # No handlers are registered and no task exists at construction time.
    self._pre_heartbeat_handlers: typing.List[typing.Callable[[], None]] = []
    self._maybe_task: typing.Optional[asyncio.Task[None]] = None

    # Both sessions are created on the fixed subject-ID of the Heartbeat type.
    # The publisher's send timeout is set to the maximum publication period.
    self._publisher = presentation.make_publisher_with_fixed_subject_id(Heartbeat)
    self._publisher.send_timeout = float(Heartbeat.MAX_PUBLICATION_PERIOD)
    self._subscriber = presentation.make_subscriber_with_fixed_subject_id(Heartbeat)
def make_message(self) -> Heartbeat:
    """
    Builds a fresh heartbeat message reflecting the current state of this object.

    :return: A new ``Heartbeat`` populated from ``uptime``, ``health``, ``mode``,
        and ``vendor_specific_status_code``.
    """
    whole_uptime = int(self.uptime)  # Drop the fractional part: the uptime must be floored.
    return Heartbeat(
        uptime=whole_uptime,
        health=self.health,
        mode=self.mode,
        vendor_specific_status_code=self.vendor_specific_status_code,
    )
import asyncio
from uavcan.node import Heartbeat_1_0 as Heartbeat
import pyuavcan
# Transfer priority constant, set to SLOW.
# NOTE(review): presumably the default priority for heartbeat publication -- its use is not visible here; confirm.
DEFAULT_PRIORITY = pyuavcan.transport.Priority.SLOW
class Health(enum.IntEnum):
    """
    Mirrors the health enumeration defined in ``uavcan.node.Heartbeat``.
    Each member takes its value from the matching ``HEALTH_*`` constant of the message type.
    When enumerations are natively supported in DSDL, this will be replaced with an alias.
    """
    NOMINAL = Heartbeat.HEALTH_NOMINAL
    ADVISORY = Heartbeat.HEALTH_ADVISORY
    CAUTION = Heartbeat.HEALTH_CAUTION
    WARNING = Heartbeat.HEALTH_WARNING
class Mode(enum.IntEnum):
    """
    Mirrors the mode enumeration defined in ``uavcan.node.Heartbeat``.
    Each member takes its value from the matching ``MODE_*`` constant of the message type.
    When enumerations are natively supported in DSDL, this will be replaced with an alias.
    """
    OPERATIONAL = Heartbeat.MODE_OPERATIONAL
    INITIALIZATION = Heartbeat.MODE_INITIALIZATION
    MAINTENANCE = Heartbeat.MODE_MAINTENANCE
    SOFTWARE_UPDATE = Heartbeat.MODE_SOFTWARE_UPDATE
    OFFLINE = Heartbeat.MODE_OFFLINE
VENDOR_SPECIFIC_STATUS_CODE_MASK = \
class Mode(enum.IntEnum):
    """
    Mirrors the mode enumeration defined in ``uavcan.node.Heartbeat``.
    Each member takes its value from the matching ``MODE_*`` constant of the message type.
    When enumerations are natively supported in DSDL, this will be replaced with an alias.
    """
    OPERATIONAL = Heartbeat.MODE_OPERATIONAL
    INITIALIZATION = Heartbeat.MODE_INITIALIZATION
    MAINTENANCE = Heartbeat.MODE_MAINTENANCE
    SOFTWARE_UPDATE = Heartbeat.MODE_SOFTWARE_UPDATE
    OFFLINE = Heartbeat.MODE_OFFLINE
# Bit mask of the form 2**N - 1 for the ``vendor_specific_status_code`` field of the heartbeat message,
# where N is the first element of the field's bit length set as reported by the DSDL model.
VENDOR_SPECIFIC_STATUS_CODE_MASK = (
    2 ** list(pyuavcan.dsdl.get_model(Heartbeat)['vendor_specific_status_code'].data_type.bit_length_set)[0] - 1
)

# Module-level logger named after this module.
_logger = logging.getLogger(__name__)
class HeartbeatPublisher:
"""
This class manages periodic publication of the node heartbeat message.
Also it subscribes to heartbeat messages from other nodes and logs cautionary messages
if a node-ID conflict is detected on the bus.
Instances must be manually started when initialization is finished by invoking :meth:`start`.
The default states are as follows:
- Health is NOMINAL.