Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@check_offline
def _register_wait(self):
    """Register ``self._wait`` as an ``atexit`` handler.

    The handler is invoked by the interpreter at normal program termination.
    """
    exit_hook = self._wait
    atexit.register(exit_hook)
@check_offline
def end(self, status, message=None, traceback=None):
    """Report a final status for the run, unless one was already recorded.

    Nothing happens when ``self.status`` is already ``"succeeded"``,
    ``"failed"`` or ``"stopped"``.

    Args:
        status: status value to report and to store on ``self.status``.
        message: forwarded to ``self.log_status`` as ``reason``.
        traceback: forwarded to ``self.log_status`` as ``message``.
    """
    already_finished = self.status in ("succeeded", "failed", "stopped")
    if not already_finished:
        self.log_status(status=status, reason=message, message=traceback)
        self.status = status
        # Brief pause to give the worker the opportunity to pick the message.
        time.sleep(0.1)
@check_no_op
@check_offline
def _end(self):
    """Delegate to ``self.log_succeeded()``; takes no arguments and returns nothing."""
    report_success = self.log_succeeded
    report_success()
@check_offline
def log_inputs(self, reset=False, **inputs):
    """Record run inputs on ``self.data.inputs`` and forward them to ``self._update``.

    Args:
        reset: when exactly ``False`` (the default) the given inputs are merged
            into the inputs already held locally and the outgoing patch carries
            ``"merge": True``. Any other value replaces the local inputs and the
            patch is sent without the ``"merge"`` key.
        **inputs: input names and values to record.
    """
    patch_dict = {"inputs": inputs}
    if reset is False:
        # Merge mode: the patch is flagged with "merge", so the local copy must
        # be merged as well (the branches used to be swapped, which replaced
        # the local inputs here and merged them in reset mode).
        patch_dict["merge"] = True
        if self.data.inputs is None:
            # Nothing recorded yet; start from an empty mapping so update() works.
            self.data.inputs = {}
        self.data.inputs.update(inputs)
    else:
        # Reset mode: the new inputs replace whatever was held locally.
        self.data.inputs = inputs
    self._update(patch_dict)
@check_no_op
@check_offline
def log_failed(self, message=None, traceback=None):
    """Call ``self.end`` with status ``"failed"``.

    Args:
        message: passed through to ``self.end`` as ``message``.
        traceback: passed through to ``self.end`` as ``traceback``.
    """
    failure = {"status": "failed", "message": message, "traceback": traceback}
    self.end(**failure)
@check_offline
def log_run_env(self):
    """Send the value returned by ``get_run_env()`` to ``self._update`` under ``"run_env"``."""
    payload = {"run_env": get_run_env()}
    self._update(payload)
@check_no_op
@check_offline
def set_name(self, name):
    """Send a patch with the given ``name`` through ``self._update``.

    Args:
        name: value stored under the ``"name"`` key of the patch.
    """
    patch = dict(name=name)
    self._update(patch)
@check_no_op
@check_offline
def refresh_data(self):
# Re-fetch the run with ``client.runs_v1.get_run`` for this owner/project/run
# uuid and keep the result on ``self._run``.
self._run = self.client.runs_v1.get_run(self.owner, self.project, self.run_uuid)
# NOTE(review): indentation is missing in this listing, so it is unclear whether
# the loop below belongs to this method or to another definition - confirm
# against the upstream file.
# ``self.status`` is overwritten on every iteration, so after the loop it holds
# the last status produced by ``get_run_statuses``; ``conditions`` is unpacked
# but not used in the lines visible here.
for status, conditions in get_run_statuses(
self.owner, self.project, self.run_uuid
):
self.status = status