Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@check_offline
def _register_wait(self):
    """Register ``self._wait`` to be called when the interpreter exits."""
    atexit.register(self._wait)


@check_offline
def end(self, status, message=None, traceback=None):
    """Record a final status for the run, unless one is already set.

    Args:
        status: The status to record (e.g. ``"failed"``).
        message: Optional text, forwarded to ``log_status`` as ``reason``.
        traceback: Optional text, forwarded to ``log_status`` as ``message``.
    """
    # A run that already reached a terminal status must not be ended twice.
    if self.status in ("succeeded", "failed", "stopped"):
        return
    self.log_status(status=status, reason=message, message=traceback)
    self.status = status
    # Just to give the opportunity to the worker to pick the message
    time.sleep(0.1)


@check_no_op
@check_offline
def _end(self):
    """Mark the run as succeeded by calling ``log_succeeded``."""
    self.log_succeeded()


@check_offline
def log_inputs(self, reset=False, **inputs):
    """Send ``inputs`` for the run and mirror the change on ``self.data``.

    Args:
        reset: When exactly ``False`` (the default) the patch is flagged with
            ``merge=True`` and the given inputs are merged into the locally
            held ones. Any other value replaces the locally held inputs.
        **inputs: Input names and values to record.
    """
    patch_dict = {"inputs": inputs}
    if reset is False:
        patch_dict["merge"] = True
        # Merge locally as well, so the cached inputs match the merge patch.
        # The cached mapping may be unset before the first call.
        merged = self.data.inputs or {}
        merged.update(inputs)
        self.data.inputs = merged
    else:
        # A reset replaces whatever inputs were stored before.
        self.data.inputs = inputs
    self._update(patch_dict)


@check_no_op
@check_offline
def log_failed(self, message=None, traceback=None):
    """End the run with status ``"failed"``.

    Args:
        message: Optional text forwarded to ``end``.
        traceback: Optional text forwarded to ``end``.
    """
    self.end(status="failed", message=message, traceback=traceback)


@check_offline
def log_run_env(self):
    """Send the value returned by ``get_run_env()`` as the run's ``run_env``."""
    self._update({"run_env": get_run_env()})


@check_no_op
@check_offline
def set_name(self, name):
    """Send ``name`` as the run's ``name``."""
    self._update({"name": name})


@check_no_op
@check_offline
def refresh_data(self):
    """Re-fetch the run and refresh ``self.status`` from its statuses."""
    self._run = self.client.runs_v1.get_run(self.owner, self.project, self.run_uuid)
    # Each iteration overwrites the previous value, so ``self.status`` ends up
    # holding the last status yielded.
    # NOTE(review): presumably statuses are yielded oldest-first - confirm.
    for status, _conditions in get_run_statuses(
        self.owner, self.project, self.run_uuid
    ):
        self.status = status