Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_attribute_signal():
    """Set up a device that carries two AttributeSignal components.

    ``attrsig`` is given ``'prop'`` and ``sub_attrsig`` is given
    ``'sub1.prop'``; both strings match the ``prop`` properties defined on
    the classes below (presumably attribute paths resolved by
    AttributeSignal -- confirm against its documentation).

    NOTE(review): only the set-up portion of this test is visible here.
    """
    init_value = 33

    class SubDevice(Device):
        # The getter always reports ``init_value``; the setter discards writes.
        @property
        def prop(self):
            return init_value

        @prop.setter
        def prop(self, value):
            pass

    class MyDevice(Device):
        sub1 = Component(SubDevice, '1')
        attrsig = Component(AttributeSignal, 'prop')
        sub_attrsig = Component(AttributeSignal, 'sub1.prop')

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # Backing store for ``prop``; starts out at ``init_value``.
            self._value = init_value

        @property
        def prop(self):
            return self._value

        @prop.setter
        def prop(self, value):
            self._value = value

    dev = MyDevice('', name='mydev')
if self.prefix.endswith('_raises_'):
raise Exception('stop failed for some reason')
class SubDevice(Device):
    # Three FakeSignal components plus one nested SubSubDevice (empty suffix).
    cpt1 = Component(FakeSignal, '1')
    cpt2 = Component(FakeSignal, '2')
    cpt3 = Component(FakeSignal, '3')
    subsub = Component(SubSubDevice, '')

    def stop(self, *, success=False):
        """Record the call and its ``success`` flag, then delegate to the parent ``stop``."""
        # Bookkeeping is done before delegating, so it is retained even if
        # the parent ``stop`` raises.
        self.success = success
        self.stop_called = True
        super().stop(success=success)
class MyDevice(Device):
    # sub1 uses the '1' suffix; sub2 and sub3 both use the '_raises_' suffix.
    sub1 = Component(SubDevice, '1')
    sub2 = Component(SubDevice, '_raises_')
    sub3 = Component(SubDevice, '_raises_')

    # A plain signal that sits directly on the top-level device.
    cpt3 = Component(FakeSignal, '3')
# Expect dev.stop() to raise an ExceptionBundle that carries two exceptions.
dev = MyDevice('', name='mydev')
with self.assertRaises(ExceptionBundle) as cm:
    dev.stop()

# cm.exception is read only after the ``with`` block has exited.
ex = cm.exception
# assertEqual replaces the deprecated assertEquals alias, which was removed
# from unittest in Python 3.12.
self.assertEqual(len(ex.exceptions), 2)

# Every sub-device must have had stop() called on it ...
self.assertTrue(dev.sub1.stop_called)
self.assertTrue(dev.sub2.stop_called)
self.assertTrue(dev.sub3.stop_called)

# ... and each must have recorded success=False.
self.assertFalse(dev.sub1.success)
self.assertFalse(dev.sub2.success)
self.assertFalse(dev.sub3.success)
fm = self.fake_motor

# Two identical rounds, first with setpoint 0.05 and then with setpoint 0:
# write the setpoint, pause, write 1 to the actuate PV, pause again.
for target in (0.05, 0):
    epics.caput(fm['setpoint'], target)
    time.sleep(0.5)
    epics.caput(fm['actuate'], 1)
    time.sleep(0.5)
class MyPositioner(PVPositioner):
    '''Setpoint, readback, actuate, stop and done PVs. No put completion.'''
    # PV names come from the enclosing scope's ``fm`` mapping.
    setpoint = C(EpicsSignal, fm['setpoint'])
    readback = C(EpicsSignalRO, fm['readback'])
    actuate = C(EpicsSignal, fm['actuate'])
    stop_signal = C(EpicsSignal, fm['stop'])
    # The done signal is bound to the 'moving' entry of ``fm``.
    done = C(EpicsSignal, fm['moving'])

    # Values associated with the actuate / stop / done signals above.
    actuate_value = 1
    stop_value = 1
    done_value = 1
pos = MyPositioner('', name='pv_pos_fake_mtr')
print('fake mtr', pos.describe())
pos.wait_for_connection()

# Register the same callback for the DONE and READBACK event types.
for event_type in (pos.SUB_DONE, pos.SUB_READBACK):
    pos.subscribe(callback, event_type=event_type)

