How to use the gpiozero.threads.GPIOThread function in gpiozero

To help you get started, we’ve selected a few gpiozero examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github gpiozero / gpiozero / gpiozero / mixins.py View on Github external
def held_time(self):
        """
        The length of time (in seconds) that the device has been held for.
        This is counted from the first execution of the :attr:`when_held` event
        rather than when the device activated, in contrast to
        :attr:`~EventsMixin.active_time`. If the device is not currently held,
        this is ``None``.
        """
        if self._held_from is not None:
            return self.pin_factory.ticks_diff(self.pin_factory.ticks(),
                                               self._held_from)
        else:
            return None


class HoldThread(GPIOThread):
    """
    Extends :class:`GPIOThread`. Provides a background thread that repeatedly
    fires the :attr:`HoldMixin.when_held` event as long as the owning
    device is active.
    """
    def __init__(self, parent):
        super(HoldThread, self).__init__(
            target=self.held, args=(weakref.proxy(parent),))
        self.holding = Event()
        self.start()

    def held(self, parent):
        try:
            while not self.stopping.is_set():
                if self.holding.wait(0.1):
                    self.holding.clear()
github gpiozero / gpiozero / gpiozero / mixins.py View on Github external
self.holding.clear()
                    while not (
                            self.stopping.is_set() or
                            parent._inactive_event.wait(parent.hold_time)
                            ):
                        if parent._held_from is None:
                            parent._held_from = parent.pin_factory.ticks()
                        parent._fire_held()
                        if not parent.hold_repeat:
                            break
        except ReferenceError:
            # Parent is dead; time to die!
            pass


class GPIOQueue(GPIOThread):
    """
    Extends :class:`GPIOThread`. Provides a background thread that monitors a
    device's values and provides a running *average* (defaults to median) of
    those values. If the *parent* device includes the :class:`EventsMixin` in
    its ancestry, the thread automatically calls
    :meth:`~EventsMixin._fire_events`.
    """
    def __init__(
            self, parent, queue_len=5, sample_wait=0.0, partial=False,
            average=median, ignore=None):
        assert callable(average)
        super(GPIOQueue, self).__init__(target=self.fill)
        if queue_len < 1:
            raise BadQueueLen('queue_len must be at least one')
        if sample_wait < 0:
            raise BadWaitTime('sample_wait must be 0 or greater')
github google / aiyprojects-raspbian / src / aiy / pins.py View on Github external
def try_start_polling(self):
        if (not self._poll_thread and self._getter and self._callback and
                self._detector):
            self._poll_thread = GPIOThread(
                target=self._poll,
                args=(self._poll_interval, self._debounce_time, self._getter,
                      self._detector, self._callback))
            self._poll_thread.start()
github gpiozero / gpiozero / gpiozero / threads.py View on Github external
def join(self, timeout=None):
        super(GPIOThread, self).join(timeout)
        if self.is_alive():
            assert timeout is not None
            # timeout can't be None here because if it was, then join()
            # wouldn't return until the thread was dead
            raise ZombieThread(
                "Thread failed to die within %d seconds" % timeout)
        else:
            _THREADS.discard(self)
github gpiozero / gpiozero / gpiozero / boards.py View on Github external
Number of times to blink; :data:`None` (the default) means forever.

        :param bool background:
            If :data:`True`, start a background thread to continue blinking and
            return immediately. If :data:`False`, only return when the blink is
            finished (warning: the default value of *n* will result in this
            method never returning).
        """
        for led in self.leds:
            if isinstance(led, LED):
                if fade_in_time:
                    raise ValueError('fade_in_time must be 0 with non-PWM LEDs')
                if fade_out_time:
                    raise ValueError('fade_out_time must be 0 with non-PWM LEDs')
        self._stop_blink()
        self._blink_thread = GPIOThread(
            target=self._blink_device,
            args=(on_time, off_time, fade_in_time, fade_out_time, n))
        self._blink_thread.start()
        if not background:
            self._blink_thread.join()
            self._blink_thread = None
github gpiozero / gpiozero / gpiozero / output_devices.py View on Github external
:param float fade_out_time:
            Number of seconds to spend fading out. Defaults to 0.

        :type n: int or None
        :param n:
            Number of times to blink; :data:`None` (the default) means forever.

        :param bool background:
            If :data:`True` (the default), start a background thread to
            continue blinking and return immediately. If :data:`False`, only
            return when the blink is finished (warning: the default value of
            *n* will result in this method never returning).
        """
        self._stop_blink()
        self._blink_thread = GPIOThread(
            target=self._blink_device,
            args=(on_time, off_time, fade_in_time, fade_out_time, n)
        )
        self._blink_thread.start()
        if not background:
            self._blink_thread.join()
            self._blink_thread = None