Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __real_send(self, instruction_packet, wait_for_status_packet, _force_lock):
if self.closed:
raise DxlError('try to send a packet on a closed serial communication')
with self.__force_lock(_force_lock) or self._serial_lock:
data = instruction_packet.to_string()
nbytes = self._serial.write(data)
if len(data) != nbytes:
self.flush(_force_lock=True)
raise DxlCommunicationError(self,
'instruction packet not entirely sent',
instruction_packet)
if not wait_for_status_packet:
return
status_packet = self.__real_read(instruction_packet, _force_lock=True)
return status_packet
def __real_read(self, instruction_packet, _force_lock):
with self.__force_lock(_force_lock) or self._serial_lock:
data = self._serial.read(self._protocol.DxlPacketHeader.length)
if not data:
self.flush(_force_lock=True)
raise DxlTimeoutError(self, instruction_packet, instruction_packet.id)
try:
header = self._protocol.DxlPacketHeader.from_string(data)
data += self._serial.read(header.packet_length)
status_packet = self._protocol.DxlStatusPacket.from_string(data)
except ValueError:
self.flush(_force_lock=True)
msg = 'could not parse received data {}'.format(bytearray(data))
raise DxlCommunicationError(self, msg, instruction_packet)
return status_packet
def __init__(self, dxl_io, instruction_packet, ids):
DxlCommunicationError.__init__(self, dxl_io, 'timeout occured', instruction_packet)
self.ids = ids
sp = self.__real_send(instruction_packet, wait_for_status_packet, _force_lock)
if sp and sp.error:
errors = decode_error(sp.error)
for e in errors:
handler_name = 'handle_{}'.format(e.lower().replace(' ', '_'))
f = operator.methodcaller(handler_name, instruction_packet)
f(error_handler)
return sp
except DxlTimeoutError as e:
self.flush(_force_lock=False)
error_handler.handle_timeout(e)
except DxlCommunicationError as e:
self.flush(_force_lock=False)
error_handler.handle_communication_error(e)
""" Base class for all errors encountered using :class:`~pypot.dynamixel.io.DxlIO`. """
pass
class DxlCommunicationError(DxlError):
""" Base error for communication error encountered when using :class:`~pypot.dynamixel.io.DxlIO`. """
def __init__(self, dxl_io, message, instruction_packet):
self.dxl_io = dxl_io
self.message = message
self.instruction_packet = instruction_packet
def __str__(self):
return '{self.message} after sending {self.instruction_packet}'.format(self=self)
class DxlTimeoutError(DxlCommunicationError):
""" Timeout error encountered when using :class:`~pypot.dynamixel.io.DxlIO`. """
def __init__(self, dxl_io, instruction_packet, ids):
DxlCommunicationError.__init__(self, dxl_io, 'timeout occured', instruction_packet)
self.ids = ids
def __str__(self):
return 'motors {} did not respond after sending {}'.format(self.ids, self.instruction_packet)
@contextmanager
def with_True():
yield True