Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class _PageTimer(threading.Thread):
    """Background thread that keeps switching a display to its next page.

    :param display:
        Object providing ``activate_page(page)`` and a ``page_change_speed``
        attribute (the number of seconds slept between page changes).
    """

    def __init__(self, display):
        threading.Thread.__init__(self, name="pageTimer")
        # This used to be spelled ``deamon``, which only created an unused
        # attribute and left the thread non-daemonic, so a running page timer
        # could keep the interpreter from exiting.
        self.daemon = True
        self._display = display
        self._terminated = False

    def stop(self):
        """Ask the loop in ``run`` to exit; it is noticed after the current sleep."""
        self._terminated = True

    def run(self):
        """Activate the "next" page, sleep, and repeat until ``stop`` is called."""
        while not self._terminated:
            self._display.activate_page("next")
            time.sleep(self._display.page_change_speed)
class Display(Controller):
r"""
The Display class manages a display device.
:param display_id:
Id of the display. This id is never displayed; it is only used to reference the display in code.
:type display_id: ``str``
:param name:
Name of the display.
:type name: ``str``
:param device:
The display device that should be used. Could be one of the displays from the kervi device library
or a display device driver that inherits from kervi.hal.DisplayDeviceDriver
def __init__(self, input_id="user_input", name="User input", device=None, **kwargs):
Controller.__init__(self, input_id, name)
listen_to_keyboard = kwargs.pop("listen_to_keyboard", False)
listen_to_mouse = kwargs.pop("listen_to_mouse", False)
listen_to_gamepad = kwargs.pop("listen_to_gamepad", True)
self._devices = hal.get_user_inputs()
self._keyboard_thread = None
self._mouse_thread = None
self._gamepad_thread = None
if listen_to_keyboard:
if len(self._devices.keyboards):
self._keyboard_thread = _InputThread(self, self._devices.keyboards[0])
else:
self.spine.log.warning("no keyboards found")
if listen_to_mouse:
if len(self._devices.mice):
self._mouse_thread = _InputThread(self, self._devices.mice[0])
else:
#Copyright 2017 Tim Wentlau.
#Distributed under the MIT License. See LICENSE in root of project.
import time
from kervi.controllers import Controller
from kervi.values import NumberValue, BooleanValue
from kervi.actions import action
from kervi.controllers import Controller
class MotorSteering(Controller):
    """
    Control the speed and direction of two motors.

    The controller registers two ``NumberValue`` outputs (``left_speed`` and
    ``right_speed``) and four ``NumberValue`` inputs (``left_encoder``,
    ``right_encoder``, ``speed`` and ``adaptive_speed``).
    """

    def __init__(self, controller_id="steering", name="Steering"):
        Controller.__init__(self, controller_id, name)
        self.type = "steering"

        # Outputs: one speed value per motor.
        outputs = self.outputs
        self.left_speed = outputs.add("left_speed", "Left speed", NumberValue)
        self.right_speed = outputs.add("right_speed", "Right speed", NumberValue)

        # Encoder inputs are registered but not kept as attributes.
        inputs = self.inputs
        inputs.add("left_encoder", "Left encoder", NumberValue)
        inputs.add("right_encoder", "Right encoder", NumberValue)

        self._adjust = 0

        self.speed = inputs.add("speed", "Speed", NumberValue)
        self.adaptive_speed = inputs.add("adaptive_speed", "Adaptive speed", NumberValue)
import time
from kervi.controllers import Controller
from kervi.values import NumberValue
class RegionObserver(Controller):
    """Controller that subscribes to ``visionRegionChange`` events of one stream.

    :param stream_id: Id of the stream; passed along when the event handler is registered.
    :param region_group: Stored on the instance as ``region_group``.
    :param observer_id: Id of this controller; also stored as ``observer_id``.
    :param handler: Callable invoked when the instance itself is called.
    :param name: Name of the controller.
    :param observer_type: Value assigned to ``type``.
    """

    def __init__(self, stream_id, region_group, observer_id, handler=None, name="stream", observer_type="stream_observer"):
        Controller.__init__(self, observer_id, name)
        self.type = observer_type

        # Identity of the observer and of what it observes.
        self.observer_id = observer_id
        self.stream_id = stream_id
        self.region_group = region_group

        # Book-keeping for the "events per second" output.
        self._epc_start_time, self._epc_counter = time.time(), 0
        self._handler = handler

        self.streamed_eps = self.outputs.add("source_eps", "Events per second", NumberValue)
        # ``_on_event`` is expected to be defined elsewhere in this class.
        self.spine.register_event_handler("visionRegionChange", self._on_event, stream_id)

    def __call__(self, *args, **kwargs):
        """Forward the call to the handler given at construction and return its result."""
        handler = self._handler
        return handler(*args, **kwargs)
self.value_id = value_id
self.x= x
self.y = y
self.size = size
self.value = ""
self.unit = ""
def _set_value(self, value, unit):
    """Store ``value`` and ``unit`` on the instance.

    When ``self.transform`` is truthy, the stored value is
    ``self.transform(value)``; otherwise ``value`` is stored unchanged.
    """
    convert = self.transform
    self.value = convert(value) if convert else value
    self.unit = unit
class DisplayPage(Controller):
    """A display page that re-renders when one of its linked values changes.

    :param page_id: Id of the page controller.
    :param name: Name of the page controller.
    """

    def __init__(self, page_id, name=None):
        Controller.__init__(self, page_id, name)
        # NOTE(review): the handler is registered before ``_links`` exists;
        # if events can arrive immediately this may need to move below the
        # attribute initialisation -- confirm how the spine dispatches.
        self.spine.register_event_handler("valueChanged", self._link_changed_event)
        self._template = ""
        self._links = {}
        self._displays = []
        self._text = None
        self._lines = 1

    def _link_changed_event(self, source_id, value, x):
        """Update the link registered for ``source_id`` and render the page.

        Events whose ``source_id`` has no entry in ``self._links`` are ignored.
        ``value`` must provide the keys ``"display_value"`` and ``"display_unit"``.
        """
        if source_id not in self._links:
            return
        changed_link = self._links[source_id]
        changed_link._set_value(value["display_value"], value["display_unit"])
        self._render()
import kervi.spine as spine
import kervi.hal as hal
from kervi.core.authentication import Authorization
from kervi.actions import action
try:
from SimpleHTTPServer import SimpleHTTPRequestHandler
except:
from http.server import SimpleHTTPRequestHandler
try:
from BaseHTTPServer import HTTPServer
except:
from http.server import HTTPServer
class CameraBase(Controller):
    """Base controller for cameras.

    Registers ``pan`` and ``tilt`` both as inputs and as outputs, plus an
    ``fps`` input that is flagged to be shown inline.

    :param camera_id: Id of the camera controller.
    :param name: Name of the camera controller.
    :Keyword Arguments:
        * *flip_vertical* -- stored as ``flip_vertical`` (default ``False``).
        * *flip_horizontal* -- stored as ``flip_horizontal`` (default ``False``).
    """

    def __init__(self, camera_id, name, **kwargs):
        Controller.__init__(self, camera_id, name)
        self.type = "camera"
        self.media_config = Configuration.media

        inputs = self.inputs
        inputs.add("pan", "Pan", NumberValue)
        inputs.add("tilt", "Tilt", NumberValue)

        outputs = self.outputs
        self.pan = outputs.add("pan", "Pan", NumberValue)
        self.tilt = outputs.add("tilt", "Tilt", NumberValue)

        self.flip_vertical = kwargs.get("flip_vertical", False)
        self.flip_horizontal = kwargs.get("flip_horizontal", False)

        inputs.add("fps", "FPS", EnumValue)
        fps_input = self.inputs["fps"]
        fps_input.set_ui_parameter("inline", True)
def __init__(self, sensor_id, name, device=None, **kwargs):
    """Create the controller and the output matching the device's value type.

    :param sensor_id: Id of the controller; also the output id for "number" devices.
    :param name: Name of the controller and of the created output.
    :param device: Object exposing ``value_type``, ``type`` and ``unit``.
    :param \**kwargs: Forwarded to ``Controller.__init__``; ``index`` (default -1)
        is additionally stored as ``_index``.
    :raises ValueError: if ``device.value_type`` is neither "color" nor "number".
    """
    Controller.__init__(self, sensor_id, name, **kwargs)
    self._device = device
    self._sub_sensors = []
    self._dimensions = 1
    # ``index`` is read after ``kwargs`` was forwarded above, so the base
    # class receives it as well.
    self._index = kwargs.pop("index", -1)
    self._enabled = None

    # NOTE(review): ``device`` defaults to None but is dereferenced
    # unconditionally here, so omitting it raises AttributeError -- confirm
    # whether a device is in fact mandatory.
    kind = device.value_type
    if kind == "color":
        self._sensor_value = self.outputs.add("value", name, ColorValue)
    elif kind == "number":
        self._sensor_value = self.outputs.add(sensor_id, name, NumberValue)
    else:
        raise ValueError("Can not handle device value type: " + str(kind))

    attached = self._device
    if attached:
        self.value_type = attached.type
        self.value_unit = attached.unit
:param section_id:
id of the section.
:type section_id: str
:param \**kwargs:
Use the kwargs below to override default values set in ui_parameters
:Keyword Arguments:
* *ui_size* (``int``) -- Size of the component in dashboard unit size.
In order to make the sections and components align correctly, a dashboard unit is defined.
By default the dashboard unit is a square that is 150 x 150 pixels.
The width of the camera picture is ui_size * dashboard unit size.
"""
Controller.link_to_dashboard(
self,
dashboard_id,
panel_id,
**kwargs
)
class _MotorNumOutOfBoundsError(Exception):
    """Error raised for a motor number that a device does not provide.

    :param device_name: Name of the device, used as prefix of the message.
    :param motor: The offending motor number, included in the message.
    """

    def __init__(self, device_name, motor):
        message = '{0} Exception: Motor num out of Bounds, motor={1}'.format(device_name, motor)
        super(_MotorNumOutOfBoundsError, self).__init__(message)