logger.info('---- test #1 ----')
logger.info('--> move to 1')
pos.move(1, timeout=5)
'tags': {'confirm', 'protected'},
'value': 1,
}
)
# 'command-proc' variety: only the variety key is supplied.
command_proc = Cpt(EpicsSignal, 'command-without-enum')
set_metadata(command_proc, {'variety': 'command-proc'})

# 'command-enum' variety: same PV suffix as command_proc, with the enum
# strings supplied through the metadata dict.
command_enum = Cpt(EpicsSignal, 'command-without-enum')
set_metadata(
    command_enum,
    {
        'variety': 'command-enum',
        'enum_dict': {0: 'No', 1: 'Yes', 3: 'Metadata-defined'},
    },
)

command_setpoint_tracks_readback = Cpt(
    EpicsSignal,
    'command-setpoint-tracks-readback',
)
set_metadata(
    command_setpoint_tracks_readback,
    {'variety': 'command-setpoint-tracks-readback'},
)

# 'scalar-tweakable' variety: delta and range settings use dotted keys.
tweakable = Cpt(EpicsSignal, 'tweakable')
set_metadata(
    tweakable,
    {
        'variety': 'scalar-tweakable',
        'delta.value': 0.5,
        'delta.range': [-1, 1],
        'range.source': 'value',
        'range.value': [-1, 1],
    },
)

array_timeseries = Cpt(EpicsSignal, 'array-timeseries')
class TimeSeriesPlugin_V25(
    PluginBase_V22,
    TimeSeriesPlugin,
    version=(2, 5),
    version_of=TimeSeriesPlugin,
):
    # Declared as version (2, 5) of TimeSeriesPlugin; every component below
    # uses a "TS"-prefixed suffix.
    ts_acquire = Cpt(EpicsSignal, "TSAcquire")
    ts_acquire_mode = Cpt(
        SignalWithRBV,
        "TSAcquireMode",
        string=True,
        doc="0='Fixed length' 1='Circ. buffer'",
    )
    ts_acquiring = Cpt(
        EpicsSignal,
        "TSAcquiring",
        string=True,
        doc="0='Done' 1='Acquiring'",
    )
    ts_averaging_time = Cpt(SignalWithRBV, "TSAveragingTime")
    ts_current_point = Cpt(EpicsSignal, "TSCurrentPoint")
    ts_elapsed_time = Cpt(EpicsSignal, "TSElapsedTime")
    ts_num_average = Cpt(EpicsSignal, "TSNumAverage")
    ts_num_points = Cpt(EpicsSignal, "TSNumPoints")
    ts_read = Cpt(
        EpicsSignal,
        "TSRead",
        string=True,
        doc="0='Done' 1='Read'",
    )
    ts_time_axis = Cpt(EpicsSignal, "TSTimeAxis")
    ts_time_per_point = Cpt(SignalWithRBV, "TSTimePerPoint")
    ts_time_per_point_link = Cpt(EpicsSignal, "TSTimePerPointLink")
    ts_timestamp = Cpt(EpicsSignal, "TSTimestamp")
class TimeSeriesPlugin_V26(
    PluginBase_V26,
    TimeSeriesPlugin_V25,
    version=(2, 6),
    version_of=TimeSeriesPlugin,
):
    # Adds no components of its own; it only combines the two bases under
    # version (2, 6).
    ...
class TimeSeriesPlugin_V31(
    PluginBase_V31,
    TimeSeriesPlugin_V26,
    version=(3, 1),
    version_of=TimeSeriesPlugin,
):
    # Adds no components of its own; it only combines the two bases under
    # version (3, 1).
    ...
class TimeSeriesPlugin_V33(
    PluginBase_V33,
    TimeSeriesPlugin_V31,
    version=(3, 3),
    version_of=TimeSeriesPlugin,
):
    # Adds no components of its own; it only combines the two bases under
    # version (3, 3).
    ...
class CodecPlugin(Device, version_type='ADCore'):
    """Serves as a base class for other versions"""
