Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_number_set_value():
    """A number assigned to a NumberValue is read back unchanged."""
    expected = 10
    number = NumberValue("Dynamic Number", value_id="dn", spine=MockupSpine())
    number.value = expected
    assert number.value == expected
def input(input_id, name, value_class=NumberValue):
    """Return a class decorator that adds an input value to a controller class.

    The decorator builds ``value_class(name, input_id=input_id,
    is_input=True, index=-1)`` and stores the result on the decorated
    class under the attribute name ``input_id``.

    Args:
        input_id: Attribute name on the class, also passed as ``input_id``.
        name: First positional argument handed to ``value_class``.
        value_class: Callable used to build the value.

    Returns:
        A decorator that takes a class, attaches the value and returns
        the same class.
    """
    def _decorator(cls):
        # A new value is built each time the decorator is applied.
        setattr(
            cls,
            input_id,
            value_class(name, input_id=input_id, is_input=True, index=-1),
        )
        return cls

    return _decorator
def __init__(self, gpio_device, channel):
    """Initialise a boolean channel value and its PWM settings.

    The value is named "<device name> <channel>" and uses the channel
    number (as a string) as its input id. A ``pwm`` value list is
    created with three entries: "duty_cycle" (0..100),
    "frequency" (0..100000) and "active".

    Args:
        gpio_device: Device object; its ``name`` is used for the label.
        channel: Channel identifier on that device.
    """
    display_name = gpio_device.name + " " + str(channel)
    BooleanValue.__init__(self, display_name, input_id=str(channel))
    self._device = gpio_device
    self._channel = channel

    self.pwm = ValueList(self, True)
    pwm_settings = (
        ("duty_cycle", "Duty cycle", NumberValue),
        ("frequency", "Frequency", NumberValue),
        ("active", "Active", BooleanValue),
    )
    for setting_id, label, setting_class in pwm_settings:
        self.pwm.add(setting_id, label, setting_class)

    # Both numeric settings start at 0; only the upper bound differs.
    for setting_id, upper_bound in (("duty_cycle", 100), ("frequency", 100000)):
        self.pwm[setting_id].min = 0
        self.pwm[setting_id].max = upper_bound
def add_number(self, number_id, name=None, min=None, max=None, unit=None, value=None, persist_value=False):
    """Add a NumberValue to this collection and configure it.

    Args:
        number_id: Id under which the value is registered via ``self.add``.
        name: Display name; falls back to ``number_id`` when empty.
        min: Lower bound to assign, or ``None`` to leave it untouched.
        max: Upper bound to assign, or ``None`` to leave it untouched.
        unit: Unit to assign; skipped when empty or ``None``.
        value: Initial value to assign, or ``None`` to leave it untouched.
        persist_value: When truthy, assigned to ``persist_value`` on the
            new value.

    Returns:
        The value object returned by ``self.add``.
    """
    from kervi.values import NumberValue

    number = self.add(number_id, name or number_id, NumberValue)
    if unit:
        number.unit = unit
    # Compare with None rather than relying on truthiness: 0 is a valid
    # bound and a valid initial value and must not be dropped.
    if min is not None:
        number.min = min
    if max is not None:
        number.max = max
    if value is not None:
        number.value = value
    if persist_value:
        number.persist_value = persist_value
    return number
