How to use ophyd - 10 common examples

To help you get started, we’ve selected a few ophyd examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github bluesky / ophyd / tests / test_signal.py View on Github external
def test_set_method():
    """Setting a Signal returns a status that finishes successfully.

    After waiting on the status, the signal must report the value that
    was set.
    """
    signal = Signal()
    status = signal.set(28)
    wait(status)

    assert status.done
    assert status.success
    assert signal.get() == 28
github pcdshub / typhon / tests / plugins / test_core.py View on Github external
def test_signal_connection(qapp, qtbot):
    """Check that a widget on a ``sig://`` channel sees the Signal's state.

    The visible part of the test registers a Signal, points a
    PyDMLineEdit at it and asserts that the widget reports write access,
    a live connection and the signal's initial value.
    """
    # Create a signal and attach our listener
    sig = Signal(name='my_signal', value=1)
    register_signal(sig)
    widget = PyDMLineEdit()
    qtbot.addWidget(widget)
    widget.channel = 'sig://my_signal'
    listener = widget.channels()[0]
    # If PyDMChannel cannot connect, we need to connect it ourselves.
    # In PyDM > 1.5.0 this will not be necessary as the widget will be
    # connected after we set the channel name
    if not hasattr(listener, 'connect'):
        pydm.utilities.establish_widget_connections(widget)
    # Check that our widget receives the initial value
    qapp.processEvents()
    assert widget._write_access
    assert widget._connected
    assert widget.value == 1
    # Check that we can push values back to the signal which in turn causes the
github pcdshub / typhon / tests / plugins / test_core.py View on Github external
def test_array_signal_send_value(qapp, qtbot):
    """An array-valued Signal delivers its value to a ``sig://`` widget."""
    array_signal = Signal(name='my_array', value=np.ones(4))
    register_signal(array_signal)

    line_edit = PyDMLineEdit()
    qtbot.addWidget(line_edit)
    line_edit.channel = 'sig://my_array'

    # Let Qt deliver the pending value update before inspecting the widget.
    qapp.processEvents()

    elementwise_match = line_edit.value == np.ones(4)
    assert all(elementwise_match)
github bluesky / ophyd / tests / test_device.py View on Github external
def test_attribute_signal():
    """Build a device whose AttributeSignal components wrap properties.

    The visible part of the test only defines the device classes and
    instantiates ``MyDevice``.
    """
    init_value = 33

    class SubDevice(Device):
        # ``prop`` always reads back ``init_value``; the setter accepts a
        # value and discards it.
        @property
        def prop(self):
            return init_value

        @prop.setter
        def prop(self, value):
            pass

    class MyDevice(Device):
        sub1 = Component(SubDevice, '1')
        # Bound to the attribute name 'prop' (the property defined below).
        attrsig = Component(AttributeSignal, 'prop')
        # Dotted attribute name; presumably resolved through ``sub1`` to
        # SubDevice.prop -- confirm against the AttributeSignal docs.
        sub_attrsig = Component(AttributeSignal, 'sub1.prop')

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # Backing store for the read/write ``prop`` property.
            self._value = init_value

        @property
        def prop(self):
            return self._value

        @prop.setter
        def prop(self, value):
            self._value = value

    dev = MyDevice('', name='mydev')
