Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _unittest_slow_builtin_form_manual(generated_packages: typing.List[pyuavcan.dsdl.GeneratedPackageInfo]) -> None:
import uavcan.node
import uavcan.register
import uavcan.primitive.array
import uavcan.time
bi = pyuavcan.dsdl.to_builtin(uavcan.node.Heartbeat_1_0(uptime=123456,
health=2,
mode=6,
vendor_specific_status_code=0xbad))
assert bi == {
'uptime': 123456,
'health': 2,
'mode': 6,
'vendor_specific_status_code': 2989,
}
bi = pyuavcan.dsdl.to_builtin(uavcan.node.GetInfo_1_0.Response(
protocol_version=uavcan.node.Version_1_0(1, 2),
hardware_version=uavcan.node.Version_1_0(3, 4),
software_version=uavcan.node.Version_1_0(5, 6),
software_vcs_revision_id=0xbadc0ffee0ddf00d,
unique_id=b'0123456789abcdef',
import uavcan.time
bi = pyuavcan.dsdl.to_builtin(uavcan.node.Heartbeat_1_0(uptime=123456,
health=2,
mode=6,
vendor_specific_status_code=0xbad))
assert bi == {
'uptime': 123456,
'health': 2,
'mode': 6,
'vendor_specific_status_code': 2989,
}
bi = pyuavcan.dsdl.to_builtin(uavcan.node.GetInfo_1_0.Response(
protocol_version=uavcan.node.Version_1_0(1, 2),
hardware_version=uavcan.node.Version_1_0(3, 4),
software_version=uavcan.node.Version_1_0(5, 6),
software_vcs_revision_id=0xbadc0ffee0ddf00d,
unique_id=b'0123456789abcdef',
name='org.node.my',
software_image_crc=[0x0dddeadb16b00b5],
certificate_of_authenticity=list(range(100))
))
print(bi)
assert bi == {
'protocol_version': {'major': 1, 'minor': 2},
'hardware_version': {'major': 3, 'minor': 4},
'software_version': {'major': 5, 'minor': 6},
'software_vcs_revision_id': 0xbadc0ffee0ddf00d,
'unique_id': list(b'0123456789abcdef'),
'name': 'org.node.my',
'software_image_crc': [0x0dddeadb16b00b5],
transport = pyuavcan.transport.redundant.RedundantTransport()
transport.attach_inferior(pyuavcan.transport.udp.UDPTransport('127.0.0.42/8'))
transport.attach_inferior(pyuavcan.transport.serial.SerialTransport('socket://localhost:50905',
local_node_id=42))
else:
raise RuntimeError(f'Unrecognized interface kind: {interface_kind}') # pragma: no cover
assert transport.local_node_id == 42 # Yup, the node-ID is configured.
# Populate the node info for use with the Node class. Please see the DSDL definition of uavcan.node.GetInfo.
node_info = uavcan.node.GetInfo_1_0.Response(
# Version of the protocol supported by the library, and hence by our node.
protocol_version=uavcan.node.Version_1_0(*pyuavcan.UAVCAN_SPECIFICATION_VERSION),
# There is a similar field for hardware version, but we don't populate it because it's a software-only node.
software_version=uavcan.node.Version_1_0(major=1, minor=0),
# The name of the local node. Should be a reversed Internet domain name, like a Java package.
name='org.uavcan.pyuavcan.demo.basic_usage',
# We've left the optional fields default-initialized here.
)
# The transport layer is ready; next layer up the protocol stack is the presentation layer. Construct it here.
presentation = pyuavcan.presentation.Presentation(transport)
# The application layer is next -- construct the node instance. It will serve GetInfo requests and publish its
# heartbeat automatically (unless it's anonymous). Read the source code of the Node class for more details.
self._node = pyuavcan.application.Node(presentation, node_info)
# Published heartbeat fields can be configured trivially by assigning them on the heartbeat publisher instance.
self._node.heartbeat_publisher.mode = uavcan.node.Heartbeat_1_0.MODE_OPERATIONAL
# In this example here we assign the local process' PID to the vendor-specific status code (VSSC) and make
# sure that the valid range is not exceeded.
# There is a similar field for hardware version, but we don't populate it because it's a software-only node.
software_version=uavcan.node.Version_1_0(major=1, minor=0),
# The name of the local node. Should be a reversed Internet domain name, like a Java package.
name='org.uavcan.pyuavcan.demo.basic_usage',
# We've left the optional fields default-initialized here.
)
# The transport layer is ready; next layer up the protocol stack is the presentation layer. Construct it here.
presentation = pyuavcan.presentation.Presentation(transport)
# The application layer is next -- construct the node instance. It will serve GetInfo requests and publish its
# heartbeat automatically (unless it's anonymous). Read the source code of the Node class for more details.
self._node = pyuavcan.application.Node(presentation, node_info)
# Published heartbeat fields can be configured trivially by assigning them on the heartbeat publisher instance.
self._node.heartbeat_publisher.mode = uavcan.node.Heartbeat_1_0.MODE_OPERATIONAL
# In this example here we assign the local process' PID to the vendor-specific status code (VSSC) and make
# sure that the valid range is not exceeded.
self._node.heartbeat_publisher.vendor_specific_status_code = \
os.getpid() & (2 ** min(pyuavcan.dsdl.get_model(uavcan.node.Heartbeat_1_0)[
'vendor_specific_status_code'].data_type.bit_length_set) - 1)
# Now we can create our session objects as necessary. They can be created or destroyed later at any point
# after initialization. It's not necessary to set everything up during the initialization.
srv_least_squares = self._node.presentation.get_server(sirius_cyber_corp.PerformLinearLeastSquaresFit_1_0, 123)
# Will run until self._node is close()d:
srv_least_squares.serve_in_background(self._serve_linear_least_squares_request)
# Create another server using shorthand for fixed port ID. We could also use it with an application-specific
# service-ID as well, of course:
# get_server(uavcan.node.ExecuteCommand_1_0, 42).serve_in_background(self._serve_execute_command)
# If the transport does not yet have a node-ID, the server will stay idle until a node-ID is assigned
assert tran_a.local_node_id is None
assert tran_b.local_node_id is None
pres_a = pyuavcan.presentation.Presentation(tran_a)
pres_b = pyuavcan.presentation.Presentation(tran_b)
assert pres_a.transport is tran_a
sub_heart = pres_b.make_subscriber_with_fixed_subject_id(uavcan.node.Heartbeat_1_0)
with pytest.raises(TypeError):
# noinspection PyTypeChecker
pres_a.make_client_with_fixed_service_id(uavcan.node.Heartbeat_1_0, 123) # type: ignore
with pytest.raises(TypeError):
# noinspection PyTypeChecker
pres_a.get_server_with_fixed_service_id(uavcan.node.Heartbeat_1_0) # type: ignore
if transmits_anon:
pub_heart = pres_a.make_publisher_with_fixed_subject_id(uavcan.node.Heartbeat_1_0)
else:
with pytest.raises(pyuavcan.transport.OperationNotDefinedForAnonymousNodeError):
pres_a.make_publisher_with_fixed_subject_id(uavcan.node.Heartbeat_1_0)
return # The test ends here.
assert pub_heart._maybe_impl is not None
assert pub_heart._maybe_impl.proxy_count == 1
pub_heart_new = pres_a.make_publisher_with_fixed_subject_id(uavcan.node.Heartbeat_1_0)
assert pub_heart_new._maybe_impl is not None
assert pub_heart is not pub_heart_new
assert pub_heart._maybe_impl is pub_heart_new._maybe_impl
assert pub_heart._maybe_impl.proxy_count == 2
pub_heart_new.close()
timeout=5.0)
print('node_info_text:', node_info_text)
node_info = json.loads(node_info_text)
assert node_info['430']['_metadata_']['source_node_id'] == 42
assert node_info['430']['_metadata_']['transfer_id'] >= 0
assert 'slow' in node_info['430']['_metadata_']['priority'].lower()
assert node_info['430']['name'] == 'org.uavcan.pyuavcan.demo.basic_usage'
assert node_info['430']['protocol_version']['major'] == pyuavcan.UAVCAN_SPECIFICATION_VERSION[0]
assert node_info['430']['protocol_version']['minor'] == pyuavcan.UAVCAN_SPECIFICATION_VERSION[1]
command_response = json.loads(run_cli_tool(
'-v', 'call', '42', 'uavcan.node.ExecuteCommand.1.0',
f'{{command: {uavcan.node.ExecuteCommand_1_0.Request.COMMAND_STORE_PERSISTENT_STATES} }}',
'--format', 'json', *iface_option.make_cli_args(123), timeout=5.0 # type: ignore
))
assert command_response['435']['status'] == uavcan.node.ExecuteCommand_1_0.Response.STATUS_BAD_COMMAND
# Next request - this fails if the OUTPUT TRANSFER-ID MAP save/restore logic is not working.
command_response = json.loads(run_cli_tool(
'-v', 'call', '42', 'uavcan.node.ExecuteCommand.1.0', '{command: 23456}',
'--format', 'json', *iface_option.make_cli_args(123), timeout=5.0 # type: ignore
))
assert command_response['435']['status'] == uavcan.node.ExecuteCommand_1_0.Response.STATUS_SUCCESS
least_squares_response = json.loads(run_cli_tool(
'-vv', 'call', '42', '123.sirius_cyber_corp.PerformLinearLeastSquaresFit.1.0',
'{points: [{x: 1, y: 2}, {x: 10, y: 20}]}', '--timeout=5',
'--format', 'json', *iface_option.make_cli_args(123), timeout=6.0 # type: ignore
))
assert least_squares_response['123']['slope'] == pytest.approx(2.0)
assert least_squares_response['123']['y_intercept'] == pytest.approx(0.0)
if JSON_LOG_FILE:
with open(JSON_LOG_FILE, "a") as f:
f.write(message + "\n")
def format_bytes(a):
return "".join("{:08b}".format(v)[::-1] for v in a)
class RadioRXMonitor(uavcan.node.Monitor):
def on_message(self, message):
log.info("RadioRXMonitor: {!s}".format(format_bytes(message.bits)))
class MessageRelayMonitor(uavcan.node.Monitor):
def on_message(self, message):
send_all(message.type.get_normalized_definition(),
self.transfer.source_node_id, message)
class CANHandler(tornado.websocket.WebSocketHandler):
def __init__(self, *args, **kwargs):
self.can = kwargs.pop("can", None)
self.node = kwargs.pop("node", None)
super(CANHandler, self).__init__(*args, **kwargs)
def check_origin(self, origin):
return True
def open(self):
self.set_nodelay(True)
os.path.basename(firmware_path))
(response, response_transfer), _ = yield tornado.gen.Task(
this_node.send_request, request, node_id)
if response and response.error != response.ERROR_OK:
msg = ("[MASTER] #{0:03d} rejected "
"uavcan.protocol.file.BeginFirmwareUpdate " +
"with error {1:d}: {2!s}").format(
node_id, response.error,
response.optional_error_message.decode())
log.error(msg)
else:
log.debug("enumerate_device(): device up to date")
if len(args):
node = uavcan.node.Node([
# Server implementation
(uavcan.protocol.NodeStatus, uavcan.monitors.NodeStatusMonitor,
{"new_node_callback": enumerate_device}),
(uavcan.protocol.dynamic_node_id.Allocation,
uavcan.monitors.DynamicNodeIDServer,
{"dynamic_id_range": (2, 125)}),
(uavcan.protocol.file.GetInfo, uavcan.services.FileGetInfoService,
{"path": firmware_dir}),
(uavcan.protocol.file.Read, uavcan.services.FileReadService,
{"path": firmware_dir}),
(uavcan.protocol.debug.LogMessage,
uavcan.monitors.DebugLogMessageMonitor),
# CAN<->WebSocket bridge
(uavcan.protocol.NodeStatus, MessageRelayMonitor),
(uavcan.protocol.enumeration.Indication, MessageRelayMonitor),
(uavcan.equipment.esc.Status, MessageRelayMonitor),
#
# Copyright (c) 2019 UAVCAN Development Team
# This software is distributed under the terms of the MIT License.
# Author: Pavel Kirienko
#
from __future__ import annotations
import logging
import uavcan.node
import pyuavcan
import pyuavcan.application
NodeInfo = uavcan.node.GetInfo_1_0.Response
_logger = logging.getLogger(__name__)
class Node:
"""
This is the top-level abstraction representing a UAVCAN node on the bus.
This class is just a minor addition on top of the lower-level abstractions of the library
implementing commonly-used/mandatory functions of the protocol such as heartbeat reporting and responding
to node info requests ``uavcan.node.GetInfo``.
Start the instance when initialization is finished by invoking :meth:`start`.
"""
def __init__(self,
async def _run(transport: pyuavcan.transport.Transport) -> int:
import uavcan.node
node_id_set_cardinality = transport.protocol_parameters.node_id_set_cardinality
if node_id_set_cardinality >= 2 ** 32:
# Special case: for very large sets just pick a random number. Very large sets are only possible with test
# transports such as loopback so it's acceptable. If necessary, later we could develop a more robust solution.
print(random.randint(0, node_id_set_cardinality - 1))
return 0
candidates = set(range(node_id_set_cardinality))
pres = pyuavcan.presentation.Presentation(transport)
with contextlib.closing(pres):
deadline = asyncio.get_event_loop().time() + uavcan.node.Heartbeat_1_0.MAX_PUBLICATION_PERIOD * 2.0
sub = pres.make_subscriber_with_fixed_subject_id(uavcan.node.Heartbeat_1_0)
while asyncio.get_event_loop().time() <= deadline:
result = await sub.receive_until(deadline)
if result is not None:
msg, transfer = result
assert isinstance(transfer, pyuavcan.transport.TransferFrom)
_logger.debug('Received %r via %r', msg, transfer)
if transfer.source_node_id is None:
_logger.warning('FYI, the network contains an anonymous node which is publishing Heartbeat. '
'Please contact the vendor and inform them that this behavior is non-compliant. '
'The offending heartbeat message is: %r, transfer: %r', msg, transfer)
else:
try:
candidates.remove(int(transfer.source_node_id))
except LookupError:
pass