Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _set_gpio_direction(self, width: int, pins: int,
direction: int) -> None:
if pins & self._spi_mask:
raise SpiIOError('Cannot access SPI pins as GPIO')
gpio_mask = (1 << width) - 1
gpio_mask &= ~self._spi_mask
if (pins & gpio_mask) != pins:
raise SpiIOError('No such GPIO pin(s)')
self._gpio_dir &= ~pins
self._gpio_dir |= (pins & direction)
self._gpio_mask = gpio_mask & pins
def _force(self, frequency: float, sequence: bytes):
if not self._ftdi.is_connected:
raise SpiIOError("FTDI controller not initialized")
if len(sequence) > SpiController.PAYLOAD_MAX_LENGTH:
raise SpiIOError("Output payload is too large")
if self._frequency != frequency:
self._ftdi.set_frequency(frequency)
# store the requested value, not the actual one (best effort),
# to avoid setting unavailable values on each call.
self._frequency = frequency
cmd = bytearray()
direction = self.direction & 0xFF
for ctrl in sequence:
ctrl &= self._spi_mask
ctrl |= self._gpio_low
cmd.extend((Ftdi.SET_BITS_LOW, ctrl, direction))
self._ftdi.write_data(cmd)
def _force(self, frequency: float, sequence: bytes):
if not self._ftdi.is_connected:
raise SpiIOError("FTDI controller not initialized")
if len(sequence) > SpiController.PAYLOAD_MAX_LENGTH:
raise SpiIOError("Output payload is too large")
if self._frequency != frequency:
self._ftdi.set_frequency(frequency)
# store the requested value, not the actual one (best effort),
# to avoid setting unavailable values on each call.
self._frequency = frequency
cmd = bytearray()
direction = self.direction & 0xFF
for ctrl in sequence:
ctrl &= self._spi_mask
ctrl |= self._gpio_low
cmd.extend((Ftdi.SET_BITS_LOW, ctrl, direction))
self._ftdi.write_data(cmd)
def get_gpio(self) -> SpiGpioPort:
"""Retrieve the GPIO port.
:return: GPIO port
"""
with self._lock:
if not self._ftdi.is_connected:
raise SpiIOError("FTDI controller not initialized")
if not self._gpio_port:
self._gpio_port = SpiGpioPort(self)
return self._gpio_port
def _set_gpio_direction(self, width: int, pins: int,
direction: int) -> None:
if pins & self._spi_mask:
raise SpiIOError('Cannot access SPI pins as GPIO')
gpio_mask = (1 << width) - 1
gpio_mask &= ~self._spi_mask
if (pins & gpio_mask) != pins:
raise SpiIOError('No such GPIO pin(s)')
self._gpio_dir &= ~pins
self._gpio_dir |= (pins & direction)
self._gpio_mask = gpio_mask & pins
mode: int = 0) -> SpiPort:
"""Obtain a SPI port to drive a SPI device selected by Chip Select.
:note: SPI mode 1 and 3 are not officially supported.
:param cs: chip select slot, starting from 0
:param freq: SPI bus frequency for this slave in Hz
:param mode: SPI mode [0, 1, 2, 3]
"""
with self._lock:
if not self._ftdi.is_connected:
raise SpiIOError("FTDI controller not initialized")
if cs >= len(self._spi_ports):
if cs < 5:
# increase cs_count (up to 4) to reserve more /CS channels
raise SpiIOError("/CS pin %d not reserved for SPI" % cs)
raise SpiIOError("No such SPI port: %d" % cs)
if not self._spi_ports[cs]:
freq = min(freq or self._frequency, self.frequency_max)
hold = freq and (1+int(1E6/freq))
self._spi_ports[cs] = SpiPort(self, cs, cs_hold=hold,
spi_mode=mode)
self._spi_ports[cs].set_frequency(freq)
self._flush()
return self._spi_ports[cs]
def _exchange_half_duplex(self, frequency: float,
out: Union[bytes, bytearray, Iterable[int]],
readlen: int, cs_prolog: bytes, cs_epilog: bytes,
cpol: bool, cpha: bool,
droptail: int) -> bytes:
if not self._ftdi.is_connected:
raise SpiIOError("FTDI controller not initialized")
if len(out) > SpiController.PAYLOAD_MAX_LENGTH:
raise SpiIOError("Output payload is too large")
if readlen > SpiController.PAYLOAD_MAX_LENGTH:
raise SpiIOError("Input payload is too large")
if cpha:
# to enable CPHA, we need to use a workaround with FTDI device,
# that is enable 3-phase clocking (which is usually dedicated to
# I2C support). This mode use use 3 clock period instead of 2,
# which implies the FTDI frequency should be fixed to match the
# requested one.
frequency = (3*frequency)//2
if self._frequency != frequency:
self._ftdi.set_frequency(frequency)
# store the requested value, not the actual one (best effort),
# to avoid setting unavailable values on each call.
self._frequency = frequency
direction = self.direction & 0xFF # low bits only
cmd = bytearray()
for ctrl in cs_prolog or []:
ctrl &= self._spi_mask
if 'turbo' in kwargs:
self._turbo = bool(kwargs['turbo'])
del kwargs['turbo']
if 'direction' in kwargs:
io_dir = int(kwargs['direction'])
del kwargs['direction']
else:
io_dir = 0
if 'initial' in kwargs:
io_out = int(kwargs['initial'])
del kwargs['initial']
else:
io_out = 0
if 'interface' in kwargs:
if isinstance(url, str):
raise SpiIOError('url and interface are mutually exclusive')
interface = int(kwargs['interface'])
del kwargs['interface']
else:
interface = 1
with self._lock:
if self._frequency > 0.0:
raise SpiIOError('Already configured')
self._cs_bits = (((SpiController.CS_BIT << self._cs_count) - 1) &
~(SpiController.CS_BIT - 1))
self._spi_ports = [None] * self._cs_count
self._spi_dir = (self._cs_bits |
SpiController.DO_BIT |
SpiController.SCK_BIT)
self._spi_mask = self._cs_bits | self.SPI_BITS
# until the device is open, there is no way to tell if it has a
# wide (16) or narrow port (8). Lower API can deal with any, so
def _read_raw(self, read_high: bool) -> int:
if not self._ftdi.is_connected:
raise SpiIOError("FTDI controller not initialized")
if read_high:
cmd = bytes([Ftdi.GET_BITS_LOW,
Ftdi.GET_BITS_HIGH,
Ftdi.SEND_IMMEDIATE])
fmt = '