else:
# Fallback branch of a check whose condition lies above this excerpt
# (presumably the keyboard lookup, judging by the message - confirm).
# Only a warning is logged.
self.spine.log.warning("no keyboards found")
# Mouse: when requested, create an _InputThread for the first entry of
# self._devices.mice; otherwise log a warning.
if listen_to_mouse:
if len(self._devices.mice):
self._mouse_thread = _InputThread(self, self._devices.mice[0])
else:
self.spine.log.warning("no mouse found")
# Gamepad: same pattern, using the first entry of self._devices.gamepads.
if listen_to_gamepad:
if len(self._devices.gamepads):
self._gamepad_thread = _InputThread(self, self._devices.gamepads[0])
else:
self.spine.log.warning("no gamepads found")
# Output values registered by this controller: one string value for the
# key and three number values for mouse position and wheel.
self.key = self.outputs.add("key", "Key", StringValue)
self.mouse_x = self.outputs.add("mouse_x", "Mouse x", NumberValue)
self.mouse_y = self.outputs.add("mouse_y", "Mouse y", NumberValue)
self.mouse_wheel = self.outputs.add("mouse_wheel", "Mouse wheel", NumberValue)
self._key_map = {}
# Key codes for the left/right ctrl, meta and shift keys. The right meta
# key is spelled "KEY_RIGHTMETA" in the Linux input event codes; the
# former "KEY_RIGNTMETA" entry could never match a reported key code.
self._ctrl_keys = [
    "KEY_LEFTCTRL",
    "KEY_LEFTMETA",
    "KEY_LEFTSHIFT",
    "KEY_RIGHTCTRL",
    "KEY_RIGHTMETA",
    "KEY_RIGHTSHIFT",
]
def __init__(self, controller_id="steering", name="Steering"):
    """Create the steering controller and declare its outputs and inputs.

    Outputs: "left_speed" and "right_speed".
    Inputs: "left_encoder", "right_encoder", "speed", "adaptive_speed",
    "direction" and "Adaptive direction".

    Args:
        controller_id: Id forwarded to ``TaskHandler.__init__``.
        name: Name forwarded to ``TaskHandler.__init__``.
    """
    TaskHandler.__init__(self, controller_id, name)
    self.type = "steering"

    add_output = self.outputs.add
    self.left_speed = add_output("left_speed", "Left speed", NumberValue)
    self.right_speed = add_output("right_speed", "Right speed", NumberValue)

    add_input = self.inputs.add
    # The encoder inputs are registered but not kept as attributes.
    add_input("left_encoder", "Left encoder", NumberValue)
    add_input("right_encoder", "Right encoder", NumberValue)
    self._adjust = 0

    self.speed = add_input("speed", "Speed", NumberValue)
    self.adaptive_speed = add_input("adaptive_speed", "Adaptive speed", NumberValue)
    self.direction = add_input("direction", "Direction", NumberValue)
    # NOTE(review): this id is "Adaptive direction", not the snake_case
    # "adaptive_direction" the sibling inputs would suggest. Kept as is
    # because existing links may depend on it - confirm before renaming.
    self.adaptive_direction = add_input("Adaptive direction", "Adaptive direction", NumberValue)
def kervi_value_changed(self, changed_input, value):
    """Forward a change of one of the PWM settings to the device.

    * "duty_cycle" changed: restart PWM with the new duty cycle, but only
      while the "active" setting is truthy.
    * "frequency" changed: restart PWM with the new frequency.
    * "active" changed: start PWM when it is truthy, stop it otherwise.

    Args:
        changed_input: The setting object that changed; its ``value``
            attribute is what gets sent to the device.
        value: Not used by this method.
    """
    settings = self.pwm

    if changed_input == settings["duty_cycle"] and settings["active"].value:
        self._device.pwm_start(self._channel, duty_cycle=changed_input.value)

    # NOTE(review): unlike the duty-cycle branch, this one does not check
    # the "active" setting before starting PWM - confirm this is intended.
    if changed_input == settings["frequency"]:
        self._device.pwm_start(self._channel, frequency=changed_input.value)

    if changed_input == settings["active"]:
        if not changed_input.value:
            self._device.pwm_stop(self._channel)
        else:
            self._device.pwm_start(self._channel)
class AnalogIOChannel(NumberValue):
    """Number value bound to a single channel of a GPIO device.

    Reads and writes are delegated to the device, always addressed by the
    channel given at construction time. The value range is set to 0..100.
    """

    def __init__(self, gpio_device, channel, is_input):
        """Bind the value to ``channel`` on ``gpio_device``.

        Args:
            gpio_device: Device object that performs the actual I/O.
            channel: Channel identifier; its string form names the value.
            is_input: Forwarded to ``NumberValue.__init__``.
        """
        NumberValue.__init__(self, str(channel), is_input=is_input)
        self._device = gpio_device
        self._channel = channel
        self.min = 0
        self.max = 100

    def get(self):
        """Return what the device reports for this channel."""
        # Pass only the channel, as set() and define_as_input() do. The
        # device method is already bound, so the former extra ``self``
        # argument shifted the channel into the wrong position.
        return self._device.get(self._channel)

    def set(self, value):
        """Write ``value`` to this channel on the device."""
        self._device.set(self._channel, value)

    def define_as_input(self, pullup=False):
        """Configure the channel as an input on the device.

        Args:
            pullup: Forwarded to the device's ``define_as_input``.
        """
        # NOTE(review): is_input is set to False although the channel
        # becomes an input - presumably the flag describes the value's
        # role rather than the pin direction; confirm.
        self.is_input = False
        self._device.define_as_input(self._channel, pullup)
