Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def is_GetMacroEnv_allowed(self):
    """Tell whether the current state is one of the terminal macro states.

    :return: True when ``self.get_state()`` equals ``Macro.Finished``,
        ``Macro.Abort`` or ``Macro.Exception``; False otherwise.
    """
    current_state = self.get_state()
    permitted_states = [Macro.Finished, Macro.Abort, Macro.Exception]
    return current_state in permitted_states
class usenv(Macro):
    """Unset the given environment variable(s)."""

    # A single repeat parameter that requires at least one variable name.
    param_def = [
        [
            'environment_list',
            ParamRepeat(
                ['env', Type.Env, None, 'Environment variable name'],
                min=1),
            None,
            'List of environment items to be removed',
        ],
    ]

    def run(self, env):
        """Remove the requested items from the environment and report it.

        :param env: value received for the ``environment_list`` parameter;
            it is handed unchanged to ``self.unsetEnv``
        """
        self.unsetEnv(env)
        self.output("Success!")
class load_env(Macro):
""" Read environment variables from config_env.xml file"""
def run(self):
doc = etree.parse("config_env.xml")
root = doc.getroot()
for element in root:
if element.find("./name").text == "auto_filter":
self.output("Loading auto_filter variables:")
filter_max_elem = element.find(".//FilterMax")
if filter_max_elem is not None:
filter_max = filter_max_elem.text
self.setEnv("FilterMax", filter_max)
self.output("FilterMax loaded")
else:
self.output("FilterMax not found")
filter_min_elem = element.find(".//FilterMin")
['offon', Type.Boolean, None, 'Unset/Set logging'],
['mode', Type.Integer, -1, 'Mode: 0 append, 1 new file'],
]
def run(self, offon, mode):
    """Switch macro logging on or off through environment variables.

    :param offon: truthy to enable logging, falsy to disable it
    :param mode: 1 sets ``LogMacroMode`` to True, 0 sets it to False; any
        other value leaves ``LogMacroMode`` untouched. Only consulted when
        ``offon`` is truthy.
    """
    # Disabling is the simple case: nothing else needs to be touched.
    if not offon:
        self.setEnv('LogMacro', False)
        return

    if mode == 1:
        self.setEnv('LogMacroMode', True)
    elif mode == 0:
        self.setEnv('LogMacroMode', False)
    # The mode (if any) is stored before logging itself is enabled.
    self.setEnv('LogMacro', True)
class repeat(Hookable, Macro):
"""This macro executes as many repetitions of a set of macros as
specified by nr parameter. The macros to be repeated can be
given as parameters or as body hooks.
If both are given, the ones given as parameters are executed first,
followed by the ones given as body hooks.
If nr is negative, repetitions are executed until you stop the
repeat macro.
.. note::
The repeat macro has been included in Sardana
on a provisional basis. Backwards incompatible changes
(up to and including removal of the macro) may occur if
deemed necessary by the core developers."""
hints = {'allowsHooks': ('body',)}
return 0.0
def getIntervalEstimation(self):
    """Return the interval estimation, taken as the number of waypoints.

    :return: ``len(self.waypoints)``
    """
    waypoint_count = len(self.waypoints)
    return waypoint_count
def _fill_missing_records(self):
    """Pad the scan record list with dummy records for the final padding.

    The number of records already present is the total record count of the
    scan data minus ``self.point_id``; the difference to ``self.nr_points``
    is passed to ``initRecords`` on the scan data.
    """
    expected_points = self.nr_points
    gscan = self._gScan
    # Records present, counted relative to self.point_id.
    present = len(gscan.data.records) - self.point_id
    padding = expected_points - present
    gscan.data.initRecords(padding)
