Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def held_time(self):
"""
The length of time (in seconds) that the device has been held for.
This is counted from the first execution of the :attr:`when_held` event
rather than when the device activated, in contrast to
:attr:`~EventsMixin.active_time`. If the device is not currently held,
this is ``None``.
"""
if self._held_from is not None:
return self.pin_factory.ticks_diff(self.pin_factory.ticks(),
self._held_from)
else:
return None
class HoldThread(GPIOThread):
"""
Extends :class:`GPIOThread`. Provides a background thread that repeatedly
fires the :attr:`HoldMixin.when_held` event as long as the owning
device is active.
"""
def __init__(self, parent):
super(HoldThread, self).__init__(
target=self.held, args=(weakref.proxy(parent),))
self.holding = Event()
self.start()
def held(self, parent):
try:
while not self.stopping.is_set():
if self.holding.wait(0.1):
self.holding.clear()
self.holding.clear()
while not (
self.stopping.is_set() or
parent._inactive_event.wait(parent.hold_time)
):
if parent._held_from is None:
parent._held_from = parent.pin_factory.ticks()
parent._fire_held()
if not parent.hold_repeat:
break
except ReferenceError:
# Parent is dead; time to die!
pass
class GPIOQueue(GPIOThread):
"""
Extends :class:`GPIOThread`. Provides a background thread that monitors a
device's values and provides a running *average* (defaults to median) of
those values. If the *parent* device includes the :class:`EventsMixin` in
its ancestry, the thread automatically calls
:meth:`~EventsMixin._fire_events`.
"""
def __init__(
self, parent, queue_len=5, sample_wait=0.0, partial=False,
average=median, ignore=None):
assert callable(average)
super(GPIOQueue, self).__init__(target=self.fill)
if queue_len < 1:
raise BadQueueLen('queue_len must be at least one')
if sample_wait < 0:
raise BadWaitTime('sample_wait must be 0 or greater')
def try_start_polling(self):
if (not self._poll_thread and self._getter and self._callback and
self._detector):
self._poll_thread = GPIOThread(
target=self._poll,
args=(self._poll_interval, self._debounce_time, self._getter,
self._detector, self._callback))
self._poll_thread.start()
def join(self, timeout=None):
super(GPIOThread, self).join(timeout)
if self.is_alive():
assert timeout is not None
# timeout can't be None here because if it was, then join()
# wouldn't return until the thread was dead
raise ZombieThread(
"Thread failed to die within %d seconds" % timeout)
else:
_THREADS.discard(self)
Number of times to blink; :data:`None` (the default) means forever.
:param bool background:
If :data:`True`, start a background thread to continue blinking and
return immediately. If :data:`False`, only return when the blink is
finished (warning: the default value of *n* will result in this
method never returning).
"""
for led in self.leds:
if isinstance(led, LED):
if fade_in_time:
raise ValueError('fade_in_time must be 0 with non-PWM LEDs')
if fade_out_time:
raise ValueError('fade_out_time must be 0 with non-PWM LEDs')
self._stop_blink()
self._blink_thread = GPIOThread(
target=self._blink_device,
args=(on_time, off_time, fade_in_time, fade_out_time, n))
self._blink_thread.start()
if not background:
self._blink_thread.join()
self._blink_thread = None
:param float fade_out_time:
Number of seconds to spend fading out. Defaults to 0.
:type n: int or None
:param n:
Number of times to blink; :data:`None` (the default) means forever.
:param bool background:
If :data:`True` (the default), start a background thread to
continue blinking and return immediately. If :data:`False`, only
return when the blink is finished (warning: the default value of
*n* will result in this method never returning).
"""
self._stop_blink()
self._blink_thread = GPIOThread(
target=self._blink_device,
args=(on_time, off_time, fade_in_time, fade_out_time, n)
)
self._blink_thread.start()
if not background:
self._blink_thread.join()
self._blink_thread = None