Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_set_method():
sig = Signal()
st = sig.set(28)
wait(st)
assert st.done
assert st.success
assert sig.get() == 28
def test_signal_copy():
start_t = time.time()
name = 'test'
value = 10.0
signal = Signal(name=name, value=value, timestamp=start_t)
sig_copy = copy.copy(signal)
assert signal.name == sig_copy.name
assert signal.value == sig_copy.value
assert signal.get() == sig_copy.get()
assert signal.timestamp == sig_copy.timestamp
signal.subscribe(_sub_test, run=False)
signal.clear_sub(_sub_test, event_type=signal.SUB_VALUE)
kw = info['kw']
assert 'value' in kw
assert 'timestamp' in kw
assert 'old_value' in kw
assert kw['value'] == value
assert kw['old_value'] == value
assert kw['timestamp'] == signal.timestamp
# readback callback for soft signal
info = dict(called=False)
signal.subscribe(_sub_test, event_type=Signal.SUB_VALUE,
run=False)
assert not info['called']
signal.put(value + 1)
assert info['called']
signal.clear_sub(_sub_test)
kw = info['kw']
assert 'value' in kw
assert 'timestamp' in kw
assert 'old_value' in kw
assert kw['value'] == value + 1
assert kw['old_value'] == value
assert kw['timestamp'] == signal.timestamp
simulated_ro = SynSignalRO(func=random.random, name='simul_ro')
standard.sim_put(1)
read_and_write.sim_put(2)
read_only.sim_put(3)
simulated.put(4)
signals = {
# Signal is its own write
'Standard': standard,
# Signal has separate write/read
'Read and Write': read_and_write,
'Read Only': read_only,
'Simulated': simulated,
'SimulatedRO': simulated_ro,
'Array': Signal(name='array', value=np.ones((5, 10)))
}
for name, signal in signals.items():
panel.add_signal(signal, name=name)
wait_panel(
qtbot, panel,
signal_names=set(sig.name for sig in signals.values())
)
def widget_at(row, col):
return panel.itemAtPosition(row, col).widget()
# Check read-only channels do not have write widgets
assert widget_at(2, 1) is widget_at(2, 2)
assert widget_at(4, 1) is widget_at(4, 2)
import logging
import unittest
import pytest
import numpy as np
from ophyd import (Device, Component, FormattedComponent)
from ophyd.signal import (Signal, AttributeSignal, ArrayAttributeSignal)
from ophyd.utils import ExceptionBundle
logger = logging.getLogger(__name__)
class FakeSignal(Signal):
def __init__(self, read_pv, *, name=None, parent=None):
self.read_pv = read_pv
super().__init__(name=name, parent=parent)
def get(self):
return self.name
def describe_configuration(self):
return {self.name + '_conf': {'source': 'SIM:test'}}
def read_configuration(self):
return {self.name + '_conf': {'value': 0}}
def setUpModule():
pass
See: https://github.com/NSLS-II-CSX/timestamp
Parameters
----------
data_is_time : bool, optional
Use time as the data being acquired
'''
_default_configuration_attrs = ()
_default_read_attrs = ()
select = Cpt(EpicsSignal, "Sw-Sel")
reset = Cpt(EpicsSignal, "Rst-Sel")
waveform_count = Cpt(EpicsSignalRO, "Val:TimeN-I")
waveform = Cpt(EpicsSignalRO, "Val:Time-Wfrm")
waveform_nord = Cpt(EpicsSignalRO, "Val:Time-Wfrm.NORD")
data_is_time = Cpt(Signal)
def __init__(self, *args,
data_is_time=True, stream_name=None,
**kwargs):
self.stream_name = stream_name
super().__init__(*args, **kwargs)
self.data_is_time.put(data_is_time)
def _get_waveform(self):
if self.waveform_count.get():
return self.waveform.get(count=int(self.waveform_nord.get()))
else:
return []
res[k]['timestamp'] = 0
return res
def __repr__(self):
return ""
class SPseudo3x3(PseudoPositioner):
pseudo1 = C(PseudoSingle, limits=(-10, 10), egu='a', kind=Kind.hinted)
pseudo2 = C(PseudoSingle, limits=(-10, 10), egu='b', kind=Kind.hinted)
pseudo3 = C(PseudoSingle, limits=None, egu='c', kind=Kind.hinted)
real1 = C(SoftPositioner, init_pos=0)
real2 = C(SoftPositioner, init_pos=0)
real3 = C(SoftPositioner, init_pos=0)
sig = C(Signal, value=0)
@pseudo_position_argument
def forward(self, pseudo_pos):
pseudo_pos = self.PseudoPosition(*pseudo_pos)
# logger.debug('forward %s', pseudo_pos)
return self.RealPosition(real1=-pseudo_pos.pseudo1,
real2=-pseudo_pos.pseudo2,
real3=-pseudo_pos.pseudo3)
@real_position_argument
def inverse(self, real_pos):
real_pos = self.RealPosition(*real_pos)
# logger.debug('inverse %s', real_pos)
return self.PseudoPosition(pseudo1=-real_pos.real1,
pseudo2=-real_pos.real2,
pseudo3=-real_pos.real3)
# area detector that directly stores image data in Event
direct_img = SynSignal(func=lambda: np.array(np.ones((10, 10))),
name='img', labels={'detectors'})
# area detector that stores data in a file
img = SynSignalWithRegistry(func=lambda: np.array(np.ones((10, 10))),
name='img', labels={'detectors'})
invariant1 = InvariantSignal(func=lambda: 0, name='invariant1',
labels={'detectors'})
invariant2 = InvariantSignal(func=lambda: 0, name='invariant2',
labels={'detectors'})
det_with_conf = DetWithConf(name='det', labels={'detectors'})
det_with_count_time = DetWithCountTime(name='det', labels={'detectors'})
rand = SynPeriodicSignal(name='rand', labels={'detectors'})
rand2 = SynPeriodicSignal(name='rand2', labels={'detectors'})
motor_no_pos = SynAxisNoPosition(name='motor', labels={'motors'})
bool_sig = Signal(value=False, name='bool_sig', labels={'detectors'})
motor_no_hints1 = SynAxisNoHints(name='motor1', labels={'motors'})
motor_no_hints2 = SynAxisNoHints(name='motor2', labels={'motors'})
# Because some of these reference one another we must define them (above)
# before we pack them into a namespace (below).
return SimpleNamespace(
motor=motor,
motor1=motor1,
motor2=motor2,
motor3=motor3,
jittery_motor1=jittery_motor1,
jittery_motor2=jittery_motor2,
noisy_det=noisy_det,
det=det,
identical_det=identical_det,
"Return a readable but unique id like 'label-fjfi5a'"
if label:
return '-'.join([label, new_uid()[:truncate]])
else:
return new_uid()[:truncate]
class NullStatus(StatusBase):
"A simple Status object that is always immediately done, successfully."
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._finished(success=True)
class SynSignal(Signal):
"""
A synthetic Signal that evaluates a Python function when triggered.
Parameters
----------
func : callable, optional
This function sets the signal to a new value when it is triggered.
Expected signature: ``f() -> value``.
By default, triggering the signal does not change the value.
name : string, keyword only
exposure_time : number, optional
Seconds of delay when triggered (simulated 'exposure time'). Default is
0.
precision : integer, optional
Digits of precision. Default is 3.
parent : Device, optional
from ..app import launch_from_devices
from ..suite import TyphosSuite
from ..utils import nullcontext
# Define matrix of testing parameters
Shape = namedtuple('Shape', ['num_signals', 'subdevice_layers',
'subdevice_spread'])
# total_signals == num_signals * (subdevice_spread ** subdevice_layers)
SHAPES = dict(flat=Shape(100, 1, 1),
deep=Shape(100, 100, 1),
wide=Shape(1, 1, 100),
cube=Shape(4, 2, 5))
Test = namedtuple('Test', ['signal_class', 'include_prefix', 'start_ioc'])
TESTS = dict(soft=Test(Signal, False, False),
connect=Test(EpicsSignal, True, True),
noconnect=Test(EpicsSignal, True, False))
def profiler_benchmark(cls, start_ioc, auto_exit=True):
"""
Catch-all for simple profiler benchmarks.
This handles the case where we want to do interactive diagnosis with the
profiler and launch a screen.
"""
prefix = random_prefix()
with benchmark_context(start_ioc, cls, prefix):
return launch_from_devices([cls(prefix, name='test')],
auto_exit=auto_exit)