Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Test that a PyDMLineEdit pointed at a registered Signal through the
# 'sig://' channel reports write access, a connection, and the initial value.
# NOTE(review): indentation is missing in this view and the function appears
# to continue past the assertions shown here.
def test_signal_connection(qapp, qtbot):
# Create a signal and attach our listener
sig = Signal(name='my_signal', value=1)
register_signal(sig)
widget = PyDMLineEdit()
qtbot.addWidget(widget)
widget.channel = 'sig://my_signal'
listener = widget.channels()[0]
# If PyDMChannel can not connect, we need to connect it ourselves
# In PyDM > 1.5.0 this will not be necessary as the widget will be
# connected after we set the channel name
if not hasattr(listener, 'connect'):
pydm.utilities.establish_widget_connections(widget)
# Check that our widget receives the initial value
qapp.processEvents()
assert widget._write_access
assert widget._connected
assert widget.value == 1
# Check that we can push values back to the signal which in turn causes the
def test_array_signal_send_value(qapp, qtbot):
    """Check that a widget on an array-valued signal receives the array.

    A four-element array of ones is registered under the name ``my_array``
    and a ``PyDMLineEdit`` is pointed at ``sig://my_array``. Once the pending
    events have been processed, the widget's value must equal that array.
    """
    sig = Signal(name='my_array', value=np.ones(4))
    register_signal(sig)
    widget = PyDMLineEdit()
    qtbot.addWidget(widget)
    widget.channel = 'sig://my_array'
    qapp.processEvents()
    # array_equal compares shape as well as contents, so a scalar or a
    # wrongly sized value fails the assertion cleanly instead of passing via
    # broadcasting or raising a TypeError inside all().
    assert np.array_equal(widget.value, np.ones(4))
timeout=10.0)
print('epicsmotor', m)
m.wait_for_connection()
return m
class CustomAlarmEpicsSignalRO(EpicsSignalRO):
    """``EpicsSignalRO`` with class-level alarm attributes set to ``NO_ALARM``.

    Both ``alarm_status`` and ``alarm_severity`` are plain class attributes
    here, fixed to the ``NO_ALARM`` member of their respective enums.
    """

    alarm_severity = AlarmSeverity.NO_ALARM
    alarm_status = AlarmStatus.NO_ALARM
class TestEpicsMotor(EpicsMotor):
    """``EpicsMotor`` subclass used by the motor tests.

    The readback uses ``CustomAlarmEpicsSignalRO``; the limit switches and
    direction of travel are ``Signal`` components initialised to 0; the
    limit values are read-only EPICS signals on the ``.HLM`` / ``.LLM``
    fields.
    """

    # The class name matches pytest's default ``Test*`` collection pattern.
    # Opt out explicitly so pytest does not try to collect this helper
    # class as a test case.
    __test__ = False

    user_readback = C(CustomAlarmEpicsSignalRO, '.RBV')
    high_limit_switch = C(Signal, value=0)
    low_limit_switch = C(Signal, value=0)
    direction_of_travel = C(Signal, value=0)
    high_limit_value = C(EpicsSignalRO, '.HLM')
    low_limit_value = C(EpicsSignalRO, '.LLM')
def test_timeout(motor):
    """The motor reports a 10.0 timeout and accepts a new value of 20.0."""
    initial_timeout, updated_timeout = 10.0, 20.0

    assert motor.timeout == initial_timeout

    # Writing the attribute must be reflected on the next read.
    motor.timeout = updated_timeout
    assert motor.timeout == updated_timeout
def test_connected(motor):
    """The motor under test must report a truthy ``connected`` attribute."""
    is_connected = motor.connected
    assert is_connected
# Reads the motor's low and high limit values into a (low, high) tuple.
# NOTE(review): nothing visible here uses device_limits; the function looks
# truncated in this view — confirm against the full test.
def test_limits(motor):
device_limits = (motor.low_limit_value.get(), motor.high_limit_value.get())
def sig():
    """Yield an ``ophyd.Signal`` with a randomised name, destroying it after.

    The name is ``test`` followed by a random integer between 0 and 10000
    inclusive. ``destroy()`` is called on the signal once the generator is
    resumed after the yield.
    """
    suffix = random.randint(0, 10000)
    signal = ophyd.Signal(name='test{}'.format(suffix))
    yield signal
    signal.destroy()