github bluesky / ophyd / tests / test_device.py View on Github external
if self.prefix.endswith('_raises_'):
                    raise Exception('stop failed for some reason')

        class SubDevice(Device):
            # Three fake signals plus a nested SubSubDevice declared with an
            # empty suffix.
            cpt1 = Component(FakeSignal, '1')
            cpt2 = Component(FakeSignal, '2')
            cpt3 = Component(FakeSignal, '3')
            subsub = Component(SubSubDevice, '')

            def stop(self, *, success=False):
                """Record the call and its ``success`` flag, then delegate.

                The bookkeeping happens before ``super().stop`` runs, so the
                flags are set even if the parent call raises.
                """
                self.stop_called = True
                self.success = success
                super().stop(success=success)

        class MyDevice(Device):
            # sub2 and sub3 are declared with the '_raises_' suffix; the
            # stop() code shown just above SubDevice raises when its prefix
            # ends with that marker.
            # NOTE(review): presumably this is what makes exactly two
            # sub-device stops fail -- confirm against SubSubDevice.
            sub1 = Component(SubDevice, '1')
            sub2 = Component(SubDevice, '_raises_')
            sub3 = Component(SubDevice, '_raises_')
            cpt3 = Component(FakeSignal, '3')

        dev = MyDevice('', name='mydev')
        # stop() is expected to surface the sub-device failures together as
        # one ExceptionBundle.
        with self.assertRaises(ExceptionBundle) as cm:
            dev.stop()

        ex = cm.exception
        # assertEqual rather than the deprecated assertEquals alias, which
        # was removed from unittest in Python 3.12.
        self.assertEqual(len(ex.exceptions), 2)
        # Every sub-device must have been asked to stop, including the ones
        # whose stop raised.
        self.assertTrue(dev.sub1.stop_called)
        self.assertTrue(dev.sub2.stop_called)
        self.assertTrue(dev.sub3.stop_called)
        # dev.stop() was called without arguments, so each sub-device should
        # have recorded success=False.
        self.assertFalse(dev.sub1.success)
        self.assertFalse(dev.sub2.success)
        self.assertFalse(dev.sub3.success)
github bluesky / ophyd / tests / test_pvpositioner.py View on Github external
fm = self.fake_motor
        epics.caput(fm['setpoint'], 0.05)
        time.sleep(0.5)
        epics.caput(fm['actuate'], 1)
        time.sleep(0.5)
        epics.caput(fm['setpoint'], 0)
        time.sleep(0.5)
        epics.caput(fm['actuate'], 1)
        time.sleep(0.5)

        class MyPositioner(PVPositioner):
            '''Setpoint, readback, actuate, stop and done PVs; no put completion.'''
            setpoint = C(EpicsSignal, fm['setpoint'])
            readback = C(EpicsSignalRO, fm['readback'])
            actuate = C(EpicsSignal, fm['actuate'])
            stop_signal = C(EpicsSignal, fm['stop'])
            # NOTE(review): the done signal is bound to the fake motor's
            # 'moving' PV while done_value is 1 -- confirm the polarity is
            # what the test intends.
            done = C(EpicsSignal, fm['moving'])

            # Values paired with the actuate, stop and done signals above.
            actuate_value = 1
            stop_value = 1
            done_value = 1

        pos = MyPositioner('', name='pv_pos_fake_mtr')
        print('fake mtr', pos.describe())
        pos.wait_for_connection()

        pos.subscribe(callback, event_type=pos.SUB_DONE)
        pos.subscribe(callback, event_type=pos.SUB_READBACK)

        logger.info('---- test #1 ----')
        logger.info('--> move to 1')
        pos.move(1, timeout=5)
github pcdshub / typhon / tests / variety_ioc.py View on Github external
'tags': {'confirm', 'protected'},
                  'value': 1,
                  }
                 )

    command_proc = Cpt(EpicsSignal, 'command-without-enum')
    set_metadata(command_proc, {'variety': 'command-proc'})

    command_enum = Cpt(EpicsSignal, 'command-without-enum')
    set_metadata(command_enum,
                 {'variety': 'command-enum',
                  'enum_dict': {0: 'No', 1: 'Yes', 3: 'Metadata-defined'},
                  }
                 )

    command_setpoint_tracks_readback = Cpt(EpicsSignal,
                                           'command-setpoint-tracks-readback')
    set_metadata(command_setpoint_tracks_readback,
                 {'variety': 'command-setpoint-tracks-readback'})

    tweakable = Cpt(EpicsSignal, 'tweakable')
    set_metadata(
        tweakable,
        {'variety': 'scalar-tweakable',
         'delta.value': 0.5,
         'delta.range': [-1, 1],
         'range.source': 'value',
         'range.value': [-1, 1],
         }
    )

    array_timeseries = Cpt(EpicsSignal, 'array-timeseries')
