Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Fragment of a test body (its enclosing def is not visible here): compares
# what getISOBufferList() and iterISO() yield against the expected `buff`.
# getISOBufferList() returns whole buffers
self.assertEqual(
bytearray(itertools.chain(*transfer.getISOBufferList())),
buff,
)
# iterISO() returns actually transferred data, so here nothing
self.assertEqual(bytearray(
itertools.chain(*[x for _, x in transfer.iterISO()])),
bytearray(),
)
# Fake reception of whole transfers: fetch the transfer object's private
# "__transfer" attribute through its name-mangled spelling
# ("_<ClassName>__transfer").
c_transfer = getattr(
transfer,
'_' + transfer.__class__.__name__ + '__transfer'
)
# Mark every ISO packet as fully received (actual_length == length).
for iso_metadata in libusb1.get_iso_packet_list(c_transfer):
iso_metadata.actual_length = iso_metadata.length
# Now iterISO returns everything
self.assertEqual(bytearray(
itertools.chain(*[x for _, x in transfer.iterISO()])),
buff,
)
def submit(self):
    """
    Hand this transfer over to libusb for asynchronous handling.

    Raises ValueError when the transfer is already submitted or has not
    been initialized yet.
    Raises libusb1.USBError when libusb_submit_transfer reports a non-zero
    status; the transfer is then flagged as not submitted again.
    """
    if self.__submitted:
        raise ValueError('Cannot submit a submitted transfer')
    if not self.__initialized:
        raise ValueError(
            'Cannot submit a transfer until it has been '
            'initialized'
        )
    # The flag is raised before calling into libusb and rolled back below
    # if the submission is refused.
    self.__submitted = True
    status = libusb1.libusb_submit_transfer(self.__transfer)
    if not status:
        return
    self.__submitted = False
    raise libusb1.USBError(status)
def CounterInit(self, counter):
    """
    Initialize the 32-bit event counter.

    Sends a vendor-type control write through self.udev using request code
    self.COUNTER, with wValue and wIndex both zero and [counter] as the
    payload.  On a write, the counter will be initialized to zero.
    Returns nothing.
    """
    vendor_request = libusb1.LIBUSB_TYPE_VENDOR
    self.udev.controlWrite(
        vendor_request,
        self.COUNTER,
        0,  # wValue
        0,  # wIndex
        [counter],
        timeout=100,
    )
def __init__(self):
    """
    Create a new USB context.

    Raises libusb1.USBError when libusb_init reports a non-zero status.
    """
    new_context = libusb1.libusb_context_p()
    status = libusb1.libusb_init(byref(new_context))
    if not status:
        # The pointer is only stored once libusb_init has succeeded.
        self.__context_p = new_context
        return
    raise libusb1.USBError(status)
def _exit(self):
    """
    Tear down the libusb context, if one is currently set.

    Deregisters every registered hotplug callback, closes every object
    still held in the close set, calls the stored libusb_exit on the
    context, then replaces the context pointer with a fresh
    libusb_context_p() and sets both callback attributes to
    self.__null_pointer.

    Does nothing when the context pointer is falsy.
    """
    context_p = self.__context_p
    if not context_p:
        return
    # Iterate over a snapshot of the handles: hotplugDeregisterCallback
    # presumably removes the entry from this dict (verify against its
    # definition), and mutating a dict while iterating its live keys()
    # view raises RuntimeError on Python 3.
    for handle in list(self.__hotplug_callback_dict):
        self.hotplugDeregisterCallback(handle)
    pop = self.__close_set.pop
    while True:
        try:
            closable = pop()
        # KeyError is looked up on self rather than in globals
        # (presumably so it stays reachable at interpreter shutdown;
        # confirm against the class definition).
        except self.__KeyError:
            break
        closable.close()
    self.__libusb_exit(context_p)
    self.__context_p = libusb1.libusb_context_p()
    self.__added_cb = self.__null_pointer
    self.__removed_cb = self.__null_pointer
def __init__(self):
    """
    Create a new USB context.

    Raises libusb1.USBError when libusb_init reports a non-zero status.
    """
    # Reference count and condition used to keep an exit from causing a
    # segfault while a concurrent thread is still inside libusb.
    self.__context_refcount = 0
    self.__context_cond = threading.Condition()
    new_context = libusb1.libusb_context_p()
    status = libusb1.libusb_init(byref(new_context))
    if not status:
        # The pointer is only stored once libusb_init has succeeded.
        self.__context_p = new_context
        return
    raise libusb1.USBError(status)
def open(self):
    """
    Complete context initialisation, as __enter__ normally does.

    Initialisation otherwise happens automatically, but with a warning, on
    the first method call that needs the uninitialised properties.
    Call this method ONLY if your usage pattern prevents you from using the
        with USBContext() as context:
    form: this means there are ways to avoid calling close(), which can
    cause issues particularly hard to debug (ex: interpreter hangs on
    exit).

    Returns self.
    """
    assert self.__context_refcount == 0
    init_status = libusb1.libusb_init(byref(self.__context_p))
    mayRaiseUSBError(init_status)
    return self
def __init__(self):
    """
    Create a new USB context.

    Raises libusb1.USBError when libusb_init reports a non-zero status.
    """
    new_context = libusb1.libusb_context_p()
    status = libusb1.libusb_init(byref(new_context))
    if not status:
        # The pointer is only stored once libusb_init has succeeded.
        self.__context_p = new_context
        return
    raise libusb1.USBError(status)
def __init__(self):
    """
    Create a new USB context.

    Raises libusb1.USBError when libusb_init reports a non-zero status.
    """
    # Reference count and condition used to keep an exit from causing a
    # segfault while a concurrent thread is still inside libusb.
    self.__context_refcount = 0
    self.__context_cond = threading.Condition()
    new_context = libusb1.libusb_context_p()
    status = libusb1.libusb_init(byref(new_context))
    if not status:
        # The pointer is only stored once libusb_init has succeeded.
        self.__context_p = new_context
        return
    raise libusb1.USBError(status)
def _bulkTransfer(self, endpoint, data, length, timeout):
    """
    Run a libusb bulk transfer on this object's handle.

    endpoint, data, length and timeout are passed unchanged to
    libusb_bulk_transfer.

    Returns the transferred count reported by libusb.
    Raises libusb1.USBError when libusb reports a non-zero status.
    """
    # Output slot that libusb fills in through the byref() pointer.
    actual = c_int()
    status = libusb1.libusb_bulk_transfer(
        self.__handle, endpoint, data, length, byref(actual), timeout,
    )
    if status:
        raise libusb1.USBError(status)
    return actual.value