# Builds a QuadEM instance from locally defined subclasses whose
# ``plugin_type`` and ``nd_array_port`` components are plain Signals.
# NOTE(review): the body is cut off after the opening of the string literal
# at the end of this block; the remainder is not visible here.
def quadem():
# Stats plugin: plugin_type takes StatsPlugin._plugin_type, port is 'NSLS_EM'
class FakeStats(StatsPlugin):
plugin_type = Cpt(Signal, value=StatsPlugin._plugin_type)
nd_array_port = Cpt(Signal, value='NSLS_EM')
# Image plugin: same pattern, using ImagePlugin._plugin_type
class FakeImage(ImagePlugin):
plugin_type = Cpt(Signal, value=ImagePlugin._plugin_type)
nd_array_port = Cpt(Signal, value='NSLS_EM')
# QuadEM using the fake image plugin and four fake stats plugins
class FakeQuadEM(QuadEM):
image = Cpt(FakeImage, 'image1:')
current1 = Cpt(FakeStats, 'Current1:')
current2 = Cpt(FakeStats, 'Current2:')
current3 = Cpt(FakeStats, 'Current3:')
current4 = Cpt(FakeStats, 'Current4:')
# Instantiate with the 'quadem:' prefix
em = FakeQuadEM('quadem:', name='quadem')
''' Beware: Ugly Hack below
from textwrap import dedent
import caproto
from caproto.server import PVGroup, ioc_arg_parser, pvproperty, run
import ophyd
from ophyd import Component as Cpt
from ophyd import EpicsSignal
from pcdsdevices.variety import set_metadata
# Device whose EpicsSignal components are tagged with 'scalar-tweakable'
# variety metadata through set_metadata.
# NOTE(review): the class body is cut off inside the second metadata dict in
# this view.
class Variants(ophyd.Device):
# Soft signal; the first metadata dict below refers to it by the name
# 'soft_delta' under the 'delta.signal' key.
soft_delta = Cpt(ophyd.Signal, value=1)
tweakable_delta_source_by_name = Cpt(EpicsSignal, 'tweakable')
set_metadata(
tweakable_delta_source_by_name,
{'variety': 'scalar-tweakable',
'delta.signal': 'soft_delta',
'delta.source': 'signal',
'range.source': 'value',
'range.value': [-10, 10],
}
)
tweakable_delta_source_by_component = Cpt(EpicsSignal, 'tweakable')
set_metadata(
tweakable_delta_source_by_component,
{'variety': 'scalar-tweakable',
# Build a read/write signal from the first configured motor record: reads
# come from the RBV field, writes go to the VAL field.
motor_record = config.motor_recs[0]
val = record_field(motor_record, 'VAL')
rbv = record_field(motor_record, 'RBV')
rw_signal = EpicsSignal(rbv, write_pv=val)

# Register the same callback for both value and setpoint events.
rw_signal.subscribe(callback, event_type=rw_signal.SUB_VALUE)
rw_signal.subscribe(callback, event_type=rw_signal.SUB_SETPOINT)

# Write two values, pausing one second after each.
# NOTE(review): the pauses presumably give the callbacks time to fire —
# confirm.
rw_signal.value = 2
time.sleep(1.)
rw_signal.value = 1
time.sleep(1.)

# You can also create a Python Signal:
sig = Signal(name='testing', value=10)
# Pass the signal as a logging argument rather than pre-formatting with %,
# so the string is only built when the record is actually emitted.
logger.info('Python signal: %s', sig)

# Even one with a separate setpoint/readback value:
sig = Signal(name='testing', value=10, setpoint=2,
             separate_readback=True)
logger.info('Python signal: %s', sig)
SingleTrigger, ADBase)
from .status import DeviceStatus
def _current_fields(attr_base, field_base, range_, **kwargs):
    """Build an ordered mapping of numbered ``EpicsSignal`` definitions.

    Parameters
    ----------
    attr_base : str
        Prefix of each generated key; the index is appended to it.
    field_base : str
        Prefix of each generated signal suffix; the index is appended to it.
    range_ : iterable
        Indices to generate entries for, in iteration order.
    **kwargs
        Keyword arguments stored alongside every entry.

    Returns
    -------
    defn : OrderedDict
        Maps ``'<attr_base><i>'`` to the tuple
        ``(EpicsSignal, '<field_base><i>', kwargs_copy)`` for each ``i``.
    """
    defn = OrderedDict()
    for i in range_:
        attr = '{attr}{i}'.format(attr=attr_base, i=i)
        suffix = '{field}{i}'.format(field=field_base, i=i)
        # Give every entry its own dict: sharing one kwargs object would let
        # a mutation made through one entry leak into all the others.
        defn[attr] = (EpicsSignal, suffix, dict(kwargs))
    return defn
class QuadEMPort(ADBase):
    """``ADBase`` subclass that keeps a port name in a ``Signal`` component.

    Parameters
    ----------
    port_name
        Value put into the ``port_name`` signal once the parent initializer
        has run.
    *args, **kwargs
        Forwarded unchanged to the parent initializer.
    """

    port_name = Cpt(Signal, value='')

    def __init__(self, port_name, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The component only exists after the parent initializer has run.
        port_signal = self.port_name
        port_signal.put(port_name)
# Detector class combining SingleTrigger and DetectorBase.
# NOTE(review): the class body is cut off in this view, in the middle of the
# trailing comment.
class QuadEM(SingleTrigger, DetectorBase):
# These settings intentionally shadow the settings inherited from
# DetectorBase.
_default_read_attrs = None
_default_configuration_attrs = None
_status_type = DeviceStatus # overriding the default in SingleTrigger
# This is needed because ophyd verifies that it can see all
# of the nodes in the asyn pipeline, however these IOCs do not
def suggest_composite_screen(cls, device_cls):
"""
Suggest to use the composite screen for the given class.
Returns
-------
composite : bool
If True, favor the composite screen.
"""
num_devices = 0
num_signals = 0
for attr, component in utils._get_top_level_components(device_cls):
num_devices += issubclass(component.cls, ophyd.Device)
num_signals += issubclass(component.cls, ophyd.Signal)
specific_screens = cls._get_specific_screens(device_cls)
if (len(specific_screens) or
(num_devices <= cls.device_count_threshold and
num_signals >= cls.signal_count_threshold)):
# 1. There's a custom screen - we probably should use them
# 2. There aren't many devices, so the composite display isn't
# useful
# 3. There are many signals, which should be broken up somehow
composite = False
else:
# 1. No custom screen, or
# 2. Many devices or a relatively small number of signals
composite = True
logger.debug(