Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _run_dose_calculation(data, cfg_dir):
    """Produce the dose result for ``data`` and persist it.

    When the ``rs4pi_dose_calc`` feature flag is on, the dose-calculation
    shell script located in ``cfg_dir`` is run and an RTDOSE file is
    generated from its output; otherwise the fake calculation is used.

    Args:
        data (dict): simulation input; ``data['models']['dicomDose']`` is
            overwritten with the new dose result
        cfg_dir: run directory; must support ``.join()`` (presumably a
            ``py.path`` object -- confirm against callers)
    """
    if feature_config.cfg().rs4pi_dose_calc:
        with pkio.save_chdir(cfg_dir):
            script_path = str(cfg_dir.join(template.DOSE_CALC_SH))
            pksubprocess.check_call_with_signals(['bash', script_path])
            dose = template.generate_rtdose_file(data, cfg_dir)
    else:
        dose = _run_dose_calculation_fake(data, cfg_dir)
    data['models']['dicomDose'] = dose
    # The updated input must be written back to the simulation input file:
    # later calls to get_simulation_frame() read the dose from there.
    simulation_db.write_json(template_common.INPUT_BASE_NAME, data)
    result = {'dicomDose': dose}
    simulation_db.write_result(result)
def _run_hellweg(cfg_dir):
    """Write the Hellweg input files in ``cfg_dir`` and run the beam solver.

    The parameters file is executed to obtain the text of the solver's
    input and ini files, which are written out before the solver runs.
    The solver's summary and binary dump are saved under the names given
    by ``template``.

    Args:
        cfg_dir: directory to run in (passed to ``pkio.save_chdir``)

    Raises:
        KeyError: if the parameters file does not define ``input_file``
            or ``ini_file``
    """
    with pkio.save_chdir(cfg_dir):
        # Execute into an explicit namespace. On Python 3, exec() cannot
        # create function locals, so reading ``input_file``/``ini_file`` as
        # bare names after ``exec(..., locals(), locals())`` raises NameError.
        # ``cfg_dir`` is seeded because it was the only name visible to the
        # executed code in the original locals() mapping.
        # NOTE(review): this executes the file's contents as code; it is
        # presumably generated by the application itself -- confirm it can
        # never contain untrusted input.
        namespace = {'cfg_dir': cfg_dir}
        exec(pkio.read_text(template_common.PARAMETERS_PYTHON_FILE), namespace)
        pkio.write_text(template.HELLWEG_INPUT_FILE, namespace['input_file'])
        pkio.write_text(template.HELLWEG_INI_FILE, namespace['ini_file'])
        s = solver.BeamSolver(template.HELLWEG_INI_FILE, template.HELLWEG_INPUT_FILE)
        s.solve()
        s.save_output(template.HELLWEG_SUMMARY_FILE)
        s.dump_bin(template.HELLWEG_DUMP_FILE)
# Background entry point: handles the 'epicsServerAnimation' report.
# NOTE(review): this listing has lost its indentation and stops at the final
# `else:` below; the body of that branch (and anything after it, such as how
# `res` is consumed) is not shown here.
def run_background(cfg_dir):
# All work happens with cfg_dir as the current directory.
with pkio.save_chdir(cfg_dir):
data = simulation_db.read_json(template_common.INPUT_BASE_NAME)
# Result accumulator; the visible code only ever sets its 'error' key.
res = {}
if data.report == 'epicsServerAnimation':
epics_settings = data.models.epicsServerAnimation
# 'local' server type: _run_epics(data) supplies the address, which is
# then stored back on the settings object.
if epics_settings.serverType == 'local':
server_address = _run_epics(data)
epics_settings.serverAddress = server_address
else:
# Any other server type requires an address to be configured already.
assert epics_settings.serverAddress, 'missing remote server address'
server_address = epics_settings.serverAddress
# The monitor is started for either server type.
_run_epics_monitor(server_address)
# Whatever the loop returns is recorded as the run's error value.
if epics_settings.serverType == 'local':
res['error'] = _run_simulation_loop(server_address)
else:
res['error'] = _run_forever(server_address)
# Branch for every other report; its body is missing from this excerpt.
else:
def run_background(cfg_dir, sbatch=False):
    """Run the generated script under MPI from within ``cfg_dir``.

    After the script finishes, an empty result is written.

    Args:
        cfg_dir (str): directory to run code in
        sbatch (bool): accepted for interface compatibility; not used by
            this function's body
    """
    with pkio.save_chdir(cfg_dir):
        # The script is built after the chdir, exactly as before.
        script = _script()
        mpi.run_script(script)
        simulation_db.write_result({})
def nginx_proxy():
    """Render an nginx config from ``cfg`` and run nginx in docker.

    The rendered ``default.conf`` is written to the ``nginx_proxy`` run
    directory and mounted into the ``nginx`` image, which runs with host
    networking until the process exits.

    Used for development only: asserts that the channel is ``dev``.
    """
    assert pkconfig.channel_in('dev')
    proxy_dir = _run_dir().join('nginx_proxy').ensure(dir=True)
    with pkio.save_chdir(proxy_dir):
        conf_path = proxy_dir.join('default.conf')
        pkjinja.render_resource(
            'nginx_proxy.conf',
            dict(pkcollections.map_items(cfg)),
            output=conf_path,
        )
        # Mount the freshly rendered file as the container's default site.
        volume_arg = '--volume={}:/etc/nginx/conf.d/default.conf'.format(conf_path)
        docker_cmd = ['docker', 'run', '--net=host', '--rm', volume_arg, 'nginx']
        pksubprocess.check_call_with_signals(docker_cmd)
def run(cfg_dir):
    """Execute the SRW run with ``cfg_dir`` as the working directory.

    Args:
        cfg_dir (str): directory to run srw in
    """
    in_cfg_dir = pkio.save_chdir(cfg_dir)
    with in_cfg_dir:
        _run_srw()
def flower():
    """Start flower from the ``flower`` run directory.

    Asserts that the channel is ``dev``. Flower is bound to ``cfg.ip`` and
    pointed at the ``sirepo.celery_tasks`` app.
    """
    assert pkconfig.channel_in('dev')
    flower_dir = _run_dir().join('flower').ensure(dir=True)
    with pkio.save_chdir(flower_dir):
        # Bind the entry point first so evaluation order matches the
        # original single-expression call.
        execute = command.FlowerCommand().execute_from_commandline
        argv = [
            'flower',
            '--address=' + cfg.ip,
            '--app=sirepo.celery_tasks',
            '--no-color',
            '--persistent',
        ]
        execute(argv)