class timescan(Macro, Hookable):
"""Do a time scan over the specified time intervals. The scan starts
immediately. The number of data points collected will be nr_interv + 1.
Count time is given by integ_time. Latency time will be the longer one
of latency_time and measurement group latency time.
"""
hints = {'scan': 'timescan', 'allowsHooks': ('pre-scan', 'pre-acq',
'post-acq', 'post-scan')}
param_def = [
['nr_interv', Type.Integer, None, 'Number of scan intervals'],
['integ_time', Type.Float, None, 'Integration time'],
['latency_time', Type.Float, 0, 'Latency time']]
def prepare(self, nr_interv, integ_time, latency_time):
self.nr_interv = nr_interv
Usage from Spock, ex.:
pt12 [1 3 4] [mot1 mot2]
pt12 1 mot1 # if only one repetition for each repeat parameter
"""
# Two repeat parameters: a list of float values followed by a list of motors.
param_def = [
    [
        'numb_list',
        [['pos', Type.Float, None, 'value']],
        None,
        'List of values',
    ],
    [
        'motor_list',
        [['motor', Type.Motor, None, 'Motor to move']],
        None,
        'List of motors',
    ],
]
def run(self, *args, **kwargs):
    """No-op: accept any arguments and perform no action."""
    return None
class pt13(Macro):
    """Macro taking a list of motor groups, each group being a list of
    motors. Shows that repeat parameters may be nested.

    Usage from Spock, ex.:
    pt13 [[mot1 mot2] [mot3 mot4]]
    """

    # Outer repeat: motor groups; inner repeat: the motors of one group.
    param_def = [
        [
            'motor_group_list',
            [
                [
                    'motor_list',
                    [['motor', Type.Motor, None, 'Motor to move']],
                    None,
                    'List of motors',
                ],
            ],
            None,
            'Motor groups',
        ],
    ]

    def run(self, *args, **kwargs):
        """No-op: accept any arguments and perform no action."""
        return None
# Parameters: the element to rename and the name it should receive.
param_def = [
    ['element', Type.PoolElement, None, 'element to be renamed'],
    ['new_name', Type.String, None, 'new name'],
]
def prepare(self, elem, new_name):
    """Reject the request when the element to rename is of type "Pool".

    :param elem: element whose ``getType()`` is inspected
    :param new_name: requested new name (not used by this check)
    :raises WrongParam: if ``elem.getType()`` equals ``"Pool"``
    """
    targets_pool = elem.getType() == "Pool"
    if targets_pool:
        raise WrongParam('Pool elements can not be renamed')
def run(self, elem, new_name):
    """Rename an element through its pool and print a confirmation.

    :param elem: element to rename; its ``getPoolObj()`` and ``getName()``
        methods are used
    :param new_name: name passed to the pool's ``renameElement``
    """
    owning_pool = elem.getPoolObj()
    previous_name = elem.getName()
    owning_pool.renameElement(previous_name, new_name)
    confirmation = "Renamed %s to %s" % (previous_name, new_name)
    self.print(confirmation)
class udefelem(Macro):
    """Delete existing elements."""

    # One repeat parameter; at least one element must be given.
    param_def = [
        [
            'elements',
            [['element', Type.Element, None, 'element name'], {'min': 1}],
            None,
            'List of element(s) name',
        ],
    ]

    def run(self, elements):
        """Delete every given element through the pool object it reports.

        :param elements: iterable of elements exposing ``getPoolObj()`` and
            ``getName()``
        """
        for target in elements:
            owning_pool = target.getPoolObj()
            target_name = target.getName()
            owning_pool.deleteElement(target_name)
class defctrl(Macro):
"""Creates a new controller
def run(self, ctrlclass):
    """Fetch the file of a controller class and locate the class in it.

    :param ctrlclass: controller class object; its ``file`` and ``name``
        attributes and its ``getPool()`` method are used
    :return: ``[file name, file contents, line number]``. The line number
        is 1-based and refers to the first line that starts with
        ``class``, contains ``ctrlclass.name`` and ends with ``:``. When no
        line matches, it equals the number of lines plus one.
    """
    f_name = ctrlclass.file
    pool = ctrlclass.getPool()
    raw = array.array('B', pool.GetFile(f_name))
    # array.tostring() was removed in Python 3.9; tobytes() (available
    # since 3.2) replaces it. The fallback keeps older interpreters working.
    data = raw.tobytes() if hasattr(raw, 'tobytes') else raw.tostring()
    line_nb = 1
    for line in data.splitlines():
        # On Python 3 the contents are bytes: decode each line so the
        # str-based checks below do not raise TypeError. The returned
        # ``data`` itself is left undecoded.
        if not isinstance(line, str):
            line = line.decode('utf-8', 'replace')
        line = line.strip(' \t')
        if (line.startswith('class') and line.find(ctrlclass.name) > 0
                and line.endswith(':')):
            break
        line_nb += 1
    return [f_name, data, line_nb]
