Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def check_latest_node_info():
    """Compare the node's current hardware identity against the original one.

    Reads the current info of ``device_node_id`` from ``node.monitor``, logs
    the original and current info as YAML, then passes the result of each
    hardware-version field comparison to ``_enforce`` with a short label.
    """
    latest_info = node.monitor.get(device_node_id).info
    logger.info('Comparing node info, original vs. current:\n%s\n---\n%s',
                uavcan.to_yaml(orig_node_info), uavcan.to_yaml(latest_info))

    # (hardware_version attribute, label handed to _enforce), checked in order.
    checked_fields = (
        ('major', 'HW version major'),
        ('minor', 'HW version minor'),
        ('unique_id', 'HW UID'),
        ('certificate_of_authenticity', 'HW COA'),
    )
    for attribute, label in checked_fields:
        expected = getattr(orig_node_info.hardware_version, attribute)
        actual = getattr(latest_info.hardware_version, attribute)
        _enforce(expected == actual, label)
def _on_message(self, e):
    """Count, render and filter one received message, then queue its text.

    The message is rendered with ``uavcan.to_yaml`` and checked with
    ``self._apply_filter``. Messages rejected by the filter are discarded.
    If rendering or filtering raises, an error line is queued instead.
    """
    # Every message counts towards the global total, filtered or not.
    self._num_messages_total += 1

    # Render and evaluate the filter; any failure here becomes an error line.
    try:
        rendered = uavcan.to_yaml(e)
        accepted = self._apply_filter(rendered)
    except Exception as exc:
        self._num_errors += 1
        rendered = '!!! [%d] MESSAGE PROCESSING FAILED: %s' % (self._num_errors, exc)
    else:
        if not accepted:
            return
        self._num_messages_past_filter += 1
        self._msgs_per_sec_estimator.register_event(e.transfer.ts_monotonic)

    # Hand the text over for later rendering; it is dropped if the queue is full.
    try:
        self._message_queue.put_nowait(rendered)
    except queue.Full:
        pass
def callback(e):
    """Show the response in a modal read-only text dialog, or report a timeout.

    When ``e`` is None a timeout message is shown through
    ``self.window().show_message``. Otherwise ``e.response`` is rendered with
    ``uavcan.to_yaml`` and displayed in a ``QDialog``.
    """
    if e is None:
        self.window().show_message('Transport stats request timed out')
        return

    rendered = uavcan.to_yaml(e.response)

    dialog = QDialog(self)

    # Read-only, monospace, non-wrapping text area holding the YAML text.
    text_view = QPlainTextEdit(dialog)
    text_view.setReadOnly(True)
    text_view.setFont(get_monospace_font())
    text_view.setPlainText(rendered)
    text_view.setLineWrapMode(QPlainTextEdit.NoWrap)

    box = QVBoxLayout(dialog)
    box.addWidget(text_view)

    dialog.setModal(True)
    dialog.setWindowTitle('Transport stats of node %r' % e.transfer.source_node_id)
    dialog.setLayout(box)
    dialog.show()
# NOTE(review): this `try:` opens before the callback definition; its matching
# except/finally clause is not visible in this excerpt.
# NOTE(review): indentation is missing in this excerpt, so the exact nesting of
# the statements below cannot be confirmed from here.
try:
# Callback taking a single `event` argument; presumably invoked with the
# response to a parameter get/set request - confirm against the caller.
def param_getset_response(event):
# Results are reported by rebinding variables of the enclosing scope.
nonlocal response_received
nonlocal is_valid
nonlocal param_dict
nonlocal return_yaml
nonlocal param_yaml
# A falsy event is reported as a timeout by raising.
if not event:
raise Exception('Request timed out')
response_received = True
# The response is treated as valid unless the payload value exposes an
# `empty` attribute.
is_valid = not hasattr(event.transfer.payload.value,'empty')
if is_valid:
#print(event)
#print(uavcan.to_yaml(event))
# Either keep the whole event rendered by uavcan.to_yaml, or copy the
# parameter name and value into param_dict.
if return_yaml:
param_yaml = uavcan.to_yaml(event)
else:
param_dict['name'] = str(event.transfer.payload.name)
# Store whichever of integer_value / real_value the payload value exposes.
if hasattr(event.transfer.payload.value,'integer_value'):
param_dict['value'] = event.transfer.payload.value.integer_value
if hasattr(event.transfer.payload.value,'real_value'):
param_dict['value'] = event.transfer.payload.value.real_value
def _do_broadcast(self):
    """Broadcast an ESC RawCommand built from the sliders, unless paused.

    The message viewer shows the YAML of the broadcast message, the text
    'Paused' when the pause control is checked, or the error text if
    anything in the process raises.
    """
    try:
        if self._pause.isChecked():
            self._msg_viewer.setPlainText('Paused')
        else:
            command = uavcan.equipment.esc.RawCommand()
            for slider in self._sliders:
                # Slider reading divided by 100, then scaled by -CMD_MIN for
                # negative readings and by CMD_MAX otherwise.
                fraction = slider.get_value() / 100
                if fraction < 0:
                    full_scale = -self.CMD_MIN
                else:
                    full_scale = self.CMD_MAX
                command.cmd.append(int(full_scale * fraction))
            self._node.broadcast(command)
            self._msg_viewer.setPlainText(uavcan.to_yaml(command))
    except Exception as exc:
        self._msg_viewer.setPlainText('Publishing failed:\n' + str(exc))