"""Reads `in_file` passes to `msg.job_process_cmd`
Must be called in run_dir
Writes its output on stdout.
Args:
in_file (str): json parsed to msg
Returns:
str: json output of command, e.g. status msg
"""
f = pkio.py_path(in_file)
msg = pkjson.load_any(f)
msg.run_dir = pkio.py_path(msg.run_dir) # TODO(e-carlin): find common place to serialize/deserialize paths
f.remove()
return pkjson.dump_pretty(
globals()['_do_' + msg.job_process_cmd](
msg,
sirepo.template.import_module(msg.sim_type),
),
pretty=False,
)
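
# A minimal sketch of the `_do_<cmd>` dispatch convention used by
# default_command above. The handler name `_do_compute` and the message
# fields are hypothetical illustrations, not part of the real module.
import json


def _do_compute(msg, template):
    # A handler receives the parsed message and the sim-type template
    # module and returns a JSON-serializable status object.
    return {'state': 'completed', 'sim_type': msg['sim_type']}


def _dispatch(msg):
    # Same lookup as above: the command name selects a module-level
    # `_do_*` function by name.
    return globals()['_do_' + msg['job_process_cmd']](msg, None)


print(json.dumps(_dispatch({'job_process_cmd': 'compute', 'sim_type': 'srw'})))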

async def run_extract_job(self, io_loop, run_dir, jhash, subcmd, arg):
    pkdc('{} {}: {} {}', run_dir, jhash, subcmd, arg)
    status = self.report_job_status(run_dir, jhash)
    if status is runner_client.JobStatus.MISSING:
        pkdlog('{} {}: report is missing; skipping extract job',
               run_dir, jhash)
        return {}
    # figure out which backend and any backend-specific info
    runner_info_file = run_dir.join(_RUNNER_INFO_BASENAME)
    if runner_info_file.exists():
        runner_info = pkjson.load_any(runner_info_file)
    else:
        # Legacy run_dir predates the runner info file; assume defaults
        runner_info = pkcollections.Dict(
            version=1, backend='local', backend_info={},
        )
    assert runner_info.version == 1
    # run the job
    cmd = ['sirepo', 'extract', subcmd, arg]
    result = await local_process.run_extract_job(
        io_loop, run_dir, cmd, runner_info.backend_info,
    )
    if result.stderr:
        pkdlog(
            'got output on stderr ({} {}):\n{}',
            run_dir,
            jhash,
            result.stderr,
        )
    # the extract job prints its JSON result on stdout
    return pkjson.load_any(result.stdout)

def run_dir_status(self, run_dir):
    """Get the current status of whatever's happening in run_dir.

    Returns:
        Tuple of (jhash or None, status of that job)
    """
    disk_in_path = run_dir.join('in.json')
    disk_status_path = run_dir.join('status')
    if disk_in_path.exists() and disk_status_path.exists():
        # status should be recorded on disk XOR in memory
        assert run_dir not in self.report_jobs
        disk_in_text = pkio.read_text(disk_in_path)
        disk_jhash = pkjson.load_any(disk_in_text).computeJobHash
        disk_status = pkio.read_text(disk_status_path)
        if disk_status == 'pending':
            # We never write this, so it must be stale, in which case
            # the job is no longer pending...
            pkdlog(
                'found "pending" status, treating as "error" ({})',
                disk_status_path,
            )
            disk_status = runner_client.JobStatus.ERROR
        return disk_jhash, runner_client.JobStatus(disk_status)
    elif run_dir in self.report_jobs:
        job_info = self.report_jobs[run_dir]
        return job_info.jhash, job_info.status
    else:
        return None, runner_client.JobStatus.MISSING
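
# Sketch of the on-disk layout run_dir_status() reads, using stdlib
# stand-ins for pkio/pkjson; the file names match the code above, the
# values are illustrative.
import json
import pathlib
import tempfile

run_dir = pathlib.Path(tempfile.mkdtemp())
(run_dir / 'in.json').write_text(json.dumps({'computeJobHash': 'abc123'}))
(run_dir / 'status').write_text('completed')

jhash = json.loads((run_dir / 'in.json').read_text())['computeJobHash']
print(jhash, (run_dir / 'status').read_text())  # abc123 completed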

async def _http_receive(receive):
    # Collect the chunked ASGI request body, then parse it as JSON.
    body = b''
    more_body = True
    while more_body:
        message = await receive()
        body += message.get('body', b'')
        more_body = message.get('more_body', False)
    return pkjson.load_any(body)
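
# Minimal sketch of the ASGI `receive` contract _http_receive consumes:
# each message may carry a body chunk plus a `more_body` flag, and the
# loop above concatenates chunks until more_body is False. The fake
# receive below is a test double, not part of the real server.
import asyncio
import json


def _make_receive(chunks):
    messages = iter(
        {'type': 'http.request', 'body': c, 'more_body': i < len(chunks) - 1}
        for i, c in enumerate(chunks)
    )

    async def receive():
        return next(messages)

    return receive


async def _demo():
    receive = _make_receive([b'{"action": ', b'"run_status"}'])
    body = b''
    more_body = True
    while more_body:
        m = await receive()
        body += m.get('body', b'')
        more_body = m.get('more_body', False)
    print(json.loads(body))  # {'action': 'run_status'}


asyncio.run(_demo())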

async def on_stdout_read(self, text):
    if self._terminating or not self.msg.opId:
        return
    try:
        await self.dispatcher.send(
            self.dispatcher.format_op(
                self.msg,
                job.OP_RUN if self._is_compute else job.OP_ANALYSIS,
                reply=pkjson.load_any(text),
            )
        )
    except Exception as exc:
        pkdlog('text={} error={} stack={}', text, exc, pkdexc())

def run_extract_job(run_dir, jhash, subcmd, *args):
    body = {
        'action': job.ACTION_RUN_EXTRACT_JOB,
        'run_dir': str(run_dir),
        'jhash': jhash,
        'subcmd': subcmd,
        'arg': pkjson.dump_pretty(args),
    }
    response = _request(body)
    return response.result
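
# `_request` is not shown in this listing; a plausible transport sketch,
# assuming the job supervisor listens on a local HTTP port (the URL and
# endpoint path are assumptions, not the documented API):
import requests
from pykern import pkjson


def _request(body):
    r = requests.post('http://127.0.0.1:8001/job-api', json=body)  # hypothetical endpoint
    r.raise_for_status()
    # pkjson.load_any returns an attribute-access dict, so the caller's
    # `response.result` lookup works
    return pkjson.load_any(r.content)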

def _write_status(status, run_dir):
    fn = run_dir.join('result.json')
    if not fn.exists():
        # only record the first (final) result; never overwrite it
        pkjson.dump_pretty({'state': status.value}, filename=fn)
    pkio.write_text(run_dir.join('status'), status.value)
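
# After a call like _write_status(runner_client.JobStatus.COMPLETED, run_dir),
# the run_dir holds two sibling records (contents illustrative):
#     result.json -> {"state": "completed"}   (written once)
#     status      -> completed                (refreshed on every call)
# The `status` file is the one run_dir_status() above reads back.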

async def _op(self, msg):
    m = None
    try:
        m = pkjson.load_any(msg)
        pkdlog(
            'op={} opId={} shifterImage={}',
            m.opName,
            m.get('opId'),
            m.get('shifterImage'),
        )
        pkdc('m={}', job.LogFormatter(m))
        return await getattr(self, '_op_' + m.opName)(m)
    except Exception as e:
        err = 'exception=' + str(e)
        stack = pkdexc()
        pkdlog(
            'op={} exception={} stack={}',
            m and m.get('opName'),
            e,
            stack,
        )
        return self.format_op(m, job.OP_ERROR, error=err, stack=stack)