class edctrllib(Macro):
"""Returns the contents of the given library file"""
param_def = [
['filename', Type.Filename, None, 'Absolute path and file name or '\
'simple filename. Relative paths are not allowed.']
]
result_def = [
['filedata', Type.File, None, 'The file data object']
]
hints = { 'commit_cmd' : 'commit_ctrllib' }
def run(self,filename):
pool = self.getManager().getPool()
data = pool.GetFile(filename)
['final_pos2', Type.Float, None, 'Scan final position 2'],
['motor3', Type.Moveable, None, 'Moveable 3 to move'],
['start_pos3', Type.Float, None, 'Scan start position 3'],
['final_pos3', Type.Float, None, 'Scan final position 3'],
['nr_interv', Type.Integer, None, 'Number of scan intervals'],
['integ_time', Type.Float, None, 'Integration time'],
['latency_time', Type.Float, 0, 'Latency time']]
def prepare(self, m1, s1, f1, m2, s2, f2, m3, s3, f3, nr_interv,
            integ_time, latency_time, **opts):
    """Group the three motors' arguments and delegate to ``_prepare``.

    :param m1, m2, m3: the three moveables
    :param s1, s2, s3: start position of each moveable
    :param f1, f2, f3: final position of each moveable
    :param nr_interv: number of scan intervals
    :param integ_time: integration time
    :param latency_time: latency time, forwarded as a keyword argument
    :param opts: extra keyword options forwarded unchanged
    """
    motors = [m1, m2, m3]
    starts = [s1, s2, s3]
    finals = [f1, f2, f3]
    self._prepare(motors, starts, finals, nr_interv, integ_time,
                  mode=ContinuousHwTimeMode, latency_time=latency_time,
                  **opts)
class a4scanct(aNscan, Macro):
"""Four-motor continuous scan.
a4scanct scans four motors, as specified by motor1, motor2, motor3 and
motor4. Each motor starts before the position given by its start_pos in
order to reach the constant velocity at its start_pos and finishes at the
position after its final_pos in order to maintain the constant velocity
until its final_pos."""
param_def = [
['motor1', Type.Moveable, None, 'Moveable 1 to move'],
['start_pos1', Type.Float, None, 'Scan start position 1'],
['final_pos1', Type.Float, None, 'Scan final position 1'],
['motor2', Type.Moveable, None, 'Moveable 2 to move'],
['start_pos2', Type.Float, None, 'Scan start position 2'],
['final_pos2', Type.Float, None, 'Scan final position 2'],
['motor3', Type.Moveable, None, 'Moveable 3 to move'],
['start_pos3', Type.Float, None, 'Scan start position 3'],
# Parameters: one moveable, its start and final positions, the number of
# intervals, the integration time and a latency time defaulting to 0.
param_def = [
    ['motor', Type.Moveable, None, 'Moveable name'],
    ['start_pos', Type.Float, None, 'Scan start position'],
    ['final_pos', Type.Float, None, 'Scan final position'],
    ['nr_interv', Type.Integer, None, 'Number of scan intervals'],
    ['integ_time', Type.Float, None, 'Integration time'],
    ['latency_time', Type.Float, 0, 'Latency time'],
]
def prepare(self, motor, start_pos, final_pos, nr_interv,
            integ_time, latency_time, **opts):
    """Wrap the single motor's arguments in lists and delegate to ``_prepare``.

    :param motor: the moveable to scan
    :param start_pos: scan start position
    :param final_pos: scan final position
    :param nr_interv: number of scan intervals
    :param integ_time: integration time
    :param latency_time: latency time, forwarded as a keyword argument
    :param opts: extra keyword options forwarded unchanged
    """
    motors = [motor]
    starts = [start_pos]
    finals = [final_pos]
    self._prepare(motors, starts, finals, nr_interv, integ_time,
                  mode=ContinuousHwTimeMode, latency_time=latency_time,
                  **opts)
class d2scanct(dNscanct, Macro):
"""continuous two-motor scan relative to the starting positions,
d2scanct scans two motors, as specified by motor1 and motor2.
Each motor starts before the position given by its start_pos in order to
reach the constant velocity at its start_pos and finishes at the position
after its final_pos in order to maintain the constant velocity until its
final_pos.
"""
param_def = [
['motor1', Type.Moveable, None, 'Moveable 1 to move'],
['start_pos1', Type.Float, None, 'Scan start position 1'],
['final_pos1', Type.Float, None, 'Scan final position 1'],
['motor2', Type.Moveable, None, 'Moveable 2 to move'],
['start_pos2', Type.Float, None, 'Scan start position 2'],
['final_pos2', Type.Float, None, 'Scan final position 2'],
['integ_time', Type.Float, None, 'Integration time'],
['slow_down', Type.Float, 1, 'global scan slow down factor (0, 1]'],
def pauseMacro(self):
    """Cancel the running timer, keep the remaining delay and flag PAUSE.

    The time elapsed since ``self._startTime`` is subtracted from
    ``self._currentDelay``. The door is then set to ``Macro.Pause``, a
    'state' change event is pushed and an info message is logged.
    """
    # Take the timestamp first so the elapsed time excludes the work below.
    paused_at = time.time()
    self._currentTimer.cancel()
    self._currentTimer = None

    self._currentDelay -= paused_at - self._startTime

    door = self._door
    door.set_state(Macro.Pause)
    door.push_change_event('state')
    door.info("Macro %s just PAUSED" % self._currentMacro[0])