def __init__(self, gpio_device, channel, is_input):
    """Bind a number value to ``channel`` on ``gpio_device``.

    The value is named after the string form of the channel and its
    range is set to 0..100.

    Args:
        gpio_device: Device object stored for later I/O calls.
        channel: Channel identifier on that device.
        is_input: Forwarded to ``NumberValue.__init__``.
    """
    NumberValue.__init__(self, str(channel), is_input=is_input)
    self._device, self._channel = gpio_device, channel
    self.min, self.max = 0, 100
def __init__(self, sensor_id, name, device=None, **kwargs):
# Set up a sensor controller around `device` and register the output value
# that carries its readings.
# NOTE(review): the listing ends after `count = 0`; the rest of this
# method is not visible in this excerpt.
Controller.__init__(self, sensor_id, name, **kwargs)
self._device = device
self._sub_sensors = []
self._dimensions = 1
# NOTE(review): kwargs has already been forwarded to Controller.__init__
# above, so an "index" keyword reaches it too - confirm it is accepted.
self._index = kwargs.pop("index", -1)
self._enabled = None
# NOTE(review): `device` defaults to None, yet `device.value_type` is read
# here before the `if self._device:` guard below; calling without a device
# would raise AttributeError - confirm whether device is effectively required.
if device.value_type == "color":
# Color devices register their output under the fixed id "value".
self._sensor_value = self.outputs.add("value", name, ColorValue)
elif device.value_type == "number":
# Number devices register their output under the sensor's own id.
self._sensor_value = self.outputs.add(sensor_id, name, NumberValue)
else:
raise ValueError("Can not handle device value type: " + str(device.value_type))
if self._device:
# Copy type, unit and range from the device onto the controller and
# its output value.
self.value_type = self._device.type
self.value_unit = self._device.unit
self._sensor_value.type = self._device.type
self._sensor_value.unit = self._device.unit
self._sensor_value.min = self._device.min
self._sensor_value.max = self._device.max
# An index of -1 is the default when no "index" keyword was given; only
# then are the device's dimensions and dimension labels adopted.
if self._index == -1:
self._dimensions = self._device.dimensions
self._dimension_labels = self._device.dimension_labels
if self._dimensions > 1:
count = 0
def __init__(self, stream_id, region_group, observer_id, handler=None, name="stream", observer_type="stream_observer"):
    """Create a stream observer and subscribe it to region-change events.

    Registers the output "source_eps" ("Events per second") and hooks
    ``self._on_event`` up to the "visionRegionChange" event for
    ``stream_id``.

    Args:
        stream_id: Stream to observe; also passed when registering the
            event handler.
        region_group: Stored on the instance as ``region_group``.
        observer_id: Controller id, also stored as ``observer_id``.
        handler: Optional object stored as ``_handler``.
        name: Controller name.
        observer_type: Value assigned to ``type``.
    """
    Controller.__init__(self, observer_id, name)
    self.type = observer_type
    self.observer_id = observer_id
    self.stream_id = stream_id
    self.region_group = region_group

    # Private fields: the handler, a counter starting at 0 and the
    # time of construction.
    self._handler = handler
    self._epc_counter = 0
    self._epc_start_time = time.time()

    self.streamed_eps = self.outputs.add(
        "source_eps",
        "Events per second",
        NumberValue,
    )
    self.spine.register_event_handler(
        "visionRegionChange",
        self._on_event,
        stream_id,
    )