Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def poll():
_enforce(time.monotonic() < deadline, 'Stuck. :(')
if _maybe(): # Generating garbage on the bus, because why not
if _maybe():
r = uavcan.protocol.RestartNode.Request() # Invalid magic, this is intentional
elif _maybe(0.01):
r = uavcan.protocol.GetNodeInfo.Request()
else:
# The bootloader doesn't support this service, there will be no response
r = uavcan.protocol.file.Read.Request()
r.offset = int(random.random() * 1024 * 1024 * 1024)
r.path.path = \
'Senora, pray receive with your wonted kindness Senor Don Quixote of La Mancha, '\
'whom you see before you, a knight-errant, and the bravest and wisest in the world.'
node.request_async(r, device_node_id)
return not match_node_health_mode(NODE_STATUS_CONST.HEALTH_OK,
NODE_STATUS_CONST.MODE_SOFTWARE_UPDATE)
node.spin_until(poll, check_interval=0.01)
def _node_status_callback(self, event):
board = event.transfer.source_node_id
self.node_statuses[board] = event.message
if board not in self._board_names:
self.logger.info("Found a new board {}".format(board))
self.node.request(uavcan.protocol.GetNodeInfo.Request(), board,
self._board_info_callback)
self.logger.debug('NodeStatus from node {}'.format(board))
def node_status_callback(event):
board = event.transfer.source_node_id
if board not in boards_id_by_name:
node.request(uavcan.protocol.GetNodeInfo.Request(), board,
_board_info_callback)
uavcan.equipment.power.BatteryInfo().STATUS_FLAG_NEED_SERVICE |
uavcan.equipment.power.BatteryInfo().STATUS_FLAG_TEMP_HOT |
uavcan.equipment.power.BatteryInfo().STATUS_FLAG_CHARGED),
'status_flags'
))
print(value_to_constant_name(
uavcan.protocol.AccessCommandShell.Response(flags=
uavcan.protocol.AccessCommandShell.Response().FLAG_SHELL_ERROR |
uavcan.protocol.AccessCommandShell.Response().
FLAG_HAS_PENDING_STDOUT),
'flags'
))
# Printing transfers
node = uavcan.make_node('vcan0', node_id=42)
node.request(uavcan.protocol.GetNodeInfo.Request(), 100, lambda e: print(to_yaml(e)))
node.add_handler(uavcan.protocol.NodeStatus, lambda e: print(to_yaml(e)))
node.spin()
def _node_status_callback(self, event):
board = event.transfer.source_node_id
self.node.request(uavcan.protocol.GetNodeInfo.Request(), board,
self._board_info_callback)
def _node_status_callback(self, event):
board = event.transfer.source_node_id
logging.debug("Got a node status from {}".format(board))
if board not in self.boards_id_by_name.values():
self.node.request(uavcan.protocol.GetNodeInfo.Request(), board,
self._board_info_callback)
def _node_status_callback(self, event):
node_id = event.transfer.source_node_id
if node_id not in self.known_nodes:
self.known_nodes[node_id] = {}
self.node.request(uavcan.protocol.GetNodeInfo.Request(), node_id, self._response_callback)
self.known_nodes[node_id]['status'] = event.message