Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _add_monitors(self):
    """Subscribe a monitor callback to every attribute named in ``monitor_attrs``.

    Each name is resolved with ``getattr`` on this object. A callback bound to
    that attribute name is built from ``self._monitor_callback``, stored in
    ``self._monitors`` keyed by the resolved object, and then handed to the
    object's ``subscribe`` method.

    Raises
    ------
    ValueError
        If a resolved attribute is a ``Device`` instance.
    """
    for attr_name in self.monitor_attrs:
        target = getattr(self, attr_name)
        if isinstance(target, Device):
            raise ValueError('Cannot monitor Devices, only Signals.')
        # Bind the attribute name so the shared callback knows its source.
        callback = functools.partial(
            self._monitor_callback,
            attribute=attr_name,
        )
        self._monitors[target] = callback
        target.subscribe(callback)
return self.parent.sim_state['setpoint']
def describe(self):
    """Return the parent-class description with ``'precision'`` filled in.

    Every entry of the mapping returned by ``super().describe()`` gets its
    ``'precision'`` field set to ``self.parent.precision``.
    """
    description = super().describe()
    # A single key is expected here, but all entries are updated for generality.
    for entry in description.values():
        entry['precision'] = self.parent.precision
    return description
@property
def timestamp(self):
    """Return the value stored under ``'setpoint_ts'`` in the parent's ``sim_state``."""
    sim_state = self.parent.sim_state
    return sim_state['setpoint_ts']
