Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def send(self, op):
#TODO(robnagler) need to send a retry to the ops, which should requeue
# themselves at an outer level(?).
# If a job is still running, but we just lost the websocket, want to
# pickup where we left off. If the op already was written, then you
# have to ask the agent. If ops are idempotent, we can simply
# resend the request. If it is in process, then it will be reconnected
# to the job. If it was already completed (and reply on the way), then
# we can cache that state in the agent(?) and have it send the response
# twice(?).
self.ops_pending_send.append(op)
self.run_scheduler(self)
await op.send_ready.wait()
if op.opId in self.ops_pending_done:
self.websocket.write_message(pkjson.dump_bytes(op.msg))
else:
pkdlog('canceled op={}', job.LogFormatter(op))
assert op not in self.ops_pending_send
def _http_send(body, write):
try:
write(pkjson.dump_bytes(body))
except Exception as e:
pkdlog('Error while writing to server. Casued by: {}'.format(e))
pkdlog(pkdexc())
async def _http_send(response_body, send):
pkdp(f'Sending to agent: {response_body}')
await send({
'type': 'http.response.start',
'status': 200,
'headers': [
(b'content-type', b'application/json'),
# (b'content-length', b'500'), # TODO(e-carlin): Calculate this
],
})
await send({
'type': 'http.response.body',
'body': pkjson.dump_bytes(response_body),
})
return m.group()
f = f.f_back
else:
raise AssertionError(
'{}: max frame search depth reached'.format(f.f_code)
)
k = PKDict(kwargs)
u = k.pkdel('_request_uri') or sirepo.job.SERVER_ABS_URI
c = k.pkdel('_request_content') or _request_content(k)
c.pkupdate(
api=get_api_name(),
serverSecret=sirepo.job.cfg.server_secret,
)
r = requests.post(
u,
data=pkjson.dump_bytes(c),
headers=PKDict({'Content-type': 'application/json'}),
verify=sirepo.job.cfg.verify_tls,
)
r.raise_for_status()
return pkjson.load_any(r.content)
def format_op(self, msg, opName, **kwargs):
if msg:
kwargs['opId'] = msg.get('opId')
return pkjson.dump_bytes(
PKDict(agentId=cfg.agent_id, opName=opName, **kwargs),
)
with _catch_and_log_errors(Exception, 'error handling request'):
request_bytes = bytearray()
while True:
chunk = await stream.receive_some(_CHUNK_SIZE)
if not chunk:
break
request_bytes += chunk
request = pkjson.load_any(request_bytes)
if 'run_dir' in request:
request.run_dir = pkio.py_path(request.run_dir)
pkdc('runner request: {!r}', request)
handler = _RPC_HANDLERS[request.action]
async with job_tracker.locks[request.run_dir]:
response = await handler(job_tracker, request)
pkdc('runner response: {!r}', response)
response_bytes = pkjson.dump_bytes(response)
await stream.send_all(response_bytes)