github bluesky / ophyd / tests / test_timestamps.py View on Github external
def test_read_pv_timestamp_no_monitor(self):
    """Readback value and timestamp both advance after a setpoint step.

    Separate EpicsSignal / EpicsSignalRO objects are created from the
    motor's setpoint and readback PV names. The setpoint is stepped by
    0.1, then the readback is checked for a newer timestamp and a value
    shifted by the same amount.
    """
    step = 0.1

    motor = EpicsMotor(config.motor_recs[0])
    motor.wait_for_connection()

    setpoint = EpicsSignal(motor.user_setpoint.pvname)
    readback = EpicsSignalRO(motor.user_readback.pvname)

    # Snapshot the readback before moving.
    value_before = readback.get()
    stamp_before = readback.timestamp

    setpoint.put(setpoint.value + step, wait=True)
    time.sleep(0.1)

    # Snapshot again after the move and compare.
    value_after = readback.get()
    stamp_after = readback.timestamp
    self.assertGreater(stamp_after, stamp_before)
    self.assertAlmostEqual(value_before + step, value_after)

    # Step back by the same amount.
    setpoint.put(setpoint.value - step, wait=True)
github bluesky / ophyd / tests / test_timestamps.py View on Github external
def test_write_pv_timestamp_no_monitor(self):
    """Setpoint value and timestamp both advance after writing to it.

    An EpicsSignal is created from the motor's setpoint PV name and
    stepped by 0.1. The same signal is then re-read and checked for a
    newer timestamp and the shifted value.
    """
    step = 0.1

    motor = EpicsMotor(config.motor_recs[0])
    motor.wait_for_connection()

    setpoint = EpicsSignal(motor.user_setpoint.pvname)

    # Snapshot the setpoint before writing.
    value_before = setpoint.get()
    stamp_before = setpoint.timestamp

    setpoint.put(value_before + step, wait=True)
    time.sleep(0.1)

    # Snapshot again after the write and compare.
    value_after = setpoint.get()
    stamp_after = setpoint.timestamp
    self.assertGreater(stamp_after, stamp_before)
    self.assertAlmostEqual(value_before + step, value_after)

    # Step back by the same amount.
    setpoint.put(setpoint.value - step, wait=True)
github bluesky / ophyd / tests / test_pseudopos.py View on Github external
# logger.debug('forward %s', pseudo_pos)
        return self.RealPosition(real1=-pseudo_pos.pseudo1,
                                 real2=-pseudo_pos.pseudo2,
                                 real3=-pseudo_pos.pseudo3)

    def inverse(self, real_pos):
        """Convert real motor positions back into pseudo positions.

        The ``forward`` transform directly above maps each pseudo axis to
        the negated real axis (``real_n = -pseudo_n``). Undoing it
        therefore requires negating again; returning the real values
        unchanged would make ``inverse(forward(p))`` equal ``-p``.

        Parameters
        ----------
        real_pos : sequence
            Real positions in ``RealPosition`` field order
            (real1, real2, real3).

        Returns
        -------
        PseudoPosition
            The pseudo position with each axis equal to the negated
            corresponding real axis.
        """
        real_pos = self.RealPosition(*real_pos)
        return self.PseudoPosition(pseudo1=-real_pos.real1,
                                   pseudo2=-real_pos.real2,
                                   pseudo3=-real_pos.real3)


class Pseudo1x3(PseudoPositioner):
    """One pseudo axis mapped onto three real motors.

    All three real motors are sent to the negated pseudo position; the
    pseudo position is recovered as the negated ``real1`` position.
    """
    pseudo1 = C(PseudoSingle, limits=(-10, 10))
    real1 = C(EpicsMotor, motor_recs[0])
    real2 = C(EpicsMotor, motor_recs[1])
    real3 = C(EpicsMotor, motor_recs[2])

    def forward(self, pseudo_pos):
        """Return the real positions for ``pseudo_pos``."""
        mirrored = -self.PseudoPosition(*pseudo_pos).pseudo1
        return self.RealPosition(real1=mirrored,
                                 real2=mirrored,
                                 real3=mirrored)

    def inverse(self, real_pos):
        """Return the pseudo position for ``real_pos``, using real1 only."""
        reals = self.RealPosition(*real_pos)
        return self.PseudoPosition(pseudo1=-reals.real1)