class SynAxisNoHints(Device):
"""
A synthetic settable Device that mimics any 1D axis (position, temperature).
Parameters
----------
name : string, keyword only
readback_func : callable, optional
When the Device is set to ``x``, its readback will be updated to
``f(x)``. This can be used to introduce random noise or a systematic
offset.
Expected signature: ``f(x) -> value``.
value : object, optional
The initial value. Default is 0.
delay : number, optional
Simulates how long it takes the device to "move". Default is 0 seconds.
precision : integer, optional
from .utils.epics_pvs import (raise_if_disconnected, AlarmSeverity)
from .positioner import PositionerBase
from .device import (Device, Component as Cpt, required_for_connection)
from .status import wait as status_wait
from enum import Enum
logger = logging.getLogger(__name__)
class HomeEnum(str, Enum):
    """String-valued enumeration with the two members ``forward`` and ``reverse``."""

    forward = 'forward'
    reverse = 'reverse'
class EpicsMotor(Device, PositionerBase):
'''An EPICS motor record, wrapped in a :class:`Positioner`
Keyword arguments are passed through to the base class, Positioner
Parameters
----------
prefix : str
The record to use
read_attrs : sequence of attribute names
The signals to be read during data acquisition (i.e., in read() and
describe() calls)
name : str, optional
The name of the device
parent : instance or None
The instance of the parent device, if applicable
settle_time : float, optional
def get_file_list(self, datum_kwarg_gen):
    """Build one ``.npy`` file name per item of ``datum_kwarg_gen``.

    This method is optional: it is not needed for access, only for export.

    Parameters
    ----------
    datum_kwarg_gen : iterable of dict
        Each dict is expanded as keyword arguments into the file-name
        template, which consumes an ``index`` field.

    Returns
    -------
    list of str
        Names of the form ``<self._name>_<index>.npy``, in iteration order.
    """
    file_names = []
    for datum_kwargs in datum_kwarg_gen:
        file_names.append(
            '{name}_{index}.npy'.format(name=self._name, **datum_kwargs)
        )
    return file_names
class ABDetector(Device):
    """Device with two ``SynSignal`` components, ``a`` and ``b``.

    Both components use ``random.random`` as their ``func``; only ``a`` is
    declared with ``kind=Kind.hinted``.
    """

    a = Component(
        SynSignal,
        func=random.random,
        kind=Kind.hinted,
    )
    b = Component(
        SynSignal,
        func=random.random,
    )

    def trigger(self):
        """Trigger ``a`` and then ``b``; return the two results joined with ``&``."""
        status_a = self.a.trigger()
        status_b = self.b.trigger()
        return status_a & status_b
class DetWithCountTime(Device):
    """Device with a hinted ``intensity`` component and a ``count_time`` signal."""

    # ``intensity`` is a SynSignal whose ``func`` always returns 0.
    intensity = Component(
        SynSignal,
        func=lambda: 0,
        kind=Kind.hinted,
    )
    # Plain ``Signal`` component, declared without extra arguments.
    count_time = Component(Signal)
class DetWithConf(Device):
a = Component(SynSignal, func=lambda: 1, kind=Kind.hinted)
b = Component(SynSignal, func=lambda: 2, kind=Kind.hinted)
c = Component(SynSignal, func=lambda: 3)
d = Component(SynSignal, func=lambda: 4)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.read_attrs = ['a', 'b']
self.configuration_attrs = ['c', 'd']
def trigger(self):
def _ddc_helper(signal_class, *items, kind='config', doc=None, **kwargs):
    """Build a ``DynamicDeviceComponent`` using one signal class for all components.

    Parameters
    ----------
    signal_class : type
        Signal class forwarded to ``ad_group`` as its first argument.
    *items
        Collected into a tuple and forwarded to ``ad_group`` as its second
        argument.
    kind : optional
        Forwarded to ``ad_group``. Default is ``'config'``.
    doc : optional
        Forwarded to ``DynamicDeviceComponent`` as ``doc``.
    **kwargs
        Extra keyword arguments forwarded to ``ad_group``.

    Returns
    -------
    DynamicDeviceComponent
    """
    definition = ad_group(signal_class, items, kind=kind, **kwargs)
    return DynamicDeviceComponent(definition, doc=doc)


# Shortcuts with the signal class already bound as the first argument.
DDC_EpicsSignal = functools.partial(_ddc_helper, EpicsSignal)
DDC_EpicsSignalRO = functools.partial(_ddc_helper, EpicsSignalRO)
DDC_SignalWithRBV = functools.partial(_ddc_helper, EpicsSignalWithRBV)
class ADBase(Device):
'''The AreaDetector base class
This serves as the base for all detectors and plugins
'''
_html_docs = ['areaDetectorDoc.html']
_default_read_attrs = ()
_default_configuration_attrs = ()
def find_signal(self, text, use_re=False, case_sensitive=False,
match_fcn=None, f=sys.stdout):
'''Search through the signal docs on this detector for the string text
Parameters
----------
text : str
-------
param : EpicsDXPLowLevelParameter
'''
try:
return self._parameter_cache[index]
except KeyError:
pass
prefix = '{}{}'.format(self.prefix, self.parameter_prefix)
name = '{}_param{}'.format(self.name, index)
param = EpicsDXPLowLevelParameter(prefix, name=name)
self._parameter_cache[index] = param
return param
class EpicsDXPMapping(Device):
    """Device grouping the DXP mapping components.

    Each attribute is a component bound to the suffix given as its second
    argument; names ending in ``_RBV`` are declared with ``EpicsSignalRO``.
    """

    apply = Cpt(EpicsSignal, "Apply")
    auto_apply = Cpt(SignalWithRBV, "AutoApply")
    auto_pixels_per_buffer = Cpt(SignalWithRBV, "AutoPixelsPerBuffer")
    buffer_size = Cpt(EpicsSignalRO, "BufferSize_RBV")
    collect_mode = Cpt(SignalWithRBV, "CollectMode")
    ignore_gate = Cpt(SignalWithRBV, "IgnoreGate")
    input_logic_polarity = Cpt(SignalWithRBV, "InputLogicPolarity")
    list_mode = Cpt(SignalWithRBV, "ListMode")
    mbytes_read = Cpt(EpicsSignalRO, "MBytesRead_RBV")
    next_pixel = Cpt(EpicsSignal, "NextPixel")
    pixel_advance_mode = Cpt(SignalWithRBV, "PixelAdvanceMode")
    pixels_per_buffer = Cpt(SignalWithRBV, "PixelsPerBuffer")
    pixels_per_run = Cpt(SignalWithRBV, "PixelsPerRun")
    read_rate = Cpt(EpicsSignalRO, "ReadRate_RBV")
    sync_count = Cpt(SignalWithRBV, "SyncCount")
class ABDetector(Device):
    """Device with two ``SynSignal`` components, ``a`` and ``b``.

    Both components use ``random.random`` as their ``func``; only ``a`` is
    declared with ``kind=Kind.hinted``.
    """

    a = Component(
        SynSignal,
        func=random.random,
        kind=Kind.hinted,
    )
    b = Component(
        SynSignal,
        func=random.random,
    )

    def trigger(self):
        """Trigger ``a`` and then ``b``; return the two results joined with ``&``."""
        status_a = self.a.trigger()
        status_b = self.b.trigger()
        return status_a & status_b
class DetWithCountTime(Device):
    """Device with a hinted ``intensity`` component and a ``count_time`` signal."""

    # ``intensity`` is a SynSignal whose ``func`` always returns 0.
    intensity = Component(
        SynSignal,
        func=lambda: 0,
        kind=Kind.hinted,
    )
    # Plain ``Signal`` component, declared without extra arguments.
    count_time = Component(Signal)
class DetWithConf(Device):
    """Device with four ``SynSignal`` components split into read and config sets.

    ``a`` and ``b`` are hinted and become the ``read_attrs``; ``c`` and ``d``
    become the ``configuration_attrs``. The components' ``func`` callables
    return the constants 1 through 4 respectively.
    """

    a = Component(SynSignal, func=lambda: 1, kind=Kind.hinted)
    b = Component(SynSignal, func=lambda: 2, kind=Kind.hinted)
    c = Component(SynSignal, func=lambda: 3)
    d = Component(SynSignal, func=lambda: 4)

    def __init__(self, *args, **kwargs):
        """Initialise the base class, then assign the read/configuration lists."""
        super().__init__(*args, **kwargs)
        self.read_attrs = ['a', 'b']
        self.configuration_attrs = ['c', 'd']

    def trigger(self):
        """Trigger ``a`` and then ``b``; return the two results joined with ``&``."""
        status_a = self.a.trigger()
        status_b = self.b.trigger()
        return status_a & status_b
class InvariantSignal(SynSignal):
# Always returns the same reading, including timestamp.
on class construction and loudly when manipulating an object.
Parameters
----------
cls : Device
A real Device class to inspect and create a fake Device class from
Returns
-------
fake_device : Device
The resulting fake Device class
"""
# Cache to avoid repeating work.
# EpicsSignal and EpicsSignalRO begin in the cache.
if cls not in fake_device_cache:
if not issubclass(cls, Device):
# Ignore non-devices and non-epics-signals
logger.debug('Ignore cls=%s, bases are %s', cls, cls.__bases__)
fake_device_cache[cls] = cls
return cls
fake_dict = {}
# Update all the components recursively
for cpt_name in cls.component_names:
cpt = getattr(cls, cpt_name)
fake_cpt = copy.copy(cpt)
if isinstance(cpt, Component):
fake_cpt.cls = make_fake_device(cpt.cls)
logger.debug('switch cpt_name=%s to cls=%s',
cpt_name, fake_cpt.cls)
# DDCpt stores the classes in a different place
elif isinstance(cpt, DDC):
fake_defn = {}
self.match_name()
def match_name(self):
    """Set ``self.s.name`` to the value returned by ``self.chname.get()``."""
    channel_name = self.chname.get()
    self.s.name = channel_name
