Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run_test(self, odrv_ctx: ODriveTestContext, logger):
import odrive.serial_transport
port = odrive.serial_transport.SerialStreamTransport(odrv_ctx.yaml['uart'], 115200)
# send garbage to throw the device off track
port.process_bytes(b"garbage\r\n\r\0trash\n")
port.process_bytes(b"\n") # start a new clean line
get_lines(port) # flush RX buffer
# info command without checksum
port.process_bytes(b"i\n")
# check if it reports the serial number (among other things)
lines = get_lines(port)
expected_line = ('Serial number: ' + odrv_ctx.yaml['serial-number']).encode('ascii')
if not expected_line in lines:
raise Exception("expected {} in ASCII protocol response but got {}".format(expected_line, str(lines)))
# info command with checksum
port.process_bytes(gcode_append_checksum(b"i") + b" ; a useless comment\n")
def run_test(self, axis0_ctx: AxisTestContext, axis1_ctx: AxisTestContext, logger):
load_ctx = axis0_ctx
driver_ctx = axis1_ctx
if driver_ctx.name == 'top-odrive.black':
# odrive.utils.start_liveplotter(lambda: [driver_ctx.odrv_ctx.handle.vbus_voltage])
odrive.utils.start_liveplotter(lambda: [driver_ctx.handle.motor.current_control.Iq_measured,
driver_ctx.handle.motor.current_control.Iq_setpoint])
# Set up viscous fluid load
logger.debug("activating load on {}...".format(load_ctx.name))
load_ctx.handle.controller.config.vel_integrator_gain = 0
load_ctx.handle.motor.config.current_lim = self._load_current
load_ctx.odrv_ctx.handle.config.brake_resistance = 0 # disable brake resistance, the power will go into the bus
load_ctx.handle.controller.set_vel_setpoint(0, 0)
request_state(load_ctx, AXIS_STATE_CLOSED_LOOP_CONTROL)
driver_test = TestHighVelocity(
override_current_limit=self._driver_current,
load_current=self._load_current, brake=False)
driver_test.check_preconditions(driver_ctx, logger)
driver_test.run_test(driver_ctx, logger)
for odrv_ctx in odrives_by_name.values():
for axis_idx, axis_ctx in enumerate(odrv_ctx.axes):
if not axis_ctx.name in args.ignore:
axes_by_name[axis_ctx.name] = axis_ctx
# Ensure mechanical couplings are valid
couplings = []
if test_rig_yaml['couplings'] is None:
test_rig_yaml['couplings'] = {}
else:
for coupling in test_rig_yaml['couplings']:
c = [axes_by_name[axis_name] for axis_name in coupling if (axis_name in axes_by_name)]
if len(c) > 1:
couplings.append(c)
app_shutdown_token = Event()
try:
for test in all_tests:
if isinstance(test, ODriveTest):
def odrv_test_thread(odrv_name):
odrv_ctx = odrives_by_name[odrv_name]
logger.notify('* running {} on {}...'.format(type(test).__name__, odrv_name))
try:
test.check_preconditions(odrv_ctx,
logger.indent(' {}: '.format(odrv_name)))
except:
raise PreconditionsNotMet()
test.run_test(odrv_ctx,
logger.indent(' {}: '.format(odrv_name)))
if test._exclusive:
def main(args):
global running
print("ODrive USB Bulk Communications")
print("---------------------------------------------------------------------")
print("USAGE:")
print("\tPOSITION_CONTROL:\n\t\tp MOTOR_NUMBER POSITION VELOCITY CURRENT")
print("\tVELOCITY_CONTROL:\n\t\tv MOTOR_NUMBER VELOCITY CURRENT")
print("\tCURRENT_CONTROL:\n\t\tc MOTOR_NUMBER CURRENT")
print("\tHALT:\n\t\th")
print("\tQuit Python Script:\n\t\tq")
print("---------------------------------------------------------------------")
# query device
dev = usbbulk.poll_odrive_bulk_device(printer=print)
print (dev.info())
print (dev.init())
# thread
thread = threading.Thread(target=receive_thread, args=[dev])
thread.start()
while running:
time.sleep(0.1)
try:
command = input("\r\nEnter ODrive command:\n")
if 'q' in command:
running = False
sys.exit()
else:
dev.send(command)
except:
running = False
def rediscover(self):
"""
Reconnects to the ODrive
"""
self.handle = odrive.find_any(
path="usb", serial_number=self.yaml['serial-number'], timeout=15)#, printer=print)
for axis_idx, axis_ctx in enumerate(self.axes):
axis_ctx.handle = self.handle.__dict__['axis{}'.format(axis_idx)]
time.sleep(0.005)
test_assert_no_error(load_ctx)
test_assert_no_error(driver_ctx)
logger.debug("using {} as driver against load, vel=20000...".format(driver_ctx.name))
set_limits(driver_ctx, logger, vel_limit=20000, current_limit=50)
init_pos = driver_ctx.handle.encoder.pos_estimate
driver_ctx.handle.controller.set_pos_setpoint(init_pos + 100000, 0, 0)
request_state(driver_ctx, AXIS_STATE_CLOSED_LOOP_CONTROL)
#for _ in range(int(5*4000/5)):
# logger.debug(str(driver_ctx.handle.motor.current_control.Iq_setpoint))
# time.sleep(0.005)
time.sleep(7)
odrive.utils.print_drv_regs("load motor ({})".format(load_ctx.name), load_ctx.handle.motor)
odrive.utils.print_drv_regs("driver motor ({})".format(driver_ctx.name), driver_ctx.handle.motor)
test_assert_no_error(load_ctx)
test_assert_no_error(driver_ctx)
#### Open a log file
f = open(log_file, 'at')
f.write(datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S') +
" : ****** ps3 started ******\n")
f.flush()
if not args.dryrun:
if __debug__:
print("Not a drytest")
if _config.get('Drive', 'type') == "Sabertooth":
drive = SabertoothPacketSerial(address=int(_config.get('Drive', 'address')),
type=_config.get('Drive', 'type'),
port=_config.get('Drive', 'port'))
elif _config.get('Drive', 'type') == "ODrive":
print("finding an odrive...")
drive = odrive.find_any() #"serial:" + _config.get('Drive', 'port'))
#drive.axis0.controller.config.control_mode = CTRL_MODE_VELOCITY_CONTROL
#drive.axis1.controller.config.control_mode = CTRL_MODE_VELOCITY_CONTROL
drive.axis1.controller.vel_ramp_enable = True
drive.axis0.controller.vel_ramp_enable = True
#dome = SabertoothPacketSerial(address=int(_config.get('Dome', 'address')),
# type=_config.get('Dome', 'type'),
# port=_config.get('Dome', 'port'))
pygame.display.init()
if args.curses:
print('\033c')
locate("-=[ PS3 Controller ]=-", 10, 0)
locate("Left", 3, 2)
locate("Right", 30, 2)
# based on https://docs.odriverobotics.com/hoverboard.html
import odrive
import time
motor_calibration_time = 6
encoder_calibration_time = 12
input("Make sure that the wheels are free to turn. Don't get hurt!\nPress ENTER to proceed.")
print("Searching for ODrive...")
odrv0 = odrive.find_any()
print("Odrive detected!")
print("--------------Motor Calibration--------------------------------")
print("Calibrating motors...")
odrv0.axis0.requested_state = 4 # AXIS_STATE_MOTOR_CALIBRATION
odrv0.axis1.requested_state = 4 # AXIS_STATE_MOTOR_CALIBRATION
time.sleep(motor_calibration_time)
if odrv0.axis0.motor.is_calibrated == 1 and odrv0.axis1.motor.is_calibrated == 1:
print("Motor successfully calibrated! Proceeding...")
else:
print("Could not calibrate motor. Something is wrong.")
print("Have you ran the first script to set all parameters?")
print("If yes and the wiring looks good, reset the ODrive and try again.\nExiting.")
quit()
print("Saving motor calibration")
odrv0.axis0.motor.config.pre_calibrated = True
def channel_from_usb_device(usb_device, printer=noprint):
"""
Inits an ODrive Protocol channel from a PyUSB device object.
"""
bulk_device = odrive.usbbulk_transport.USBBulkTransport(usb_device, printer)
printer(bulk_device.info())
bulk_device.init()
return odrive.protocol.Channel(
"USB device bus {} device {}".format(usb_device.bus, usb_device.address),
bulk_device, bulk_device)
def channel_from_usb_device(usb_device, printer=noprint):
"""
Inits an ODrive Protocol channel from a PyUSB device object.
"""
bulk_device = odrive.usbbulk_transport.USBBulkTransport(usb_device, printer)
printer(bulk_device.info())
bulk_device.init()
return odrive.protocol.Channel(
"USB device bus {} device {}".format(usb_device.bus, usb_device.address),
bulk_device, bulk_device)