Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@pyqtSlot(tuple)
def recv_enums(self, enums):
    """
    Slot for the enum-strings signal.

    Stores the received strings on ``self.enums`` and then emits
    ``enums_updated_signal`` so listeners know a new set arrived.

    :param enums: enum strings
    :type enums: tuple of str
    """
    self.enums = enums
    # Notify only after the attribute holds the new value.
    self.enums_updated_signal.emit()
@pyqtSlot(int)
def recv_sevr(self, sevr):
    """
    Slot for the alarm-state signal.

    Stores the received state on ``self.sevr`` and then emits
    ``sevr_updated_signal``.

    :param sevr: alarm state
    :type sevr: int
    """
    self.sevr = sevr
    # Notify only after the attribute holds the new value.
    self.sevr_updated_signal.emit()
@pyqtSlot(float)
@pyqtSlot(str)
def send_value(self, value):
    """
    Emit ``value`` on the overload of ``send_value_signal`` that matches
    the value's exact type.

    :param value: value to send
    :type value: int, float, or str
    """
    # NOTE(review): the docstring lists int, but only float and str slot
    # overloads are declared on this method -- confirm whether an int
    # overload is expected.
    # The signal is indexed by type(value), so the exact runtime type
    # (not a parent class) selects the overload.
    value_type = type(value)
    overload = self.send_value_signal[value_type]
    overload.emit(value)
"""
Generic PyDM Widget for testing plugins. Contains a more thorough waveform
Channel.
Lots of copy/paste because pyqt multiple inheritance is broken.
"""
# Signal signatures declared by name. Only the two waveform signals are
# bound as attributes directly below; the remaining names are presumably
# defined elsewhere in the class hierarchy -- TODO confirm.
__pyqtSignals__ = (
    "send_waveform_signal(np.ndarray)",
    "waveform_updated_signal()",
    "conn_updated_signal()",
    "sevr_updated_signal()",
    "rwacc_updated_signal()",
    "enums_updated_signal()",
    "units_updated_signal()",
    "prec_updated_signal()",
)

# Carries a numpy array out of the widget.
send_waveform_signal = pyqtSignal(np.ndarray)
# Argument-less notification, emitted by recv_waveform after it stores a
# new waveform.
waveform_updated_signal = pyqtSignal()
def __init__(self, channel=None, parent=None):
    """
    Forward ``channel`` and ``parent`` unchanged to the base-class
    initializer.

    :param channel: passed through as the ``channel`` keyword
    :param parent: passed through as the ``parent`` keyword
    """
    base = super(WaveformWidget, self)
    base.__init__(channel=channel, parent=parent)
@pyqtSlot(np.ndarray)
def recv_waveform(self, waveform):
    """
    Receive a waveform from the plugin signal.

    Stores the array on ``self.value`` and then emits
    ``waveform_updated_signal``.

    :param waveform: waveform
    :type waveform: np.ndarray
    """
    self.value = waveform
    # Notify only after the attribute holds the new waveform.
    self.waveform_updated_signal.emit()
def test_line_edit_history(qtbot, motor):
    """
    Send ten setpoints through a TyphosLineEdit and check that its
    setpoint history holds the last ``setpointHistoryCount`` of them, as
    strings, then smoke-test context-menu creation.
    """
    widget = widgets.TyphosLineEdit()
    qtbot.addWidget(widget)
    widget.channel = 'sig://' + ophyd.sim.motor.setpoint.name
    widget.channeltype = int  # hack
    pydm.utilities.establish_widget_connections(widget)

    setpoints = list(range(10))
    for setpoint in setpoints:
        widget.setText(str(setpoint))
        widget.send_value()

    # Only the trailing setpointHistoryCount entries are expected.
    history_size = widget.setpointHistoryCount
    kept = setpoints[-history_size:]
    assert list(widget.setpoint_history) == list(map(str, kept))

    # Smoke test menu creation
    menu = widget.widget_ctx_menu()
    qtbot.addWidget(menu)
