Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
    """Store a default-constructed ``GetNodeInfo`` response on ``self.a``."""
    # The import is kept local to the method, as in the original.
    import uavcan
    response_type = uavcan.protocol.GetNodeInfo.Response
    self.a = response_type()
def setParameterByIndexOrName(node,target_node_id,indnam,intfloatvalue):
# Builds a uavcan.protocol.param.Value from `intfloatvalue` and defines the
# callback that checks the node's reply against the requested value.
# NOTE(review): indentation was lost in this copy and the function is cut off
# after the final `hasattr(..., 'real_value')` check, so the request dispatch,
# the real-value comparison and the return value are not visible here.
# Flags written by the nested callback through `nonlocal`.
response_received = False
is_valid = False
# True when `indnam` is exactly a str; not used in the visible part —
# presumably selects name vs. index addressing further down (TODO confirm).
is_name = type(indnam) is str
# Exact type check: only a plain float selects the real_value branch.
is_float = type(intfloatvalue) is float
if is_float:
# NOTE(review): int() truncates the fractional part before the value is
# stored as real_value; float(intfloatvalue) looks intended — confirm.
val = uavcan.protocol.param.Value(real_value=int(intfloatvalue))
else:
val = uavcan.protocol.param.Value(integer_value=int(intfloatvalue))
# Response callback: records that a reply arrived and whether it is valid.
def param_getset_response(event):
nonlocal response_received
nonlocal is_valid
# A falsy event is reported as a timeout.
if not event:
raise Exception('Request timed out')
response_received = True
# A returned value that exposes an `empty` attribute is treated as invalid.
is_valid = not hasattr(event.transfer.payload.value,'empty')
if is_valid:
#print(event)
#print(uavcan.to_yaml(event))
# Reset, then only mark valid again if the echoed value matches the request.
is_valid = False
if hasattr(event.transfer.payload.value,'integer_value'):
# Integer request: the echoed integer must equal the requested one.
if not is_float and (event.transfer.payload.value.integer_value == int(intfloatvalue)):
is_valid = True
# Real-valued reply check; its body is missing from this copy.
if hasattr(event.transfer.payload.value,'real_value'):
def monitor_update_handler(e):
# Node-monitor event callback: for nodes whose name matches args.node_name[0],
# compares the reported firmware CRC with `firmware_crc`, requests a firmware
# update when needed, and records per-node progress in `update_complete`.
# NOTE(review): indentation was lost in this copy, so the exact nesting cannot
# be determined from here — in particular which `if` the `else` pairs with and
# whether `update_complete[...] = False` sits inside the mode check. Confirm
# against the original file before restructuring.
# Only item assignment on update_complete is visible below, so the `global`
# declaration is not strictly required by these lines.
global update_complete
# Only info-update events are handled.
if e.event_id == node_monitor.UpdateEvent.EVENT_ID_INFO_UPDATE:
print(e.entry)
# Restrict to nodes reporting the name given on the command line.
if e.entry.info.name == args.node_name[0]:
# A differing image CRC means the node does not run the target firmware.
if e.entry.info.software_version.image_crc != firmware_crc:
# Skip the request when the node already reports software-update mode.
if e.entry.status.mode != e.entry.status.MODE_SOFTWARE_UPDATE:
print('updating %u' % (e.entry.node_id,))
# Ask the node to fetch the image at args.firmware_name[0] from this node.
req_msg = uavcan.protocol.file.BeginFirmwareUpdate.Request(source_node_id=node.node_id, image_file_remote_path=uavcan.protocol.file.Path(path=args.firmware_name[0]))
node.request(req_msg, e.entry.node_id, update_response_handler)
update_complete[e.entry.node_id] = False
else:
print('%u up to date' % (e.entry.node_id,))
update_complete[e.entry.node_id] = True
def __init__(self, node):
    """Bind the monitor to *node* and subscribe to NodeStatus messages.

    Initialises an empty ``known_nodes`` mapping, a logger named
    'NodeStatusMonitor', registers ``self._node_status_callback`` as the
    handler for ``uavcan.protocol.NodeStatus`` on the node, and leaves
    ``new_node_cb`` unset (``None``).
    """
    self.node = node
    self.known_nodes = {}
    self.logger = logging.getLogger('NodeStatusMonitor')

    # Subscribe before clearing new_node_cb, in the same order as the original.
    self.node.add_handler(
        uavcan.protocol.NodeStatus,
        self._node_status_callback,
    )
    self.new_node_cb = None
match = match_value(pow2)
if match:
matches.append(match)
else:
matches = []
break # If at least one couldn't be matched, we're on a wrong track, stop
if len(matches) > 0:
return ' | '.join(matches)
# No match could be found, returning the value as is
return value
if __name__ == '__main__':
    # Manual demo of to_yaml(): print the YAML rendering of a few messages.

    # A default-constructed message.
    print(to_yaml(uavcan.protocol.NodeStatus()))

    # A response with a string field and a byte-array field filled in.
    info = uavcan.protocol.GetNodeInfo.Response(name='legion')
    info.hardware_version.certificate_of_authenticity = b'\x01\x02\x03\xff'
    print(to_yaml(info))

    # A message holding an array of nested commands.
    lights = uavcan.equipment.indication.LightsCommand()
    lcmd = uavcan.equipment.indication.SingleLightCommand(light_id=123)
    colour = lcmd.color
    colour.red = 1
    colour.green = 2
    colour.blue = 3
    lights.commands.append(lcmd)
    # The same command object is appended again after bumping its id.
    lcmd.light_id += 1
    lights.commands.append(lcmd)
    print(to_yaml(lights))

    # Another default-constructed message.
    print(to_yaml(uavcan.equipment.power.BatteryInfo()))
def enumerate_node_params(this_node, node_id):
    """Fetch a node's parameters one index at a time, caching and announcing each.

    Generator: for indices 0 up to (but not including) 8192 it yields a
    ``tornado.gen.Task`` wrapping ``this_node.send_request`` with a
    ``param.GetSet`` request for that index. Each named response is stored in
    ``UAVCAN_NODE_CONFIG[node_id]`` under its name and passed to ``send_all``.
    Enumeration stops at the first response that is falsy or has no name.

    NOTE(review): presumably driven by a tornado coroutine decorator that is
    not visible in this excerpt — confirm.
    """
    for index in range(8192):
        query = uavcan.protocol.param.GetSet(mode="request")
        query.index = index
        (reply, _transfer), _ = yield tornado.gen.Task(
            this_node.send_request, query, node_id)

        # An empty or unnamed reply marks the end of the parameter table.
        if not (reply and reply.name):
            break

        # Keep the parameter so clients connecting later can be told about
        # it, and notify the clients that are connected now.
        UAVCAN_NODE_CONFIG[node_id][reply.name] = reply
        send_all(reply.type.get_normalized_definition(), node_id, reply)