How to use the ophyd.Signal function in ophyd

To help you get started, we’ve selected a few ophyd examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github pcdshub / typhon / tests / plugins / test_core.py View on Github external
def test_signal_connection(qapp, qtbot):
    # Create a signal and attach our listener
    sig = Signal(name='my_signal', value=1)
    register_signal(sig)
    widget = PyDMLineEdit()
    qtbot.addWidget(widget)
    widget.channel = 'sig://my_signal'
    listener = widget.channels()[0]
    # If PyDMChannel can not connect, we need to connect it ourselves
    # In PyDM > 1.5.0 this will not be neccesary as the widget will be
    # connected after we set the channel name
    if not hasattr(listener, 'connect'):
        pydm.utilities.establish_widget_connections(widget)
    # Check that our widget receives the initial value
    qapp.processEvents()
    assert widget._write_access
    assert widget._connected
    assert widget.value == 1
    # Check that we can push values back to the signal which in turn causes the
github pcdshub / typhon / tests / plugins / test_core.py View on Github external
def test_array_signal_send_value(qapp, qtbot):
    sig = Signal(name='my_array', value=np.ones(4))
    register_signal(sig)
    widget = PyDMLineEdit()
    qtbot.addWidget(widget)
    widget.channel = 'sig://my_array'
    qapp.processEvents()
    assert all(widget.value == np.ones(4))
github bluesky / ophyd / tests / test_epicsmotor.py View on Github external
timeout=10.0)
    print('epicsmotor', m)
    m.wait_for_connection()
    return m


class CustomAlarmEpicsSignalRO(EpicsSignalRO):
    alarm_status = AlarmStatus.NO_ALARM
    alarm_severity = AlarmSeverity.NO_ALARM


class TestEpicsMotor(EpicsMotor):
    user_readback = C(CustomAlarmEpicsSignalRO, '.RBV')
    high_limit_switch = C(Signal, value=0)
    low_limit_switch = C(Signal, value=0)
    direction_of_travel = C(Signal, value=0)
    high_limit_value = C(EpicsSignalRO, '.HLM')
    low_limit_value = C(EpicsSignalRO, '.LLM')


def test_timeout(motor):
    assert motor.timeout == 10.0
    motor.timeout = 20.0
    assert motor.timeout == 20.0


def test_connected(motor):
    assert motor.connected


def test_limits(motor):
    device_limits = (motor.low_limit_value.get(), motor.high_limit_value.get())
github pcdshub / typhon / tests / test_cache.py View on Github external
def sig():
    name = 'test{}'.format(random.randint(0, 10000))
    sig = ophyd.Signal(name=name)
    yield sig
    sig.destroy()
github bluesky / ophyd / tests / test_quadem.py View on Github external
def quadem():
    class FakeStats(StatsPlugin):
        plugin_type = Cpt(Signal, value=StatsPlugin._plugin_type)
        nd_array_port = Cpt(Signal, value='NSLS_EM')

    class FakeImage(ImagePlugin):
        plugin_type = Cpt(Signal, value=ImagePlugin._plugin_type)
        nd_array_port = Cpt(Signal, value='NSLS_EM')

    class FakeQuadEM(QuadEM):
        image = Cpt(FakeImage, 'image1:')
        current1 = Cpt(FakeStats, 'Current1:')
        current2 = Cpt(FakeStats, 'Current2:')
        current3 = Cpt(FakeStats, 'Current3:')
        current4 = Cpt(FakeStats, 'Current4:')

    em = FakeQuadEM('quadem:', name='quadem')

    ''' Beware: Ugly Hack below
github pcdshub / typhon / tests / variety_ioc.py View on Github external
from textwrap import dedent

import caproto
from caproto.server import PVGroup, ioc_arg_parser, pvproperty, run

import ophyd
from ophyd import Component as Cpt
from ophyd import EpicsSignal
from pcdsdevices.variety import set_metadata


class Variants(ophyd.Device):
    soft_delta = Cpt(ophyd.Signal, value=1)

    tweakable_delta_source_by_name = Cpt(EpicsSignal, 'tweakable')
    set_metadata(
        tweakable_delta_source_by_name,
        {'variety': 'scalar-tweakable',
         'delta.signal': 'soft_delta',
         'delta.source': 'signal',
         'range.source': 'value',
         'range.value': [-10, 10],
         }
    )

    tweakable_delta_source_by_component = Cpt(EpicsSignal, 'tweakable')
    set_metadata(
        tweakable_delta_source_by_component,
        {'variety': 'scalar-tweakable',
github bluesky / ophyd / examples / signal.py View on Github external
motor_record = config.motor_recs[0]
    val = record_field(motor_record, 'VAL')
    rbv = record_field(motor_record, 'RBV')

    rw_signal = EpicsSignal(rbv, write_pv=val)
    rw_signal.subscribe(callback, event_type=rw_signal.SUB_VALUE)
    rw_signal.subscribe(callback, event_type=rw_signal.SUB_SETPOINT)

    rw_signal.value = 2
    time.sleep(1.)
    rw_signal.value = 1
    time.sleep(1.)

    # You can also create a Python Signal:
    sig = Signal(name='testing', value=10)
    logger.info('Python signal: %s' % sig)

    # Even one with a separate setpoint/readback value:
    sig = Signal(name='testing', value=10, setpoint=2,
                 separate_readback=True)
    logger.info('Python signal: %s' % sig)
github bluesky / ophyd / ophyd / quadem.py View on Github external
SingleTrigger, ADBase)
from .status import DeviceStatus


def _current_fields(attr_base, field_base, range_, **kwargs):
    defn = OrderedDict()
    for i in range_:
        attr = '{attr}{i}'.format(attr=attr_base, i=i)
        suffix = '{field}{i}'.format(field=field_base, i=i)
        defn[attr] = (EpicsSignal, suffix, kwargs)

    return defn


class QuadEMPort(ADBase):
    port_name = Cpt(Signal, value='')

    def __init__(self, port_name, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.port_name.put(port_name)


class QuadEM(SingleTrigger, DetectorBase):
    # These settings intentionally shadow the settings inherited from
    # DetectorBase.
    _default_read_attrs = None
    _default_configuration_attrs = None

    _status_type = DeviceStatus  # overrriding the default in SingleTrigger

    # This is needed because ophyd verifies that it can see all
    # of the nodes in the asyn pipeline, however these IOCs do not
github pcdshub / typhon / typhos / display.py View on Github external
def suggest_composite_screen(cls, device_cls):
        """
        Suggest to use the composite screen for the given class.

        Returns
        -------
        composite : bool
            If True, favor the composite screen.
        """
        num_devices = 0
        num_signals = 0
        for attr, component in utils._get_top_level_components(device_cls):
            num_devices += issubclass(component.cls, ophyd.Device)
            num_signals += issubclass(component.cls, ophyd.Signal)

        specific_screens = cls._get_specific_screens(device_cls)
        if (len(specific_screens) or
                (num_devices <= cls.device_count_threshold and
                 num_signals >= cls.signal_count_threshold)):
            # 1. There's a custom screen - we probably should use them
            # 2. There aren't many devices, so the composite display isn't
            #    useful
            # 3. There are many signals, which should be broken up somehow
            composite = False
        else:
            # 1. No custom screen, or
            # 2. Many devices or a relatively small number of signals
            composite = True

        logger.debug(