Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def main():
args = parse_args()
node = uavcan.make_node(args.port, node_id=123)
node.lock = threading.RLock()
uavcan.load_dsdl(args.dsdl)
samples = []
def range_cb(event):
samples.append(event.message.range)
print(event.message.range)
if len(samples) >= 100:
bias = mean(samples) - args.distance
print("Estimated bias: {:.3f}".format(bias))
print("Delta to add in decawave units: {}".format(int((bias / 2) / SPEED_OF_LIGHT)))
node.add_handler(uavcan.thirdparty.cvra.uwb_beacon.RadioRange, range_cb)
node.spin()
def main():
args, _ = parse_args()
node = uavcan.make_node(args.iface, node_id=args.uavcan_id)
uavcan.load_dsdl(args.dsdl_path)
rospy.init_node('mine_alarm_signal', disable_signals=True)
app = LedSignalApplication(node, args.topic)
try:
node.spin()
except KeyboardInterrupt:
pass
def createNode(com):
node = uavcan.make_node(com, node_id=126, bitrate=1000000, baudrate=1000000)
node_monitor = uavcan.app.node_monitor.NodeMonitor(node)
# dynamic_node_id_allocator = uavcan.app.dynamic_node_id.CentralizedServer(node, node_monitor)
# while len(dynamic_node_id_allocator.get_allocation_table()) <= 1:
# print('Waiting for other nodes to become online...')
# node.spin(timeout=1)
while len(node_monitor.get_all_node_id()) < 1:
print('Waiting for other nodes to become online...')
node.spin(timeout=1)
all_node_ids = list(node_monitor.get_all_node_id())
print( '\nDetected Node IDs',all_node_ids)
print( 'Node ID in use',all_node_ids[0],)
node_dict = node_monitor.get(all_node_ids[0]) #momentarily, always use the first, one shouldk use an ESC detection scheme as in the examples
uavcan.equipment.power.BatteryInfo(status_flags=
uavcan.equipment.power.BatteryInfo().STATUS_FLAG_NEED_SERVICE |
uavcan.equipment.power.BatteryInfo().STATUS_FLAG_TEMP_HOT |
uavcan.equipment.power.BatteryInfo().STATUS_FLAG_CHARGED),
'status_flags'
))
print(value_to_constant_name(
uavcan.protocol.AccessCommandShell.Response(flags=
uavcan.protocol.AccessCommandShell.Response().FLAG_SHELL_ERROR |
uavcan.protocol.AccessCommandShell.Response().
FLAG_HAS_PENDING_STDOUT),
'flags'
))
# Printing transfers
node = uavcan.make_node('vcan0', node_id=42)
node.request(uavcan.protocol.GetNodeInfo.Request(), 100, lambda e: print(to_yaml(e)))
node.add_handler(uavcan.protocol.NodeStatus, lambda e: print(to_yaml(e)))
node.spin()
logger.info('Custom DSDL loaded successfully')
except Exception as ex:
logger.exception('No DSDL loaded from %r, only standard messages will be supported', dsdl_directory)
show_error('DSDL not loaded',
'Could not load DSDL definitions from %r.\n'
'The application will continue to work without the custom DSDL definitions.' % dsdl_directory,
ex, blocking=True)
# Trying to start the node on the specified interface
try:
node_info = uavcan.protocol.GetNodeInfo.Response()
node_info.name = NODE_NAME
node_info.software_version.major = __version__[0]
node_info.software_version.minor = __version__[1]
node = uavcan.make_node(iface,
node_info=node_info,
mode=uavcan.protocol.NodeStatus().MODE_OPERATIONAL,
**iface_kwargs)
# Making sure the interface is alright
node.spin(0.1)
except uavcan.transport.TransferError:
# allow unrecognized messages on startup:
logger.warning('UAVCAN Transfer Error occurred on startup', exc_info=True)
break
except Exception as ex:
logger.error('UAVCAN node init failed', exc_info=True)
show_error('Fatal error', 'Could not initialize UAVCAN node', ex, blocking=True)
else:
break
def createNodeDynamicId(com):
node = uavcan.make_node(com, node_id=126, bitrate=1000000, baudrate=1000000)
node_monitor = uavcan.app.node_monitor.NodeMonitor(node)
dynamic_node_id_allocator = uavcan.app.dynamic_node_id.CentralizedServer(node, node_monitor)
waitForAllNodesDynamicId(dynamic_node_id_allocator,node)
return node_monitor, node, dynamic_node_id_allocator
def main():
args = parse_args()
node = uavcan.make_node(args.port, node_id=42)
uavcan.load_dsdl(DSDL_DIR)
send_pwm(node, args.id, args.value)
# Spin node for 1 second
node.spin(1)
node.close()
if args.anchor and msg.anchor_addr != args.anchor:
return
print("Received a range from {}: {:.3f}".format(
msg.anchor_addr, msg.range))
if args.output:
output_file.writerow({
'ts': msg.timestamp.usec,
'range': msg.range,
'anchor_addr': msg.anchor_addr
})
args.output.flush()
node = uavcan.make_node(args.port, node_id=127)
# TODO This path is a bit too hardcoded
dsdl_path = os.path.join(
os.path.dirname(__file__), '..', '..', 'uavcan_data_types', 'cvra', 'uwb_beacon')
uavcan.load_dsdl(dsdl_path)
node.add_handler(uavcan.thirdparty.uwb_beacon.RadioRange,
range_cb)
node.spin()
def run(self):
self.node = uavcan.make_node(self.port, node_id=127)
self.node.add_handler(uavcan.protocol.NodeStatus,
self._node_status_callback)
self.node.add_handler(uavcan.thirdparty.cvra.motor.feedback.CurrentPID,
self._current_pid_callback)
self.node.add_handler(
uavcan.thirdparty.cvra.motor.feedback.VelocityPID,
self._velocity_pid_callback)
self.node.add_handler(
uavcan.thirdparty.cvra.motor.feedback.PositionPID,
self._position_pid_callback)
self.node.periodic(0.1, self._publish_setpoint)