class CodecPlugin_V34(
    PluginBase_V34,
    CodecPlugin,
    version=(3, 4),
    version_of=CodecPlugin,
):
    # Declared as version (3, 4) of CodecPlugin.
    # NOTE(review): ``blosc_cl_evel`` looks like a mis-split of "BloscCLevel";
    # the name is kept because it is the attribute callers would use.
    blosc_cl_evel = Cpt(SignalWithRBV, "BloscCLevel")
    blosc_compressor = Cpt(
        SignalWithRBV,
        "BloscCompressor",
        string=True,
        doc="0=BloscLZ 1=LZ4 2=LZ4HC 3=SNAPPY 4=ZLIB 5=ZSTD",
    )
    blosc_num_threads = Cpt(SignalWithRBV, "BloscNumThreads")
    blosc_shuffle = Cpt(
        SignalWithRBV,
        "BloscShuffle",
        string=True,
        doc="0=None 1=Bit 2=Byte",
    )
    codec_error = Cpt(EpicsSignal, "CodecError")
    codec_status = Cpt(
        EpicsSignal,
        "CodecStatus",
        string=True,
        doc="0=Success 1=Warning 2=Error",
    )
    comp_factor = Cpt(EpicsSignalRO, "CompFactor_RBV")
    compressor = Cpt(
        SignalWithRBV,
        "Compressor",
        string=True,
        doc="0=None 1=JPEG 2=Blosc",
    )
    jpeg_quality = Cpt(SignalWithRBV, "JPEGQuality")
    mode = Cpt(
        SignalWithRBV,
        "Mode",
        string=True,
        doc="0=Compress 1=Decompress",
    )
class AttributePlugin(Device, version_type='ADCore'):
    """Serves as a base class for other versions"""
class AttributePlugin_V20(
    PluginBase_V20,
    AttributePlugin,
    version=(2, 0),
    version_of=AttributePlugin,
):
    # Declared as version (2, 0) of AttributePlugin.
    array_data = Cpt(EpicsSignalRO, 'ArrayData_RBV')
    attribute_name = Cpt(SignalWithRBV, 'AttrName')
    reset = Cpt(
        EpicsSignal,
        'Reset',
        string=True,
        doc="0='Done Reset' 1='Reset'",
    )
    reset_array_counter = Cpt(EpicsSignal, 'ResetArrayCounter')
# --- NDCircularBuff ---
class CircularBuffPlugin(Device, version_type='ADCore'):
    """Serves as a base class for other versions"""
class CircularBuffPlugin_V22(
    PluginBase_V22,
    CircularBuffPlugin,
    version=(2, 2),
    version_of=CircularBuffPlugin,
):
    # Declared as version (2, 2) of CircularBuffPlugin.
    actual_trigger_count = Cpt(EpicsSignalRO, "ActualTriggerCount_RBV")
    capture = Cpt(SignalWithRBV, "Capture")
    current_qty = Cpt(EpicsSignalRO, "CurrentQty_RBV")
    post_count = Cpt(SignalWithRBV, "PostCount")
    post_trigger_qty = Cpt(EpicsSignalRO, "PostTriggerQty_RBV")
    pre_count = Cpt(SignalWithRBV, "PreCount")
    preset_trigger_count = Cpt(SignalWithRBV, "PresetTriggerCount")
    status_message = Cpt(EpicsSignal, "StatusMessage", string=True)

    # NOTE(review): the trailing underscore presumably avoids clashing with
    # an inherited ``trigger`` attribute -- confirm against the base classes.
    trigger_ = Cpt(SignalWithRBV, "Trigger")

    # Trigger A / B / Calc each pair a SignalWithRBV with a separate
    # "...Val" EpicsSignal.
    trigger_a = Cpt(SignalWithRBV, "TriggerA", string=True)
    trigger_a_val = Cpt(EpicsSignal, "TriggerAVal")
    trigger_b = Cpt(SignalWithRBV, "TriggerB", string=True)
    trigger_b_val = Cpt(EpicsSignal, "TriggerBVal")
    trigger_calc = Cpt(SignalWithRBV, "TriggerCalc")
    trigger_calc_val = Cpt(EpicsSignal, "TriggerCalcVal")