Check that we can put to the waveform set function
"""
array = np.asarray((5, 5, 6))
self.set_widget.send_waveform(array)
self.event_loop(0)
result = self.obj.get()
self.assertEqual(tuple(result), tuple(array),
msg="waveform function has wrong result: was {0}, expected {1}".format(
result, array))
class SignalHolder(QObject):
    """
    Minimal QObject subclass that exists only so pyqtSignal attributes
    can be declared on it.
    """

    # Overloaded signal: one overload each for int, float and str.
    value_sig = pyqtSignal([int], [float], [str])
    # Signal that carries no arguments.
    empty_sig = pyqtSignal()
class SignalTestCase(LocalPluginTest):
def test_signal_update(self):
"""
Check that we can update t=0 widgets intelligently with signals
"""
sig = SignalHolder()
self.plugin.connect_to_update("x?t=0", sig.empty_sig)
self.stale_widget.send_value(4)
self.event_loop(0.01)
self.assertEqual(self.stale_widget.value, 4,
msg="t=0 widgets do not update on puts")
self.obj.x = 10
self.event_loop(2)
self.assertEqual(self.stale_widget.value, 4,
# One more process step just in case (this actually matters I think)
self.app.processEvents()
signal.disconnect(queue.put_to_queue)
# Get value if we have one (otherwise, this is None)
value = queue.get()
return value
class QueueSlots(QObject):
"""
Dummy object that contains a slot that puts signal results to a queue.
Exists for the implementation of PyDMTest.signal_wait.
"""
# Signal signature declared by name, followed by the matching bound
# argument-less signal attribute.
__pyqtSignals__ = (
    "got_value()",
)
got_value = pyqtSignal()
def __init__(self, parent=None):
    """
    Initialize the QObject base and create the internal queue.

    :param parent: passed through to the base-class initializer
    """
    base = super(QueueSlots, self)
    base.__init__(parent=parent)
    # Fresh, empty queue owned by this instance.
    self.queue = Queue.Queue()
def get(self, timeout=None):
"""
Retrieve queue value or None. Waits for timeout seconds.
:param timeout: get timeout in seconds
:type timeout: float or int
:rtyp: object or None
"""
def channels(self):
    """
    Build the channel list for this widget: a single Channel wired to
    the value slot/signal plus the connection, severity, write-access,
    enum-strings, unit and precision slots. No waveform fields are set.

    :rtype: list of :class:Channel
    """
    value_channel = Channel(
        address=self.channel,
        value_slot=self.recv_value,
        value_signal=self.send_value_signal,
        connection_slot=self.recv_conn,
        severity_slot=self.recv_sevr,
        write_access_slot=self.recv_rwacc,
        enum_strings_slot=self.recv_enums,
        unit_slot=self.recv_units,
        prec_slot=self.recv_prec,
    )
    return [value_channel]
def channels(self):
    """
    Build the channel list for this widget: a single Channel wired to
    the waveform slot/signal plus the connection, severity, write-access,
    enum-strings, unit and precision slots. No value fields are set.

    :rtype: list of :class:Channel
    """
    waveform_channel = Channel(
        address=self.channel,
        waveform_slot=self.recv_waveform,
        waveform_signal=self.send_waveform_signal,
        connection_slot=self.recv_conn,
        severity_slot=self.recv_sevr,
        write_access_slot=self.recv_rwacc,
        enum_strings_slot=self.recv_enums,
        unit_slot=self.recv_units,
        prec_slot=self.recv_prec,
    )
    return [waveform_channel]
def test_typhos_panel(qapp, client, qtbot, typhos_signal_panel):
panel = typhos_signal_panel
# Setting Kind without device doesn't explode
panel.showConfig = False
panel.showConfig = True
# Add a device channel
panel.channel = 'happi://test_device'
assert panel.channel == 'happi://test_device'
# Reset channel and no smoke comes out
panel.channel = 'happi://test_motor'
pydm.utilities.establish_widget_connections(panel)
def have_device():
assert len(panel.devices) == 1
qtbot.wait_until(have_device)
device = panel.devices[0]
num_hints = len(device.hints['fields'])
num_read = len(device.read_attrs)
def get_visible_signals():
return panel.layout().visible_signals
# Check we got all our signals
assert len(get_visible_signals()) == len(device.component_names)