Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
else:
self.new_sensor_reading(self._device.read_value())
def read_sensor(self):
"""
Abstract method that must be implementd if no device is set.
This method is called by the sensor thread on regular intervals.
There is no need to implement own polling systems or call time.sleep this
is handles by the calling sensor thread.
Use the method new_sensor_reading to store the value.
"""
pass
class SensorThread(KerviThread):
r"""
SensorThread is the base class that polls sensors.
Add one or more sensors and set polling interval.
:param sensors:
Id of the sensor.
This id is used in other components to reference this sesnor.
:type sensors: ``str``
:param reading_interval:
Polling interval in seconds between
:type reading_interval: ``float``
"""
def __init__(self, sensors, reading_interval=1):
def _start_command(self):
if not self.alive:
self.alive = True
KerviThread.start(self)
def set_queue_handler(self, handler):
self.queue_handler = handler
super(KerviThread, self).start()
def _bit2(self, src, bit, val):
bit = 1 << bit
return (src | bit) if val else (src & ~bit)
def _validate_channel(self, channel):
# Raise an exception if pin is outside the range of allowed values.
if channel < 0 or channel >= self.num_gpio:
raise DeviceChannelOutOfBoundsError(self.device_name, channel)
class OneWireSensorDeviceDriver(SensorDeviceDriver):
def __init__(self, address):
self.one_wire = get_one_wire(address)
class ChannelPollingThread(KerviThread):
def __init__(self, channel, device, callback, polling_time=.1):
KerviThread.__init__(self)
self._callback = callback
self._channel = channel
self._device = device
self._value = None
self._polling_time = polling_time
self.alive = False
self.spine = Spine()
if self.spine:
self.spine.register_command_handler("startThreads", self._start_command)
self.spine.register_command_handler("stopThreads", self._stop_command)
def _step(self):
"""Private method do not call it directly or override it."""
try:
@property
def num_gpio(self):
return 0
def _bit2(self, src, bit, val):
bit = 1 << bit
return (src | bit) if val else (src & ~bit)
def _validate_channel(self, channel):
# Raise an exception if pin is outside the range of allowed values.
if channel < 0 or channel >= self.num_gpio:
raise DeviceChannelOutOfBoundsError(self.device_name, channel)
class ChannelPollingThread(KerviThread):
def __init__(self, channel, device, callback, polling_time=.1):
KerviThread.__init__(self)
self._callback = callback
self._channel = channel
self._device = device
self._value = None
self._polling_time = polling_time
self.alive = False
self.spine = Spine()
if self.spine:
self.spine.register_command_handler("startThreads", self._start_command)
self.spine.register_command_handler("stopThreads", self._stop_command)
def _step(self):
"""Private method do not call it directly or override it."""
try:
# Copyright (c) 2016, Tim Wentzlau
# Licensed under MIT
""" Module that holds a general kervi application module threading class"""
from kervi.utility.thread import KerviThread
from kervi.spine import Spine
class ModuleThread(KerviThread):
def __init__(self):
KerviThread.__init__(self)
self.spine = Spine()
self.spine.register_command_handler("startThreads", self._startCommand)
def _step(self):
self.moduleStep()
def _startCommand(self):
if not self.isAlive():
super(KerviThread, self).start()
def _stopCommand(self):
self.stop()
self.query = query
self.args = args
self.result = []
self.injected = injected
self.scope = scope
self.session = session
import traceback
#spine.log.debug("qtx:{0}, handler:{1}", self.query, self.handler)
#for line in traceback.format_stack():
# spine.log.debug(line.strip())
def run(self):
self.result = self.handler(self.query, self.args, injected=self.injected, scope=self.scope, session=self.session)
class _CQRSQueue(KerviThread):
def __init__(self, name):
KerviThread.__init__(self)
self.queues = [Queue.Queue(), Queue.Queue(), Queue.Queue()]
self.name = name
def get_info(self):
queueSize = []
for queue in self.queues:
queueSize += [len(queue)]
info = {"queueSize":queueSize}
return info
def set_queue_handler(self, handler):
self.queue_handler = handler
super(KerviThread, self).start()
def _start_command(self):
if not self.alive:
self.alive = True
KerviThread.start(self)
New value to be stored in the system.
:type value: ``float``
"""
if self._dimensions > 1:
for dimension in range(0, self._dimensions):
value = sensor_value[dimension]
self._sub_sensors[dimension]._new_sensor_reading(value)
else:
self._set_value(sensor_value)
def _read_sensor(self):
self._new_sensor_reading(self._device.read_value())
class _SensorThread(KerviThread):
r"""
SensorThread is the base class that polls sensors.
Add one or more sensors and set polling interval.
:param sensors:
Id of the sensor.
This id is used in other components to reference this sesnor.
:type sensors: ``str``
:param reading_interval:
Polling interval in seconds between
:type reading_interval: ``float``
"""
def __init__(self, sensors, reading_interval=1):
def _startCommand(self):
if not self.isAlive():
super(KerviThread, self).start()