Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def submit(self):
    """
    Hand this transfer over to libusb for asynchronous processing.

    Raises ValueError when the transfer is already submitted or has not
    been initialized yet.
    Raises libusb1.USBError when libusb_submit_transfer reports a non-zero
    status; the submitted flag is cleared again in that case.
    """
    if self.__submitted:
        raise ValueError('Cannot submit a submitted transfer')
    if not self.__initialized:
        message = ('Cannot submit a transfer until it has been '
                   'initialized')
        raise ValueError(message)
    # The flag is set before calling into libusb and rolled back on failure.
    self.__submitted = True
    status = libusb1.libusb_submit_transfer(self.__transfer)
    if not status:
        return
    self.__submitted = False
    raise libusb1.USBError(status)
def _bulkTransfer(self, endpoint, data, length, timeout):
    """
    Run libusb_bulk_transfer on this handle and return the value libusb
    stored in the transferred-count output parameter.

    Raises libusb1.USBError when the call returns a non-zero status.
    """
    byte_count = c_int()
    status = libusb1.libusb_bulk_transfer(
        self.__handle,
        endpoint,
        data,
        length,
        byref(byte_count),
        timeout,
    )
    if not status:
        return byte_count.value
    # NOTE(review): whatever count libusb wrote to byte_count is discarded
    # on this path.
    raise libusb1.USBError(status)
def getASCIIStringDescriptor(self, descriptor):
    """
    Read the string for the given descriptor through
    libusb_get_string_descriptor_ascii.

    Returns the content of a STRING_LENGTH-sized buffer filled by libusb.
    Returns None when libusb answers LIBUSB_ERROR_NOT_FOUND.
    Raises libusb1.USBError for any other negative status.
    """
    text_buffer = create_binary_buffer(STRING_LENGTH)
    status = libusb1.libusb_get_string_descriptor_ascii(
        self.__handle,
        descriptor,
        text_buffer,
        sizeof(text_buffer),
    )
    # The not-found check comes first so that it is never reported as an
    # error.
    if status == libusb1.LIBUSB_ERROR_NOT_FOUND:
        return None
    if status < 0:
        raise libusb1.USBError(status)
    return text_buffer.value
def open(self):
    """
    Open this device through libusb_open.

    Returns a USBDeviceHandle built from this object's context and the
    freshly opened libusb handle.
    Raises libusb1.USBError when libusb_open returns a non-zero status.
    """
    device_handle = libusb1.libusb_device_handle_p()
    status = libusb1.libusb_open(self.device_p, byref(device_handle))
    if not status:
        return USBDeviceHandle(self.__context, device_handle)
    raise libusb1.USBError(status)
def handleEventsTimeout(self, tv=0):
    """
    Let libusb process pending events, waiting at most tv seconds.

    tv may be fractional. A value of 0 or None asks libusb to only handle
    events that are already pending.
    Raises libusb1.USBError when libusb_handle_events_timeout returns a
    non-zero status.
    """
    wait = 0 if tv is None else tv
    # Split into whole seconds and the remaining microseconds, both
    # truncated by int().
    whole_seconds = int(wait)
    microseconds = int((wait - whole_seconds) * 1000000)
    timeout_struct = libusb1.timeval(whole_seconds, microseconds)
    status = libusb1.libusb_handle_events_timeout(
        self.__context_p,
        byref(timeout_struct),
    )
    if status:
        raise libusb1.USBError(status)
def open(self):
    """
    Open this device through libusb_open.

    Returns a USBDeviceHandle built from this object's context, the
    freshly opened libusb handle, and this device object itself.
    Raises libusb1.USBError when libusb_open returns a non-zero status.
    """
    opened = libusb1.libusb_device_handle_p()
    error_code = libusb1.libusb_open(self.device_p, byref(opened))
    if error_code:
        raise libusb1.USBError(error_code)
    device_handle = USBDeviceHandle(self.__context, opened, self)
    return device_handle
def getASCIIStringDescriptor(self, descriptor):
    """
    Look up the string attached to the given descriptor using
    libusb_get_string_descriptor_ascii.

    Returns the content of the buffer libusb filled in, or None when the
    call reports LIBUSB_ERROR_NOT_FOUND.
    Raises libusb1.USBError for any other negative status.
    """
    raw = create_binary_buffer(STRING_LENGTH)
    outcome = libusb1.libusb_get_string_descriptor_ascii(
        self.__handle, descriptor, raw, sizeof(raw))
    if outcome != libusb1.LIBUSB_ERROR_NOT_FOUND:
        if outcome < 0:
            raise libusb1.USBError(outcome)
        return raw.value
    # A missing descriptor is reported as None, not as an exception.
    return None
def _interruptTransfer(self, endpoint, data, length, timeout):
    """
    Run libusb_interrupt_transfer on this handle and return the value
    libusb stored in the transferred-count output parameter.

    Raises libusb1.USBError when the call returns a non-zero status.
    """
    moved = c_int()
    error_code = libusb1.libusb_interrupt_transfer(
        self.__handle, endpoint, data, length, byref(moved), timeout)
    if error_code:
        # NOTE(review): whatever count libusb wrote to moved is discarded
        # on this path.
        raise libusb1.USBError(error_code)
    count = moved.value
    return count
def getNextTimeout(self):
    """
    Ask libusb_get_next_timeout for the next timeout libusb has to handle.

    Returns None when the call returns 0, or the timeout in seconds (as a
    float) when it returns 1.
    Raises libusb1.USBError for any other return value.
    This is mainly of use when integrating this class with an external
    polling mechanism.
    """
    next_tv = libusb1.timeval()
    status = libusb1.libusb_get_next_timeout(
        self.__context_p,
        byref(next_tv),
    )
    if status == 0:
        return None
    if status == 1:
        # Combine whole seconds and microseconds into fractional seconds.
        return next_tv.tv_sec + (next_tv.tv_usec * 0.000001)
    raise libusb1.USBError(status)
def detachKernelDriver(self, interface):
    """
    Request that the kernel driver be detached from the given interface
    number, via libusb_detach_kernel_driver.

    Raises libusb1.USBError when the call returns a non-zero status.
    """
    status = libusb1.libusb_detach_kernel_driver(self.__handle, interface)
    if not status:
        return
    raise libusb1.USBError(status)