Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _request_next(self):
def callback(event):
if event:
value, value_type = extract_value(event.response.value)
self.q.put(Parameter(name=str(event.response.name), value=value, type=value_type))
if len(event.response.name) == 0:
self.done = True
else:
self.logger.warning('Service request has timed out!')
self.q.put(None)
self.done = True
self.node.request(uavcan.protocol.param.GetSet.Request(index=self.index), self.node_id, callback)
self.index = self.index + 1
def set_antenna_delay(node, value):
request = uavcan.protocol.param.GetSet.Request()
request.name = '/uwb/antenna_delay'
request.value = uavcan.protocol.param.Value(real_value=value)
#print(uavcan.to_yaml(event))
if return_yaml:
param_yaml = uavcan.to_yaml(event)
else:
param_dict['name'] = str(event.transfer.payload.name)
if hasattr(event.transfer.payload.value,'integer_value'):
param_dict['value'] = event.transfer.payload.value.integer_value
if hasattr(event.transfer.payload.value,'real_value'):
param_dict['value'] = event.transfer.payload.value.real_value
tmo_time = time.time() + 5.0
while not is_valid:
if time.time() > tmo_time: break
response_received = False
if is_name:
node.request(uavcan.protocol.param.GetSet.Request(
index = 0,
#value = uavcan.protocol.param.Value(empty=uavcan.protocol.param.Empty()),
name = str(indnam)
),
target_node_id,
param_getset_response
)
else:
node.request(uavcan.protocol.param.GetSet.Request(
index = int(indnam),
#value = uavcan.protocol.param.Value(empty=uavcan.protocol.param.Empty()),
#name = ''
),
target_node_id,
param_getset_response
)
value = float(self._value_widget.text())
self._param_struct.value.real_value = value
elif value_type == 'boolean_value':
value = bool(self._value_widget.isChecked())
self._param_struct.value.boolean_value = value
elif value_type == 'string_value':
value = self._value_widget.text()
self._param_struct.value.string_value = value
else:
raise RuntimeError('This is not happening!')
except Exception as ex:
show_error('Format error', 'Could not parse value', ex, self)
return
try:
request = uavcan.protocol.param.GetSet.Request(name=self._param_struct.name,
value=self._param_struct.value)
logger.info('Sending param set request: %s', request)
self._node.request(request, self._target_node_id, self._on_response, priority=REQUEST_PRIORITY)
except Exception as ex:
show_error('Node error', 'Could not send param set request', ex, self)
else:
self.show_message('Set request sent')
def _do_fetch(self):
try:
request = uavcan.protocol.param.GetSet.Request(name=self._param_struct.name)
self._node.request(request, self._target_node_id, self._on_response, priority=REQUEST_PRIORITY)
except Exception as ex:
show_error('Node error', 'Could not send param get request', ex, self)
else:
self.show_message('Fetch request sent')