Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self._serial = serial.Serial(port, 9600)
self._serial.close()
self._serial = serial.Serial(port, baudrate, timeout=timeout)
self.__used_ports.add(port)
if platform.system() == 'Darwin' and self._sync_read:
if not self.ping(self._protocol.DxlBroadcast):
self.close()
continue
else:
time.sleep(self.timeout)
self.flush()
break
else:
raise DxlError('could not connect to the port {}'.format(self.port))
def __real_send(self, instruction_packet, wait_for_status_packet, _force_lock):
    """ Writes an instruction packet on the serial port and optionally reads the reply.

    :param instruction_packet: packet to send; its ``to_string()`` result is what gets written.
    :param wait_for_status_packet: when falsy, returns right after the write without reading.
    :param _force_lock: forwarded to ``self.__force_lock``; if that call returns a falsy
        value, ``self._serial_lock`` is used as the context manager instead.
    :returns: whatever ``self.__real_read`` returns for this packet, or None when no
        status packet is awaited.
    :raises DxlError: if the communication is closed.
    :raises DxlCommunicationError: if the count returned by the write differs from the
        length of the data (the port is flushed first).

    """
    if self.closed:
        raise DxlError('try to send a packet on a closed serial communication')

    guard = self.__force_lock(_force_lock) or self._serial_lock
    with guard:
        payload = instruction_packet.to_string()
        written = self._serial.write(payload)

        if len(payload) != written:
            # The guard is already held here, hence the forced lock on the nested call.
            self.flush(_force_lock=True)
            raise DxlCommunicationError(self,
                                        'instruction packet not entirely sent',
                                        instruction_packet)

        if wait_for_status_packet:
            # Read while still inside the guard, again with the lock forced.
            return self.__real_read(instruction_packet, _force_lock=True)

        return None
def flush(self, _force_lock=False):
    """ Flushes both directions (input then output) of the serial communication.

    :param _force_lock: forwarded to ``self.__force_lock``; if that call returns a falsy
        value, ``self._serial_lock`` is used as the context manager instead.
    :raises DxlError: if the communication is closed.

    """
    if self.closed:
        raise DxlError('attempt to flush a closed serial communication')

    guard = self.__force_lock(_force_lock) or self._serial_lock
    with guard:
        port = self._serial
        port.flushInput()
        port.flushOutput()
try:
with DxlIOCls(port) as dxl:
_ids_founds = dxl.scan(ids)
ids_founds += _ids_founds
if strict and len(_ids_founds) == len(ids):
return port
if not strict and len(_ids_founds) >= len(ids) / 2:
logger.warning('Missing ids: {}'.format(ids, list(set(ids) - set(_ids_founds))))
return port
if len(ids_founds) > 0:
logger.warning('Port:{} ids found:{}'.format(port, _ids_founds))
except DxlError:
logger.warning('DxlError on port {}'.format(port))
continue
raise IndexError('No suitable port found for ids {}. These ids are missing {} !'.format(
ids, list(set(ids) - set(ids_founds))))
def setup(self):
torques = self.io.is_torque_enabled(self.ids)
for m, c in zip(self.working_motors, torques):
m.compliant = not c
self._old_torques = torques
self._old_goals = {m.id: 0.0 for m in self.motors}
try:
values = self.io.get_goal_position_speed_load(self.ids)
positions, speeds, loads = zip(*values)
except ValueError:
raise DxlError("Couldn't initialize pos/speed/load sync loop!")
for m, p, s, l in zip(self.working_motors, positions, speeds, loads):
m.__dict__['goal_position'] = p
m.__dict__['moving_speed'] = s
m.__dict__['torque_limit'] = l