class DCMotor(object):
    """Small wrapper that exposes the object it is given as ``speed``.

    :param motor: Object handed back unchanged by the ``speed`` property.
    """

    def __init__(self, motor):
        # Stored as-is; no conversion or validation happens here.
        self._speed = motor

    @property
    def speed(self):
        """The object passed to the constructor (read-only)."""
        return self._speed
class DCMotorControllerBase(Controller):
def __init__(self, controller_id, device_name, num_motors):
    """Create the controller and one ``NumberValue`` input per motor.

    The inputs are registered as ``motor_0`` .. ``motor_<num_motors - 1>``
    and the controller is named ``<device_name>-DC motors``.

    :param controller_id: Id of the controller.
    :param device_name: Name of the motor device.
    :param num_motors: Number of motor inputs to create.
    """
    Controller.__init__(self, controller_id, device_name + "-DC motors")
    self._device_name = device_name
    self._num_motors = num_motors
    for index in range(num_motors):
        suffix = str(index)
        self.inputs.add("motor_" + suffix, "Motor " + suffix, NumberValue)
def __getitem__(self, motor):
    """Return a ``DCMotor`` wrapping the input registered as ``motor_<motor>``."""
    input_id = "motor_" + str(motor)
    motor_input = self.inputs[input_id]
    return DCMotor(motor_input)
def _validate_motor(self, motor):
    """Raise if ``motor`` is not a valid motor number for this controller.

    Valid numbers are ``0`` .. ``num_motors - 1``, matching the ``motor_<n>``
    inputs that ``__init__`` creates with ``range(0, num_motors)``.

    :param motor: Zero-based motor number to check.
    :raises _MotorNumOutOfBoundsError: when ``motor`` is out of range.
    """
    # ``>=`` rather than ``>``: motor numbers are zero-based, so
    # ``num_motors`` itself is already one past the last existing motor.
    if motor < 0 or motor >= self._num_motors:
        raise _MotorNumOutOfBoundsError(self._device_name, motor)
@property