Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def init():
global _DB_DIR, cfg, _NEXT_REQUEST_SECONDS
if _DB_DIR:
return
job.init()
job_driver.init()
_DB_DIR = sirepo.srdb.root().join(_DB_SUBDIR)
pykern.pkio.mkdir_parent(_DB_DIR)
cfg = pkconfig.init(
parallel=dict(
max_hours=(1, float, 'maximum run-time for parallel job (except sbatch)'),
),
sbatch_poll_secs=(60, int, 'how often to poll squeue and parallel status'),
sequential=dict(
max_hours=(.1, float, 'maximum run-time for sequential job'),
),
)
for k in job.KINDS:
_MAX_RUN_SECS[k] = int(cfg[k].max_hours * 3600)
_NEXT_REQUEST_SECONDS = PKDict({
job.PARALLEL: 2,
job.SBATCH: cfg.sbatch_poll_secs,
job.SEQUENTIAL: 1,
})
cfg = PKDict()
t = sim_types or CONFTEST_ALL_CODES
if t:
if isinstance(t, (tuple, list)):
t = ':'.join(t)
cfg['SIREPO_FEATURE_CONFIG_SIM_TYPES'] = t
if not (server and hasattr(app, a)):
from pykern import pkconfig
# initialize pkdebug with correct values
pkconfig.reset_state_for_testing(cfg)
from pykern import pkunit
with pkunit.save_chdir_work() as wd:
from pykern import pkio
cfg['SIREPO_SRDB_ROOT'] = str(pkio.mkdir_parent(wd.join('db')))
pkconfig.reset_state_for_testing(cfg)
from sirepo import server as s
server = s
app = server.init()
app.config['TESTING'] = True
app.test_client_class = _TestClient
setattr(app, a, app.test_client())
return getattr(app, a)
cfg = PKDict()
t = sim_types or CONFTEST_ALL_CODES
if t:
if isinstance(t, (tuple, list)):
t = ':'.join(t)
cfg['SIREPO_FEATURE_CONFIG_SIM_TYPES'] = t
if not (server and hasattr(app, a)):
from pykern import pkconfig
# initialize pkdebug with correct values
pkconfig.reset_state_for_testing(cfg)
from pykern import pkunit
with pkunit.save_chdir_work() as wd:
from pykern import pkio
cfg['SIREPO_SRDB_ROOT'] = str(pkio.mkdir_parent(wd.join('db')))
pkconfig.reset_state_for_testing(cfg)
from sirepo import server as s
server = s
app = server.init()
app.config['TESTING'] = True
app.test_client_class = _TestClient
setattr(app, a, app.test_client(job_run_mode=job_run_mode))
return getattr(app, a)
def copy_related_files(data, source_path, target_path):
# copy any simulation output
if os.path.isdir(str(py.path.local(source_path).join(_SIM_DATA.compute_model(data)))):
animation_dir = py.path.local(target_path).join(_SIM_DATA.compute_model(data))
pkio.mkdir_parent(str(animation_dir))
for f in glob.glob(str(py.path.local(source_path).join(_SIM_DATA.compute_model(data), '*'))):
py.path.local(f).copy(animation_dir)
def wait_for_job_completion(job_id):
s = 'pending'
while s in ('running', 'pending'):
o = subprocess.check_output(
('scontrol', 'show', 'job', job_id)
).decode('utf-8')
r = re.search(r'(?<=JobState=)(.*)(?= Reason)', o)
assert r, 'output={}'.format(s)
s = r.group().lower()
time.sleep(2) # TODO(e-carlin): cfg
assert s == 'completed', 'output={}'.format(o)
msg.runDir = pkio.py_path(msg.runDir)
with pkio.save_chdir('/'):
pkio.unchecked_remove(msg.runDir)
pkio.mkdir_parent(msg.runDir)
msg.simulationStatus = PKDict(
computeJobStart=int(time.time()),
state=job.RUNNING,
)
try:
a = _get_sbatch_script(
simulation_db.prepare_simulation(
msg.data,
run_dir=msg.runDir
)[0])
with open('slurmscript', 'w') as x:
x.write(a)
o, e = subprocess.Popen(
('sbatch'),
stdin=subprocess.PIPE,
def _random_id(parent_dir, simulation_type=None):
"""Create a random id in parent_dir
Args:
parent_dir (py.path): where id should be unique
Returns:
dict: id (str) and path (py.path)
"""
pkio.mkdir_parent(parent_dir)
r = random.SystemRandom()
# Generate cryptographically secure random string
for _ in range(5):
i = ''.join(r.choice(_ID_CHARS) for x in range(_ID_LEN))
if simulation_type:
if find_global_simulation(simulation_type, i):
continue
d = parent_dir.join(i)
try:
os.mkdir(str(d))
return PKDict(id=i, path=d)
except OSError as e:
if e.errno == errno.EEXIST:
pass
raise
raise RuntimeError('{}: failed to create unique directory'.format(parent_dir))
def init_apis(*args, **kwargs):
#TODO(robnagler) if we recover connections with agents and running jobs remove this
pykern.pkio.unchecked_remove(sirepo.job.LIB_FILE_ROOT, sirepo.job.DATA_FILE_ROOT)
pykern.pkio.mkdir_parent(sirepo.job.LIB_FILE_ROOT)
pykern.pkio.mkdir_parent(sirepo.job.DATA_FILE_ROOT)
assert os.path.isabs(v), \
'{}: SIREPO_SRDB_ROOT must be absolute'.format(v)
assert os.path.isdir(v), \
'{}: SIREPO_SRDB_ROOT must be a directory and exist'.format(v)
v = pkio.py_path(v)
else:
assert pkconfig.channel_in('dev'), \
'SIREPO_SRDB_ROOT must be configured except in DEV'
fn = sys.modules[pkinspect.root_package(_init_root)].__file__
root = pkio.py_path(pkio.py_path(pkio.py_path(fn).dirname).dirname)
# Check to see if we are in our dev directory. This is a hack,
# but should be reliable.
if not root.join('requirements.txt').check():
# Don't run from an install directory
root = pkio.py_path('.')
v = pkio.mkdir_parent(root.join(_DEFAULT_ROOT))
_root = v
return v