Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
return
status = self.Status()
try:
if status & self.AIN_SCAN_OVERRUN:
raise OverrunERROR
except:
print('AInScanRead: Overrun Error')
return
if self.continuous_mode:
return list(data)
# if nbytes is a multiple of wMaxPacketSize the device will send a zero byte packet.
if ((int(nSamples*2) % self.wMaxPacketSize) == 0 and not(status & self.AIN_SCAN_RUNNING)):
data2 = self.udev.bulkRead(libusb1.LIBUSB_ENDPOINT_IN | 1, 2, 100)
self.AInScanStop()
self.AInScanClearFIFO()
return list(data)
def controlRead(self, request_type, request, value, index, length,
timeout=0):
"""
Synchronous control read.
timeout: in milliseconds, how long to wait for data. Set to 0 to
disable.
See controlWrite for other parameters description.
Returns received data.
"""
request_type = (request_type & ~libusb1.USB_ENDPOINT_DIR_MASK) | \
libusb1.LIBUSB_ENDPOINT_IN
data = create_binary_buffer(length)
transferred = self._controlTransfer(request_type, request, value,
index, data, length, timeout)
return data.raw[:transferred]
def Status(self):
"""
This command retrives the status of the device
Bit 0: 0 = Sync slave, 1 = sync master
Bit 1: 0 = Trigger falling edge, 1 = trigger rising edge
Bits 2-14 unused.
Bit 15: 1 = program memory update mode
"""
request_type = libusb1.LIBUSB_ENDPOINT_OUT | \
libusb1.LIBUSB_TYPE_CLASS | \
libusb1.LIBUSB_RECIPIENT_INTERFACE
request = 0x9 # HID Set_Report
wValue = (2 << 8) | self.GET_STATUS # HID output
wIndex = 0 # interface
value = self.udev.controlWrite(request_type, request, wValue, wIndex, [self.GET_STATUS], timeout = 100)
value = unpack('BBB',self.udev.interruptRead(libusb1.LIBUSB_ENDPOINT_IN | 2, 3, timeout = 100))
return (value[1] | (value[2]<<8)) & 0xf
libusb1.LIBUSB_TYPE_CLASS | \
libusb1.LIBUSB_RECIPIENT_INTERFACE
request = 0x9 # HID Set_Report
wValue = (2 << 8) | self.AIN # HID output
wIndex = 0 # interface
if channel < 0 or channel > 7:
raise ValueError('AIn: channel out of range.')
return
if gain == self.SE_10_00V:
# offset channels by 8
# set range value to 0 (+/-10V)
ret = self.udev.controlWrite(request_type, request, wValue, wIndex, [self.AIN, channel+8, 0x0], timeout = 100)
else:
ret = self.udev.controlWrite(request_type, request, wValue, wIndex, [self.AIN, channel, gain], timeout = 100)
value = unpack('BBB',self.udev.interruptRead(libusb1.LIBUSB_ENDPOINT_IN | 1, 3, timeout = 1000))
value = value[1] | (value[2] << 8)
if gain == self.SE_10_00V:
# The data is 2's complement signed 13 bit number
if value > 0x7ffc:
value = 0
elif value > 0x7ff8:
value = 0x3fff
else:
value >>= 1
value &= 0x3fff
value -= 0x2000
value = int(value*self.CalSE[channel].slope + self.CalSE[channel].intercept)
else:
# The data is 2's complement signed 14 bit number
value ,= unpack('h',pack('H',value))
value = value/4
# add 8 to channels for Single Ended
channels[i] = lowchannel + i + 8
self.ALoadQueue(nchan, channels, gains)
if gains[0] == self.SE_10_00V:
lowchannel += 8
hichannel += 8
buf = [self.AIN_SCAN, lowchannel, hichannel, count & 0xff, (count>>8) & 0xff, (count>>16) & 0xff, \
(count>>24) & 0xff, prescale, int(preload) & 0xff, (int(preload)>>8) & 0xff, options]
ret = self.udev.controlWrite(request_type, request, wValue, wIndex, buf, timeout = 5000)
i = 0
pipe = 1 # initial endpoint to receive data
sdata = [0]*nSamples
while nSamples > 0:
value = unpack('h'*32,self.udev.interruptRead(libusb1.LIBUSB_ENDPOINT_IN | (pipe+2), 64, timeout = 1000))
if nSamples > 31:
for k in range(31):
sdata[i+k] = int(value[k]>>4)
nSamples -= 31
else:
for k in range(nSamples):
sdata[i+k] = int(value[k]>>4)
nSamples = 0
break
i += 31
pipe = pipe%3 + 1 # pip should take on the values 1, 2 or 3
return sdata
def AInScanRead(self, count, options):
# Each scan consists of 4 bytes times the number of active channels in the scan queue
# each sample consisits of [ scan queue index | 24 bit signed value ]
nBytes = count*self.Queue[0]*4
data = [0]*nBytes
try:
data = self.udev.bulkRead(libusb1.LIBUSB_ENDPOINT_IN | 1, nBytes, 0)
except:
print('AInScanRead: error in bulk transfer')
return
data = list(unpack('I'*count*self.Queue[0], data))
if (options & self.CONTINUOUS):
return data
self.AInScanStop()
self.AInScanFlush()
return data
in endpoint and one out endpoint. Currently, this is the only type of
interface supported.
IOError: If the device has been disconnected.
"""
super(LibUsbHandle, self).__init__(device.getSerialNumber(), name=name,
default_timeout_ms=default_timeout_ms)
self._setting = setting
self._device = device
self._read_endpoint = None
self._write_endpoint = None
self._handle = None
for endpoint in self._setting.iterEndpoints():
address = endpoint.getAddress()
if address & libusb1.USB_ENDPOINT_DIR_MASK == libusb1.LIBUSB_ENDPOINT_IN:
if self._read_endpoint is not None:
raise usb_exceptions.InvalidEndpointsError(
'Multiple in endpoints found')
self._read_endpoint = address
else:
if self._write_endpoint is not None:
raise usb_exceptions.InvalidEndpointsError(
'Multiple out endpoints found')
self._write_endpoint = address
if self._read_endpoint is None:
raise usb_exceptions.InvalidEndpointsError('Missing in endpoint')
if self._write_endpoint is None:
raise usb_exceptions.InvalidEndpointsError('Missing out endpoint')
handle = self._device.open()
self.ScanStop()
self.ScanClearFIFO()
self.BulkFlush(5)
self.status = self.Status()
return list(data)
status = self.Status()
if status & self.SCAN_OVERRUN:
raise OverrunERROR
if self.scan_mode & self.CONTINUOUS_READOUT:
return list(data)
# if nbytes is a multiple of wMaxPacketSize the device will send a zero byte packet.
if ((int(nSamples*2) % self.wMaxPacketSize) == 0 and not(status & self.PACER_RUNNING)):
data2 = self.udev.bulkRead(libusb1.LIBUSB_ENDPOINT_IN | 1, 2, 100)
self.ScanStop()
self.ScanClearFIFO()
self.BulkFlush(5)
return list(data)
in endpoint and one out endpoint. Currently, this is the only type of
interface supported.
IOError: If the device has been disconnected.
"""
super(LibUsbHandle, self).__init__(device.getSerialNumber(), name=name,
default_timeout_ms=default_timeout_ms)
self._setting = setting
self._device = device
self._read_endpoint = None
self._write_endpoint = None
self._handle = None
for endpoint in self._setting.iterEndpoints():
address = endpoint.getAddress()
if address & libusb1.USB_ENDPOINT_DIR_MASK == libusb1.LIBUSB_ENDPOINT_IN:
if self._read_endpoint is not None:
raise usb_exceptions.InvalidEndpointsError(
'Multiple in endpoints found')
self._read_endpoint = address
else:
if self._write_endpoint is not None:
raise usb_exceptions.InvalidEndpointsError(
'Multiple out endpoints found')
self._write_endpoint = address
if self._read_endpoint is None:
raise usb_exceptions.InvalidEndpointsError('Missing in endpoint')
if self._write_endpoint is None:
raise usb_exceptions.InvalidEndpointsError('Missing out endpoint')
handle = self._device.open()
def AInScanRead(self, nScan):
nSamples = int(nScan * self.nChan)
if self.options & self.IMMEDIATE_TRANSFER_MODE:
for i in range(self.nSamples):
try:
data.extend(unpack('H',self.udev.bulkRead(libusb1.LIBUSB_ENDPOINT_IN | 1, 2, 2000)))
except:
print('AInScanRead: error in bulk transfer in immmediate transfer mode.')
return
else:
try:
data = unpack('H'*nSamples, self.udev.bulkRead(libusb1.LIBUSB_ENDPOINT_IN | 1, int(2*nSamples), 2000))
except:
print('AInScanRead: error in bulk transfer!')
return
status = self.Status()
try:
if status & self.AIN_SCAN_OVERRUN:
raise OverrunERROR
except:
print('AInScanRead: Overrun Error')