array_size_xyz = DDC_EpicsSignalRO(
("array_size_x", "ArraySizeX_RBV"),
("array_size_y", "ArraySizeY_RBV"),
("array_size_z", "ArraySizeZ_RBV"),
)
SignalWithRBV, "DataType", string=True,
doc="0=Int8 1=UInt8 2=Int16 3=UInt16 4=Int32 5=UInt32 6=Float32 7=Float64",
)
array_size_xyz = DDC_EpicsSignalRO(
("array_size_x", "ArraySizeX_RBV"),
("array_size_y", "ArraySizeY_RBV"),
("array_size_z", "ArraySizeZ_RBV"),
)
class PluginBase_V26(PluginBase_V22, version=(2, 6), version_of=PluginBase):
queue_size = Cpt(SignalWithRBV, 'QueueSize')
dimensions = Cpt(SignalWithRBV, "Dimensions")
driver_version = Cpt(EpicsSignalRO, "DriverVersion_RBV", string=True)
execution_time = Cpt(EpicsSignalRO, "ExecutionTime_RBV", string=True)
ndimensions = Cpt(SignalWithRBV, "NDimensions", string=True)
array_size_all = DDC_SignalWithRBV(
("array_size0", "ArraySize0"),
("array_size1", "ArraySize1"),
("array_size2", "ArraySize2"),
("array_size3", "ArraySize3"),
("array_size4", "ArraySize4"),
("array_size5", "ArraySize5"),
("array_size6", "ArraySize6"),
("array_size7", "ArraySize7"),
("array_size8", "ArraySize8"),
("array_size9", "ArraySize9"),
doc="array_size",
)
dim_sa = DDC_SignalWithRBV(
("dim0_sa", "Dim0SA"),
("dim1_sa", "Dim1SA"),
class HDF5Plugin_V21(
    FilePlugin_V21,
    HDF5Plugin_V20,
    version=(2, 1),
    version_of=HDF5Plugin,
):
    # Declared as version (2, 1) of HDF5Plugin; adds the XML* components.
    xml_error_msg = Cpt(EpicsSignalRO, "XMLErrorMsg_RBV")
    xml_file_name = Cpt(SignalWithRBV, "XMLFileName")
    xml_valid = Cpt(
        EpicsSignalRO,
        "XMLValid_RBV",
        string=True,
        doc="0='No' 1='Yes'",
    )
class HDF5Plugin_V22(
    FilePlugin_V22,
    HDF5Plugin_V21,
    version=(2, 2),
    version_of=HDF5Plugin,
):
    # Declared as version (2, 2) of HDF5Plugin; adds a single component.
    nd_attribute_chunk = Cpt(SignalWithRBV, "NDAttributeChunk")
class HDF5Plugin_V25(HDF5Plugin_V22, version=(2, 5), version_of=HDF5Plugin):
dim_att_datasets = Cpt(SignalWithRBV, "DimAttDatasets", string=True,
doc="0='No' 1='Yes'")
fill_value = Cpt(SignalWithRBV, "FillValue")
position_mode = Cpt(SignalWithRBV, "PositionMode", string=True,
doc="0='Off' 1='On'")
swmr_active = Cpt(EpicsSignalRO, "SWMRActive_RBV", string=True,
doc="0='Off' 1='Active'")
swmr_cb_counter = Cpt(EpicsSignalRO, "SWMRCbCounter_RBV")
swmr_mode = Cpt(SignalWithRBV, "SWMRMode", string=True,
doc="0='Off' 1='On'")
swmr_supported = Cpt(EpicsSignalRO, "SWMRSupported_RBV", string=True,
doc="0='Not Supported' 1='Supported'")
extra_dim_chunk = DDC_SignalWithRBV(
("chunk_3", "ExtraDimChunk3"),
("chunk_4", "ExtraDimChunk4"),
("chunk_5", "ExtraDimChunk5"),
("chunk_6", "ExtraDimChunk6"),
("chunk_7", "ExtraDimChunk7"),
("chunk_8", "ExtraDimChunk8"),
def ophyd_device_to_caproto_ioc(dev, *, depth=0):
import ophyd
if isinstance(dev, ophyd.DynamicDeviceComponent):
# DynamicDeviceComponent: attr: (sig_cls, prefix, kwargs)
# NOTE: cannot inspect types without an instance of the dynamic Device
# class
attr_components = {
attr: ophyd.Component(sig_cls, prefix, **kwargs)
for attr, (sig_cls, prefix, kwargs) in dev.defn.items()
}
dev_name = f'{dev.attr}_group'
cls, dev = dev, None
else:
if inspect.isclass(dev):
# we can introspect Device directly, but we cannot connect to PVs
# and tell about their data type
cls, dev = dev, None
else:
# if connected, we can reach out to PVs and determine data types
cls = dev.__class__
attr_components = cls._sig_attrs
dev_name = f'{cls.__name__}_group'
dev_name = underscore_to_camel_case(dev_name)