def _sc_chans(attr_fix, id_range):
    """Build the ordered component definition for a set of scaler channels.

    Parameters
    ----------
    attr_fix : str
        Prefix of each generated key.
    id_range : iterable of int
        Channel numbers; each yields one entry, in iteration order.

    Returns
    -------
    OrderedDict
        Maps ``<attr_fix><NN>`` (number zero-padded to two digits) to the
        tuple ``(ScalerChannel, '', {'ch_num': <number>, 'kind': Kind.normal})``.
    """
    return OrderedDict(
        (
            '{}{:02d}'.format(attr_fix, ch_num),
            (ScalerChannel, '', {'ch_num': ch_num, 'kind': Kind.normal}),
        )
        for ch_num in id_range
    )
class ScalerCH(Device):
# The data
channels = DDCpt(_sc_chans('chan', range(1, 33)))
# trigger + trigger mode
count = Cpt(EpicsSignal, '.CNT', trigger_value=1, kind=Kind.omitted)
count_mode = Cpt(EpicsSignal, '.CONT', string=True, kind=Kind.config)
# delay from triggering to starting counting
delay = Cpt(EpicsSignal, '.DLY', kind=Kind.config)
auto_count_delay = Cpt(EpicsSignal, '.DLY1', kind=Kind.config)
time = Cpt(EpicsSignal, '.T')
freq = Cpt(EpicsSignal, '.FREQ', kind=Kind.config)
preset_time = Cpt(EpicsSignal, '.TP', kind=Kind.config)
current_pixel = Cpt(EpicsSignal, 'CurrentPixel')
dynamic_range = Cpt(EpicsSignalRO, 'DynamicRange_RBV')
# Preset options
preset_events = Cpt(SignalWithRBV, 'PresetEvents')
preset_mode = Cpt(SignalWithRBV, 'PresetMode', string=True)
preset_triggers = Cpt(SignalWithRBV, 'PresetTriggers')
# Trace options
trace_data = Cpt(EpicsSignal, 'TraceData')
trace_mode = Cpt(SignalWithRBV, 'TraceMode', string=True)
trace_time_array = Cpt(EpicsSignal, 'TraceTimeArray')
trace_time = Cpt(SignalWithRBV, 'TraceTime')
class EpicsDXPLowLevelParameter(Device):
    """Device pairing a parameter name signal with its value signal."""

    # Component bound to the 'Name' suffix.
    param_name = Cpt(EpicsSignal, "Name")
    # Component bound to the 'Val' suffix.
    value = Cpt(SignalWithRBV, "Val")
class EpicsDXPLowLevel(Device):
num_low_level_params = Cpt(EpicsSignal, 'NumLLParams')
read_low_level_params = Cpt(EpicsSignal, 'ReadLLParams')
parameter_prefix = 'LL{}'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._parameter_cache = {}
def get_low_level_parameter(self, index):
'''Get a DXP low level parameter