Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
rospy.init_node('position_publisher', disable_signals=True)
uwb_position_pub = rospy.Publisher('uwb_position', Point, queue_size=1)
node = uavcan.make_node(can_interface)
uavcan.load_dsdl(dsdl_path)
def uwb_position_cb(event):
x, y = event.message.x, event.message.y
br = tf2_ros.TransformBroadcaster()
br.sendTransform(tf_from_position(x, y, 0))
uwb_position_pub.publish(Point(x, y, 0))
handle = node.add_handler(uavcan.thirdparty.cvra.uwb_beacon.TagPosition,
uwb_position_cb)
try:
try:
node.spin()
except uavcan.UAVCANException:
pass
except KeyboardInterrupt:
pass
def _publish_led_command(self):
# Haven't discovered the board yet
if self.led_board_id is None:
return
msg = uavcan.thirdparty.cvra.motor.control.Voltage(node_id=self.led_board_id,
voltage=self.led_voltage)
self.node.broadcast(msg)
def __init__(self, node, detector_id):
self.node = node
self.detector_id = detector_id
self.node.add_handler(uavcan.thirdparty.cvra.metal_detector.EMIRawSignal, self._callback)
def node_thread(tun_fd, node, can_to_tap, tap_to_can):
def msg_callback(event):
msg = event.message
can_to_tap.put(msg.data)
node.add_handler(uavcan.thirdparty.cvra.uwb_beacon.DataPacket, msg_callback)
while True:
# A timeout of 0 means only process frames that are immediately
# available
try:
node.spin(timeout=0)
except uavcan.transport.TransferError:
print("uavcan exception, ignoring...")
pass
try:
packet = tap_to_can.get(block=False)
except Empty:
continue
# Checks that the packet fits in a UWB frame
def send_pwm(node, dst_id, values):
msg = uavcan.thirdparty.cvra.io.ServoPWM(node_id=dst_id, servo_pos=values)
node.broadcast(msg, priority=uavcan.TRANSFER_PRIORITY_HIGHEST)
try:
node.spin(timeout=0)
except uavcan.transport.TransferError:
print("uavcan exception, ignoring...")
pass
try:
packet = tap_to_can.get(block=False)
except Empty:
continue
# Checks that the packet fits in a UWB frame
assert len(packet) < 1024
# Finally send it over CAN
msg = uavcan.thirdparty.cvra.uwb_beacon.DataPacket()
msg.dst_addr = 0xffff # broadcast
msg.data = list(packet)
node.broadcast(msg)
def __call__(self, node_id, value):
return {
'voltage': uavcan.thirdparty.cvra.motor.control.Voltage(node_id=node_id, voltage=value),
'torque': uavcan.thirdparty.cvra.motor.control.Torque(node_id=node_id, torque=value),
'velocity': uavcan.thirdparty.cvra.motor.control.Velocity(node_id=node_id, velocity=value),
'position': uavcan.thirdparty.cvra.motor.control.Position(node_id=node_id, position=value),
}[self.value]
def main():
args = parse_args()
config = yaml.load(args.config)
config = config['actuator'][args.board]
if args.dsdl:
uavcan.load_dsdl(args.dsdl)
global node
node = uavcan.make_node(args.port, node_id=127)
node.add_handler(uavcan.protocol.NodeStatus, node_status_callback)
req = uavcan.thirdparty.cvra.motor.config.LoadConfiguration.Request()
req.position_pid.kp = config['control']['position']['kp']
req.position_pid.ki = config['control']['position']['ki']
req.position_pid.kd = config['control']['position']['kd']
req.velocity_pid.kp = config['control']['velocity']['kp']
req.velocity_pid.ki = config['control']['velocity']['ki']
req.velocity_pid.kd = config['control']['velocity']['kd']
req.current_pid.kp = config['control']['current']['kp']
req.current_pid.ki = config['control']['current']['ki']
req.current_pid.kd = config['control']['current']['kd']
req.torque_limit = config['control']['torque_limit']
req.acceleration_limit = config['control']['acceleration_limit']
req.low_batt_th = config['control']['low_batt_th']
req.velocity_limit = config['control']['velocity_limit']
def __init__(self, node):
self.node = node
self.data = {
'wastewater_g': { 'pts': [[894, 1750], [1500, 1750], [1500, 2000], [894, 2000]], 'fill': 'g' },
'wastewater_o': { 'pts': [[1500, 1750], [2106, 1750], [2106, 2000], [1500, 2000]], 'fill': 'o' },
'start_o': { 'pts': [[0, 0], [400, 0], [400, 650], [0, 650]], 'fill': 'o' },
'start_g': { 'pts': [[3000, 0], [2600, 0], [2600, 650], [3000, 650]], 'fill': 'g' },
}
self.data.update(make_cubes('cube_1_o{}', 850, 540, False))
self.data.update(make_cubes('cube_2_o{}', 300, 1190, False))
self.data.update(make_cubes('cube_3_o{}', 1100, 1500, False))
self.data.update(make_cubes('cube_1_g{}', 2150, 540, True))
self.data.update(make_cubes('cube_2_g{}', 2700, 1190, True))
self.data.update(make_cubes('cube_3_g{}', 1900, 1500, True))
self.node.add_handler(uavcan.thirdparty.cvra.uwb_beacon.TagPosition, self._callback)
def __call__(self, node_id, value):
return {
'voltage': uavcan.thirdparty.cvra.motor.control.Voltage(node_id=node_id, voltage=value),
'torque': uavcan.thirdparty.cvra.motor.control.Torque(node_id=node_id, torque=value),
'velocity': uavcan.thirdparty.cvra.motor.control.Velocity(node_id=node_id, velocity=value),
'position': uavcan.thirdparty.cvra.motor.control.Position(node_id=node_id, position=value),
}[self.value]