Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.reloadMacro(name)
macro_library = self.getMacroLibrary(macro_library_name)
if macro_library.has_errors():
exc_info = macro_library.get_error()
#msg = "".join(traceback.format_exception(*exc_info))
msg = "".join(traceback.format_exception_only(*exc_info[:2]))
self.error(msg)
else:
self.output("%s successfully (re)loaded", name)
class sar_info(Macro):
"""Prints details about the given sardana object"""
param_def = [
['obj', Type.Object, None, 'obj']
]
def run(self, obj):
self.dump_properties(obj)
#self.output("")
#self.dump_attributes(obj)
def dump_properties(self, obj):
data = obj.serialize()
table = Table([data.values()], row_head_str=data.keys(),
row_head_fmt='%*s', col_sep=' = ')
self.output("Properties:")
self.output("-----------")
for line in table.genOutput():
self.output(line)
self._prepare([motor], [start_pos], [final_pos],
nr_interv, integ_time, **opts)
class d2scan(dNscan, Macro):
"""two-motor scan relative to the starting position.
d2scan scans two motors, as specified by motor1 and motor2.
Each motor moves the same number of intervals. If each motor is at a
position X before the scan begins, it will be scanned from X+start_posN
to X+final_posN (where N is one of 1,2).
The step size for each motor is (start_pos-final_pos)/nr_interv.
The number of data points collected will be nr_interv+1.
Count time is given by time which if positive, specifies seconds and
if negative, specifies monitor counts."""
param_def = [
['motor1', Type.Moveable, None, 'Moveable 1 to move'],
['start_pos1', Type.Float, None, 'Scan start position 1'],
['final_pos1', Type.Float, None, 'Scan final position 1'],
['motor2', Type.Moveable, None, 'Moveable 2 to move'],
['start_pos2', Type.Float, None, 'Scan start position 2'],
['final_pos2', Type.Float, None, 'Scan final position 2'],
['nr_interv', Type.Integer, None, 'Number of scan intervals'],
['integ_time', Type.Float, None, 'Integration time']
]
def prepare(self, motor1, start_pos1, final_pos1, motor2, start_pos2,
final_pos2, nr_interv, integ_time, **opts):
self._prepare([motor1, motor2], [start_pos1, start_pos2], [
final_pos1, final_pos2], nr_interv, integ_time, **opts)
class d3scan(dNscan, Macro):
data = array.array('B', data).tostring()
line_nb = 1
for line in data.splitlines():
line = line.strip(' \t')
if line.startswith('class') and line.find(ctrlclass.name) > 0 and \
line.endswith(":"):
break
line_nb = line_nb + 1
return [f_name, data, line_nb]
class edctrllib(Macro):
"""Returns the contents of the given library file"""
param_def = [
['filename', Type.Filename, None, 'Absolute path and file name or '
'simple filename. Relative paths are not allowed.']
]
result_def = [
['filedata', Type.File, None, 'The file data object']
]
hints = {'commit_cmd': 'commit_ctrllib'}
def run(self, filename):
pool = self.getManager().getPool()
data = pool.GetFile(filename)
return [filename, array.array('B', data).tostring(), 0]
class commit_ctrllib(Macro):
"""Show all motor positions in a pretty table"""
param_def = [
['filter',
ParamRepeat(['filter', Type.String, '.*', 'a regular expression filter'], min=0, max=1),
'.*', 'a regular expression filter'],
]
def run(self, *filter):
self.execMacro('wa', *filter, **Table.PrettyOpts)
class set_lim(Macro):
"""Sets the software limits on the specified motor hello"""
param_def = [
['motor', Type.Moveable, None, 'Motor name'],
['low', Type.Float, None, 'lower limit'],
['high', Type.Float, None, 'upper limit']
]
def run(self, motor, low, high):
name = motor.getName()
self.debug("Setting user limits for %s" % name)
motor.getPositionObj().setLimits(low,high)
self.output("%s limits set to %.4f %.4f (user units)" % (name, low, high))
class set_lm(Macro):
"""Sets the dial limits on the specified motor"""
param_def = [
['motor', Type.Motor, None, 'Motor name'],
['low', Type.Float, None, 'lower limit'],
['high', Type.Float, None, 'upper limit']
]
# it gives the "pre-acq" hint to the hook
loop_macro.hooks = [self.hook]
self.runMacro(loop_macro)
class hooked_scan(Macro):
"""An example on how to attach hooks to the various hook points of a scan.
This macro is part of the examples package. It was written for
demonstration purposes"""
param_def = [
['motor', Type.Moveable, None, 'Motor to move'],
['start_pos', Type.Float, None, 'Scan start position'],
['final_pos', Type.Float, None, 'Scan final position'],
['nr_interv', Type.Integer, None, 'Number of scan intervals'],
['integ_time', Type.Float, None, 'Integration time']
]
def hook1(self):
self.info("\thook1 execution")
def hook2(self):
self.info("\thook2 execution")
def hook3(self):
self.info("\thook3 execution")
def hook4(self):
self.info("\thook4 execution")
def hook5(self):
self.info("\thook5 execution")
self.output(line)
class lsm(_lsobj):
"""Lists all motors"""
type = Type.Moveable
class lspm(lsm):
"""Lists all existing motors"""
subtype = 'PseudoMotor'
class lscom(_lsobj):
"""Lists all communication channels"""
type = Type.ComChannel
class lsior(_lsobj):
"""Lists all IORegisters"""
type = Type.IORegister
class lsexp(_lsobj):
"""Lists all experiment channels"""
type = Type.ExpChannel
class lsct(lsexp):
"""Lists all Counter/Timers"""
subtype = 'CTExpChannel'
class hooked_scan_with_macro(Macro):
"""An example on how to attach macro (in this case without parameters)
as a hook to the various hook points of a scan.
This macro is part of the examples package. It was written for
demonstration purposes"""
param_def = [
['motor', Type.Moveable, None, 'Motor to move'],
['start_pos', Type.Float, None, 'Scan start position'],
['final_pos', Type.Float, None, 'Scan final position'],
['nr_interv', Type.Integer, None, 'Number of scan intervals'],
['integ_time', Type.Float, None, 'Integration time'],
['macro', Type.Macro, None, 'Macro to be used as hook'],
['hook_places', [['hook_place', Type.String, None, 'Hook place']],
None, 'Hook places where macro will be called']
]
def run(self, motor, start_pos, final_pos, nr_interv, integ_time, macro,
hook_places):
ascan, _ = self.createMacro(
"ascan", motor, start_pos, final_pos, nr_interv, integ_time)
macro_hook = ExecMacroHook(self, "umv", [["mot01", 1]])
ascan.hooks = [(macro_hook, hook_places)]
self.runMacro(ascan)
class hooked_dummyscan(Macro):
"""An example on how to attach hooks to the various hook points of a scan.
This macro is part of the examples package. It was written for
demonstration purposes"""
data = array.array('B',meta)
data.extend(array.array('B',filedata))
pool.PutFile(data.tolist())
################################################################################
#
# Macro handling related macros
#
################################################################################
class prdef(Macro):
"""Returns the the macro code for the given macro name."""
param_def = [
['macro_name', Type.MacroCode, None, 'macro name']
]
def run(self,macro_data):
code_lines, first_line = macro_data.code
for code_line in code_lines:
self.output(code_line.strip('\n'))
class rellib(Macro):
"""Reloads the given python library code from the macro server filesystem.
.. warning:: use with extreme care! Accidentally reloading a system
module or an installed python module may lead to unpredictable
behavior
def run(self, controller):
pool = controller.getPoolObj()
pool.deleteController(controller.getName())
################################################################################
#
# Controller related macros
#
################################################################################
class send2ctrl(Macro):
"""Sends the given data directly to the controller"""
param_def = [['controller', Type.Controller, None, 'Controller name'],
['data',
ParamRepeat(['string item', Type.String, None, 'a string item'],),
None, 'data to be sent']]
def run(self, controller, *data):
name = controller.getName()
pool = controller.getPoolObj()
str_data = " ".join(data)
res = pool.SendToController([name,str_data])
self.output(res)
################################################################################
#
# Library handling related macros
#
################################################################################
class lsctrl(_lsobj):
"""Lists all existing controllers"""
type = Type.Controller
cols = 'Name', ('Type', 'main_type'), ('Class', 'klass'), 'Module'
class lsi(_lsobj):
"""Lists all existing instruments"""
type = Type.Instrument
cols = 'Name', 'Type', ('Parent', 'parent_instrument')
class lsa(_lsobj):
"""Lists all existing objects"""
type = Type.Moveable, Type.ComChannel, Type.ExpChannel, Type.IORegister,\
Type.TriggerGate
class lsmeas(_lsobj):
"""List existing measurement groups"""
type = Type.MeasurementGroup
cols = 'Active', 'Name', 'Timer', 'Experim. channels'
width = -1, -1, -1, 60
align = HCenter, Right, Right, Left
def prepare(self, filter, **opts):
try:
self.mnt_grp = self.getEnv('ActiveMntGrp').lower() or None
except: