def run_dir_status(self, run_dir):
    """Get the current status of whatever's happening in run_dir.

    Returns:
        Tuple of (jhash or None, status of that job)
    """
    disk_in_path = run_dir.join('in.json')
    disk_status_path = run_dir.join('status')
    if disk_in_path.exists() and disk_status_path.exists():
        # status should be recorded on disk XOR in memory
        assert run_dir not in self.report_jobs
        disk_in_text = pkio.read_text(disk_in_path)
        disk_jhash = pkjson.load_any(disk_in_text).reportParametersHash
        disk_status = pkio.read_text(disk_status_path)
        if disk_status == 'pending':
            # We never write this, so it must be stale, in which case
            # the job is no longer pending...
            pkdlog(
                'found "pending" status, treating as "error" ({})',
                disk_status_path,
            )
            disk_status = runner_client.JobStatus.ERROR
        return disk_jhash, runner_client.JobStatus(disk_status)
    elif run_dir in self.report_jobs:
        job_info = self.report_jobs[run_dir]
        return job_info.jhash, job_info.status
    return None, runner_client.JobStatus.MISSING
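
A minimal sketch of consuming the (jhash, status) tuple returned above; `scheduler`, `run_dir`, and `start_new_job` are hypothetical stand-ins, not names from the snippet:

# Hedged usage sketch: scheduler, run_dir, and start_new_job are hypothetical.
jhash, status = scheduler.run_dir_status(run_dir)
if status is runner_client.JobStatus.MISSING:
    # nothing on disk and nothing in memory for this run_dir
    start_new_job(run_dir)
elif status is runner_client.JobStatus.ERROR:
    pkdlog('job {} in {} ended in error', jhash, run_dir)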
def _request(body):
    # TODO(e-carlin): uid is used to identify the proper broker for the request.
    # We likely need a better key, and maybe we shouldn't expose this
    # implementation detail to the client.
    uid = simulation_db.uid_from_dir_name(body['run_dir'])
    body['uid'] = uid
    body['source'] = 'server'
    body['rid'] = str(uuid.uuid4())
    body.setdefault('resource_class', 'sequential')
    r = requests.post(cfg.supervisor_http_uri, json=body)
    return pkjson.load_any(r.content)
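
A sketch of what a call to this `_request` might look like; the `action` key and the path value are illustrative assumptions, not part of the snippet:

# Hypothetical call: 'action' and the path are illustrative only.
reply = _request({
    'action': 'report_job_status',
    'run_dir': '/path/to/user/uid123/elegant/animation',
})
# _request adds uid, source, rid, and resource_class, POSTs the body to
# cfg.supervisor_http_uri, and returns the supervisor's parsed JSON reply.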
def default_command(in_file):
    """Reads `in_file` and passes the parsed msg to the `_do_*` handler
    named by `msg.jobProcessCmd`.

    Must be called in run_dir. Writes its output on stdout.

    Args:
        in_file (str): json parsed to msg
    Returns:
        str: json output of command, e.g. status msg
    """
    f = pkio.py_path(in_file)
    msg = pkjson.load_any(f)
    # TODO(e-carlin): find common place to serialize/deserialize paths
    msg.runDir = pkio.py_path(msg.runDir)
    f.remove()
    return pkjson.dump_pretty(
        PKDict(globals()['_do_' + msg.jobProcessCmd](
            msg,
            sirepo.template.import_module(msg.simulationType),
        )).pkupdate(opDone=True),
        pretty=False,
    )
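
`default_command` dispatches to module-level `_do_*` handlers by name, passing the parsed message and the simulation template module. A minimal sketch of such a handler, with a hypothetical command name:

# Hypothetical handler: reached when msg.jobProcessCmd == 'compute_status'.
# default_command wraps the returned mapping in PKDict and adds opDone=True.
def _do_compute_status(msg, template):
    return PKDict(state='completed')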
def _request(**kwargs):
    # assumed signature: the snippet was truncated before the def line,
    # but the body below consumes `kwargs`
    k = PKDict(kwargs)
    u = k.pkdel('_request_uri') or sirepo.job.SERVER_ABS_URI
    c = k.pkdel('_request_content') or _request_content(k)
    c.pkupdate(
        api=get_api_name(),
        serverSecret=sirepo.job.cfg.server_secret,
    )
    r = requests.post(
        u,
        data=pkjson.dump_bytes(c),
        headers=PKDict({'Content-type': 'application/json'}),
        verify=sirepo.job.cfg.verify_tls,
    )
    r.raise_for_status()
    return pkjson.load_any(r.content)
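
A sketch of calling this keyword-style `_request`; the keyword names and URI are assumptions for illustration, with `_request_uri` shown because the function explicitly pops it:

# Hypothetical call: the URI and simulationType value are illustrative only.
# _request_uri is consumed by _request itself; remaining kwargs go through
# _request_content before being POSTed.
reply = _request(
    _request_uri='http://127.0.0.1:8001/job-api-server',  # assumed local URI
    simulationType='srw',
)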
async def post(self):
    body = pkcollections.Dict(pkjson.load_any(self.request.body))
    pkdlog('Received request: {}', body)
    source_types = ['server', 'driver']
    assert body.source in source_types
    broker = _create_broker_if_not_found(body.uid)
    process_fn = getattr(broker, f'process_{body.source}_request')
    await process_fn(body, self.write)
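
The `post` coroutine reads `self.request.body` and calls `self.write`, which matches a Tornado `RequestHandler` method. A sketch of wiring such a handler into an application, assuming Tornado; the class name, route, and port are hypothetical:

import tornado.ioloop
import tornado.web

# Hypothetical wiring: _ServerHandler is assumed to be the
# tornado.web.RequestHandler subclass carrying the post() above.
app = tornado.web.Application([(r'/', _ServerHandler)])
app.listen(8888)  # port is illustrative
tornado.ioloop.IOLoop.current().start()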
async def run_extract_job(self, run_dir, jhash, subcmd, arg):
    pkdc('{} {}: {} {}', run_dir, jhash, subcmd, arg)
    status = self.report_job_status(run_dir, jhash)
    if status is runner_client.JobStatus.MISSING:
        pkdlog('{} {}: report is missing; skipping extract job',
               run_dir, jhash)
        return {}
    # figure out which backend and any backend-specific info
    runner_info_file = run_dir.join(_RUNNER_INFO_BASENAME)
    if runner_info_file.exists():
        runner_info = pkjson.load_any(runner_info_file)
    else:
        # Legacy run_dir
        runner_info = pkcollections.Dict(
            version=1, backend='local', backend_info={},
        )
    assert runner_info.version == 1
    # run the job
    cmd = ['sirepo', 'extract', subcmd, arg]
    result = await _BACKENDS[runner_info.backend].run_extract_job(
        run_dir, cmd, runner_info.backend_info,
    )
    if result.stderr:
        # stderr is logged but does not abort the extract
        pkdlog(
            'got output on stderr ({} {}):\n{}',
            run_dir,
            jhash,
            result.stderr,
        )
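
A sketch of awaiting `run_extract_job`; the owning object, jhash value, and extract arguments are all hypothetical:

# Hypothetical call: supervisor, the jhash, subcmd, and arg are illustrative.
out = await supervisor.run_extract_job(
    run_dir,
    'a1b2c3d4',
    'background_percent_complete',
    '{}',
)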