Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
with pkio.open_text(filename) as f:
for line in f:
rows.append(line)
if len(rows) == 10:
if rows[4] == rows[7]:
# already fixed up
return
if re.search(r'^\#0 ', rows[4]):
rows[4] = rows[7]
rows[5] = rows[8]
rows[6] = rows[9]
else:
rows[7] = rows[4]
rows[8] = rows[5]
rows[9] = rows[6]
pkio.write_text(filename, ''.join(rows))
def _run_jspec(data):
_elegant_to_madx(data['models']['ring'])
exec(pkio.read_text(template_common.PARAMETERS_PYTHON_FILE), locals(), locals())
jspec_filename = template.JSPEC_INPUT_FILENAME
pkio.write_text(jspec_filename, jspec_file)
pksubprocess.check_call_with_signals(['jspec', jspec_filename], msg=pkdlog, output=template.JSPEC_LOG_FILE)
return pkio.read_text(template.JSPEC_LOG_FILE)
def _write_status(status, run_dir):
fn = run_dir.join('result.json')
if not fn.exists():
pkjson.dump_pretty({'state': status.value}, filename=fn)
pkio.write_text(run_dir.join('status'), status.value)
def _run_hellweg(cfg_dir):
with pkio.save_chdir(cfg_dir):
exec(pkio.read_text(template_common.PARAMETERS_PYTHON_FILE), locals(), locals())
pkio.write_text(template.HELLWEG_INPUT_FILE, input_file)
pkio.write_text(template.HELLWEG_INI_FILE, ini_file)
s = solver.BeamSolver(template.HELLWEG_INI_FILE, template.HELLWEG_INPUT_FILE)
s.solve()
s.save_output(template.HELLWEG_SUMMARY_FILE)
s.dump_bin(template.HELLWEG_DUMP_FILE)
def sbatch_script(path):
"""Write script to path
Args:
path (str): where to write file
"""
pkio.write_text(path, _script())
def _run_elegant(bunch_report=False, with_mpi=False):
exec(pkio.read_text(template_common.PARAMETERS_PYTHON_FILE), locals(), locals())
pkio.write_text('elegant.lte', lattice_file)
ele = 'elegant.ele'
pkio.write_text(ele, elegant_file)
kwargs = {
'output': ELEGANT_LOG_FILE,
'env': elegant_common.subprocess_env(),
}
try:
#TODO(robnagler) Need to handle this specially, b/c different binary
if execution_mode == 'parallel' and with_mpi and mpi.cfg.cores > 1:
mpi.run_program(['Pelegant', ele], **kwargs)
else:
pksubprocess.check_call_with_signals(['elegant', ele], msg=pkdlog, **kwargs)
except Exception as e:
# ignore elegant failures - errors will be parsed from the log
pass
def write_parameters(data, run_dir, is_parallel, python_file=template_common.PARAMETERS_PYTHON_FILE):
pkio.write_text(
run_dir.join(python_file),
_generate_parameters_file(data),
)
# unzip the required magnet files
for el in data.models.elements:
if el.type != 'TOSCA':
continue
filename = str(run_dir.join(_SIM_DATA.lib_file_name_with_model_field('TOSCA', 'magnetFile', el.magnetFile)))
if zgoubi_importer.is_zip_file(filename):
with zipfile.ZipFile(filename, 'r') as z:
for info in z.infolist():
if info.filename in el.fileNames:
z.extract(info, str(run_dir))
def _run_tunes_report(cfg_dir, data):
with pkio.save_chdir(cfg_dir):
exec(pkio.read_text(template_common.PARAMETERS_PYTHON_FILE), locals(), locals())
pkio.write_text(template.TUNES_INPUT_FILE, tunes_file)
#TODO(pjm): uses datafile from animation directory
os.symlink('../animation/zgoubi.fai', 'zgoubi.fai')
subprocess.call([_TUNES_PATH])
simulation_db.write_result(template.extract_tunes_report(cfg_dir, data))