Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_panel_creation():
panel = SignalPanel(signals={
# Signal is its own write
'Standard': EpicsSignal('Tst:Pv'),
# Signal has separate write/read
'Read and Write': EpicsSignal('Tst:Read',
write_pv='Tst:Write'),
# Signal is read-only
'Read Only': EpicsSignalRO('Tst:Pv:RO'),
# Simulated Signal
'Simulated': SynSignal(name='simul'),
'SimulatedRO': SynSignalRO(name='simul_ro')})
assert len(panel.signals) == 5
# Check read-only channels do not have write widgets
panel.layout().itemAtPosition(2, 1).layout().count() == 1
panel.layout().itemAtPosition(4, 1).layout().count() == 1
# Check write widgets are present
panel.layout().itemAtPosition(0, 1).layout().count() == 2
panel.layout().itemAtPosition(1, 1).layout().count() == 2
panel.layout().itemAtPosition(3, 1).layout().count() == 2
source (ex PV name), and if available, units, limits, precision etc.
The data_keys are mapped to events from `collect` by matching the
keys.
Returns
-------
data_keys_by_stream : dict
The keys must be strings and the values must be dict-like
with keys that are str and the inner values are dict-like
with the ``event_model.event_descriptor.data_key`` schema.
'''
class AreaDetectorTimeseriesCollector(Device):
control = Cpt(EpicsSignal, "TSControl")
num_points = Cpt(EpicsSignal, "TSNumPoints")
cur_point = Cpt(EpicsSignalRO, "TSCurrentPoint")
waveform = Cpt(EpicsSignalRO, "TSTotal")
waveform_ts = Cpt(EpicsSignalRO, "TSTimestamp")
_default_configuration_attrs = ('num_points', )
_default_read_attrs = ()
def __init__(self, *args, stream_name=None, **kwargs):
self.stream_name = stream_name
super().__init__(*args, **kwargs)
def _get_waveforms(self):
n = self.cur_point.get()
'''SynApps Scaler Record interface'''
# tigger + trigger mode
count = Cpt(EpicsSignal, '.CNT', trigger_value=1, kind=Kind.omitted)
count_mode = Cpt(EpicsSignal, '.CONT', string=True, kind=Kind.config)
# delay from triggering to starting counting
delay = Cpt(EpicsSignal, '.DLY', kind=Kind.config)
auto_count_delay = Cpt(EpicsSignal, '.DLY1', kind=Kind.config)
# the data
channels = DDCpt(_scaler_fields(EpicsSignalRO, 'chan', '.S', range(1, 33),
kind=Kind.hinted))
names = DDCpt(_scaler_fields(EpicsSignal, 'name', '.NM', range(1, 33),
kind=Kind.config))
time = Cpt(EpicsSignal, '.T', kind=Kind.config)
freq = Cpt(EpicsSignal, '.FREQ', kind=Kind.config)
preset_time = Cpt(EpicsSignal, '.TP', kind=Kind.config)
auto_count_time = Cpt(EpicsSignal, '.TP1', kind=Kind.config)
presets = DDCpt(_scaler_fields(EpicsSignal, 'preset', '.PR', range(1, 33),
kind=Kind.omitted))
gates = DDCpt(_scaler_fields(EpicsSignal, 'gate', '.G', range(1, 33),
kind=Kind.omitted))
update_rate = Cpt(EpicsSignal, '.RATE', kind=Kind.config)
auto_count_update_rate = Cpt(EpicsSignal, '.RAT1', kind=Kind.config)
egu = Cpt(EpicsSignal, '.EGU', kind=Kind.config)
def __init__(self, *args, **kwargs):
_html_docs = ['RoperDoc.html']
auto_data_type = C(SignalWithRBV, 'AutoDataType')
comment1 = C(SignalWithRBV, 'Comment1')
comment2 = C(SignalWithRBV, 'Comment2')
comment3 = C(SignalWithRBV, 'Comment3')
comment4 = C(SignalWithRBV, 'Comment4')
comment5 = C(SignalWithRBV, 'Comment5')
file_format = C(SignalWithRBV, 'FileFormat')
num_acquisitions = C(SignalWithRBV, 'NumAcquisitions')
num_acquisitions_counter = C(EpicsSignalRO, 'NumAcquisitionsCounter_RBV')
roper_shutter_mode = C(SignalWithRBV, 'RoperShutterMode')
class URLDetectorCam(CamBase):
_html_docs = ['URLDoc.html']
urls = DDC(ad_group(EpicsSignal,
(('url_1', 'URL1'),
('url_2', 'URL2'),
('url_3', 'URL3'),
('url_4', 'URL4'),
('url_5', 'URL5'),
('url_6', 'URL6'),
('url_7', 'URL7'),
('url_8', 'URL8'),
('url_9', 'URL9'),
('url_10', 'URL10'))),
doc='URLs')
url_select = C(EpicsSignal, 'URLSelect')
url_seq = C(EpicsSignal, 'URLSeq')
url = C(EpicsSignalRO, 'URL_RBV')
('dim2', 'Dim2SA'))),
doc='Dimension sub-arrays',
default_read_attrs=('dim0', 'dim1', 'dim2'))
dimensions = C(EpicsSignalRO, 'Dimensions_RBV')
dropped_arrays = C(SignalWithRBV, 'DroppedArrays')
enable = C(SignalWithRBV, 'EnableCallbacks', string=True)
min_callback_time = C(SignalWithRBV, 'MinCallbackTime')
nd_array_address = C(SignalWithRBV, 'NDArrayAddress')
nd_array_port = C(SignalWithRBV, 'NDArrayPort')
ndimensions = C(EpicsSignalRO, 'NDimensions_RBV')
plugin_type = C(EpicsSignalRO, 'PluginType_RBV')
queue_free = C(EpicsSignal, 'QueueFree')
queue_free_low = C(EpicsSignal, 'QueueFreeLow')
queue_size = C(EpicsSignal, 'QueueSize')
queue_use = C(EpicsSignal, 'QueueUse')
queue_use_high = C(EpicsSignal, 'QueueUseHIGH')
queue_use_hihi = C(EpicsSignal, 'QueueUseHIHI')
time_stamp = C(EpicsSignalRO, 'TimeStamp_RBV')
unique_id = C(EpicsSignalRO, 'UniqueId_RBV')
class ImagePlugin(PluginBase):
_default_suffix = 'image1:'
_suffix_re = 'image\d:'
_html_docs = ['NDPluginStdArrays.html']
_plugin_type = 'NDPluginStdArrays'
array_data = C(EpicsSignal, 'ArrayData')
@property
BlueskyInterface, Kind)
from .areadetector import EpicsSignalWithRBV as SignalWithRBV
logger = logging.getLogger(__name__)
class ROI(Device):
# 'name' is not an allowed attribute
label = Cpt(EpicsSignal, 'NM', lazy=True)
count = Cpt(EpicsSignalRO, '', lazy=True)
net_count = Cpt(EpicsSignalRO, 'N', lazy=True)
preset_count = Cpt(EpicsSignal, 'P', lazy=True)
is_preset = Cpt(EpicsSignal, 'IP', lazy=True)
bkgnd_chans = Cpt(EpicsSignal, 'BG', lazy=True)
hi_chan = Cpt(EpicsSignal, 'HI', lazy=True)
lo_chan = Cpt(EpicsSignal, 'LO', lazy=True)
def __init__(self, prefix, *, read_attrs=None, configuration_attrs=None,
name=None, parent=None, **kwargs):
super().__init__(prefix, read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
name=name, parent=parent, **kwargs)
def add_rois(range_, **kwargs):
'''Add one or more ROIs to an MCA instance
Parameters
----------
super().__init__(*args, **kwargs)
self.stage_sigs.update([('count_mode', 0)])
class ScalerChannel(Device):
# TODO set up monitor on this to automatically change the name
chname = FCpt(EpicsSignal, '{self.prefix}.NM{self._ch_num}',
kind=Kind.config)
s = FCpt(EpicsSignalRO, '{self.prefix}.S{self._ch_num}',
kind=Kind.hinted)
preset = FCpt(EpicsSignal, '{self.prefix}.PR{self._ch_num}',
kind=Kind.config)
gate = FCpt(EpicsSignal, '{self.prefix}.G{self._ch_num}', string=True,
kind=Kind.config)
def __init__(self, prefix, ch_num,
**kwargs):
self._ch_num = ch_num
super().__init__(prefix, **kwargs)
self.match_name()
def match_name(self):
self.s.name = self.chname.get()
def _sc_chans(attr_fix, id_range):
defn = OrderedDict()
for k in id_range:
"""
Check fake limits before putting
"""
super().check_value(value)
if self._use_limits and not self.limits[0] < value < self.limits[1]:
raise LimitError('value={} limits={}'.format(value, self.limits))
class FakeEpicsSignalRO(SynSignalRO, FakeEpicsSignal):
"""
Read-only FakeEpicsSignal
"""
pass
fake_device_cache = {EpicsSignal: FakeEpicsSignal,
EpicsSignalRO: FakeEpicsSignalRO}
def hw():
"Build a set of synthetic hardware (hence the abbreviated name, hw)"
motor = SynAxis(name='motor', labels={'motors'})
motor1 = SynAxis(name='motor1', labels={'motors'})
motor2 = SynAxis(name='motor2', labels={'motors'})
motor3 = SynAxis(name='motor3', labels={'motors'})
jittery_motor1 = SynAxis(name='jittery_motor1',
readback_func=lambda x: x + np.random.rand(),
labels={'motors'})
jittery_motor2 = SynAxis(name='jittery_motor2',
readback_func=lambda x: x + np.random.rand(),
labels={'motors'})
for axis in [motor, motor1, motor2, motor3, jittery_motor1, jittery_motor2]:
_html_docs = ['RoperDoc.html']
auto_data_type = ADCpt(SignalWithRBV, 'AutoDataType')
comment1 = ADCpt(SignalWithRBV, 'Comment1')
comment2 = ADCpt(SignalWithRBV, 'Comment2')
comment3 = ADCpt(SignalWithRBV, 'Comment3')
comment4 = ADCpt(SignalWithRBV, 'Comment4')
comment5 = ADCpt(SignalWithRBV, 'Comment5')
file_format = ADCpt(SignalWithRBV, 'FileFormat')
num_acquisitions = ADCpt(SignalWithRBV, 'NumAcquisitions')
num_acquisitions_counter = ADCpt(EpicsSignalRO, 'NumAcquisitionsCounter_RBV')
roper_shutter_mode = ADCpt(SignalWithRBV, 'RoperShutterMode')
class URLDetectorCam(CamBase):
_html_docs = ['URLDoc.html']
urls = DDC(ad_group(EpicsSignal,
(('url_1', 'URL1'),
('url_2', 'URL2'),
('url_3', 'URL3'),
('url_4', 'URL4'),
('url_5', 'URL5'),
('url_6', 'URL6'),
('url_7', 'URL7'),
('url_8', 'URL8'),
('url_9', 'URL9'),
('url_10', 'URL10'))),
doc='URLs')
url_select = ADCpt(EpicsSignal, 'URLSelect')
url_seq = ADCpt(EpicsSignal, 'URLSeq')
url = ADCpt(EpicsSignalRO, 'URL_RBV')
self.stage_sigs.update([('parent.cam.array_callbacks', 1),
])
_default_configuration_attrs = (ADBase._default_configuration_attrs +
('port_name', 'nd_array_port', 'enable',
'blocking_callbacks', 'plugin_type',
'asyn_pipeline_config',
'configuration_names'))
_html_docs = ['pluginDoc.html']
_plugin_type = None
_suffix_re = None
array_counter = C(SignalWithRBV, 'ArrayCounter')
array_rate = C(EpicsSignalRO, 'ArrayRate_RBV')
asyn_io = C(EpicsSignal, 'AsynIO')
nd_attributes_file = C(EpicsSignal, 'NDAttributesFile', string=True)
pool_alloc_buffers = C(EpicsSignalRO, 'PoolAllocBuffers')
pool_free_buffers = C(EpicsSignalRO, 'PoolFreeBuffers')
pool_max_buffers = C(EpicsSignalRO, 'PoolMaxBuffers')
pool_max_mem = C(EpicsSignalRO, 'PoolMaxMem')
pool_used_buffers = C(EpicsSignalRO, 'PoolUsedBuffers')
pool_used_mem = C(EpicsSignalRO, 'PoolUsedMem')
port_name = C(EpicsSignalRO, 'PortName_RBV', string=True)
def stage(self):
super().stage()
def enable_on_stage(self):
"""
when the plugin is staged, ensure that it is enabled.