if refresh and self._is_remote_api():
    try:
        meta = self.metadata
        db_func = db.get_function(meta.name, meta.project, meta.tag)
        if db_func and 'status' in db_func:
            self.status = db_func['status']
            if self.status.state and self.status.state == 'ready':
                self.spec.image = get_in(db_func, 'spec.image', self.spec.image)
    except Exception:
        pass

tag = tag or self.metadata.tag
obj = self.to_dict()
logger.debug('saving function: {}, tag: {}'.format(self.metadata.name, tag))
hash_key = db.store_function(
    obj, self.metadata.name, self.metadata.project, tag, versioned
)
return hash_key
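
A minimal sketch of the persist-and-return-hash-key step above, with a hypothetical in-memory FakeDB standing in for MLRun's run database (all names in this sketch are illustrative, not MLRun's API):

import hashlib
import json

class FakeDB:
    # hypothetical stand-in for the run database used by save() above
    def __init__(self):
        self.functions = {}

    def store_function(self, obj, name, project, tag, versioned):
        # hash the serialized object so identical bodies yield identical keys
        body = json.dumps(obj, sort_keys=True).encode()
        hash_key = hashlib.sha1(body).hexdigest()
        self.functions[(project, name, tag)] = obj
        return hash_key

db = FakeDB()
obj = {'metadata': {'name': 'trainer', 'project': 'default'}}
print(db.store_function(obj, 'trainer', 'default', 'latest', versioned=True))
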
def cancel_periodic_functions():
    logger.debug('Canceling periodic functions')
    global tasks
    for task in tasks:
        task.cancel()
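
task.cancel() here is plain asyncio cancellation; a self-contained sketch of the same pattern (the tick coroutine is illustrative):

import asyncio

async def main():
    async def tick():
        while True:
            await asyncio.sleep(1)

    # the module-level `tasks` list above would hold handles like these
    tasks = [asyncio.create_task(tick()) for _ in range(3)]
    for task in tasks:
        task.cancel()
    # cancelled tasks surface CancelledError when awaited; swallow it here
    await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(main())
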
def _run(self, runobj: RunObject, execution):
    environ['MLRUN_EXEC_CONFIG'] = runobj.to_json()
    tmp = mktemp('.json')
    environ['MLRUN_META_TMPFILE'] = tmp
    if self.spec.rundb:
        environ['MLRUN_DBPATH'] = self.spec.rundb

    handler = runobj.spec.handler
    logger.debug(
        'starting local run: {} # {}'.format(self.spec.command, handler or 'main')
    )

    if handler:
        if self.spec.pythonpath:
            set_paths(self.spec.pythonpath)

        mod, fn = load_module(self.spec.command, handler)
        context = MLClientCtx.from_dict(
            runobj.to_dict(),
            rundb=self.spec.rundb,
            autocommit=False,
            tmp=tmp,
            host=socket.gethostname(),
        )
        mod.global_mlrun_context = context
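
load_module above resolves the handler inside the command file; a rough stdlib approximation of that step (load_handler is illustrative, not MLRun's implementation):

import importlib.util

def load_handler(path, handler_name):
    # import a Python file by path and return the module plus the named function
    spec = importlib.util.spec_from_file_location('user_code', path)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    return mod, getattr(mod, handler_name)

# usage (assuming trainer.py defines a train() function):
# mod, fn = load_handler('trainer.py', 'train')
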
def _cleanup_runtimes():
    logger.debug('Cleaning runtimes')
    db_session = create_session()
    try:
        for kind in RuntimeKinds.runtime_with_handlers():
            runtime_handler = get_runtime_handler(kind)
            runtime_handler.delete_resources(get_db(), db_session)
    finally:
        close_session(db_session)
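
The acquire/iterate/finally-close shape above guarantees the session is released even when a handler raises; a runnable sketch with sqlite3 standing in for the DB session:

import sqlite3

def cleanup_resources(kinds):
    session = sqlite3.connect(':memory:')
    try:
        session.execute('CREATE TABLE resources (kind TEXT)')
        session.executemany('INSERT INTO resources VALUES (?)', [(k,) for k in kinds])
        for kind in kinds:
            # per-kind deletion, mirroring runtime_handler.delete_resources above
            session.execute('DELETE FROM resources WHERE kind = ?', (kind,))
        session.commit()
    finally:
        # the session is closed even if a deletion raises
        session.close()

cleanup_resources(['job', 'dask', 'spark'])
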
if producer.kind == 'run':
    db_key = producer.name + '_' + key
else:
    db_key = key
item.db_key = db_key if db_key else ''

item.before_log()
self.artifacts[key] = item

if (upload is None and item.kind != 'dir') or upload:
    item.upload(self.data_stores)

if db_key:
    self._log_to_db(db_key, producer.project, producer.inputs, item, tag)
size = str(item.size) if item.size else '?'  # str(None) is truthy, so guard on the value itself
logger.debug(
    'log artifact {} at {}, size: {}, db: {}'.format(
        key,
        item.target_path,
        size,
        'Y' if (self.artifact_db and db_key) else 'N',
    )
)
return item
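
The db_key derivation at the top of this block reduces to a one-liner; a sketch with illustrative names:

def derive_db_key(producer_kind, producer_name, key):
    # run producers get a '<run name>_<key>' db key; everything else keeps the key
    return producer_name + '_' + key if producer_kind == 'run' else key

assert derive_db_key('run', 'trainer', 'model') == 'trainer_model'
assert derive_db_key('project', 'demo', 'model') == 'model'
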
        get_pipeline_if_completed,
        run_id,
        namespace=namespace,
    )
else:
    client = Client(namespace=namespace)
    resp = client.wait_for_run_completion(run_id, timeout)
    if resp:
        resp = resp.to_dict()

status = resp['run']['status'] if resp else 'unknown'

if expected_statuses:
    if status not in expected_statuses:
        raise RuntimeError(f"run status {status} not in expected statuses")

logger.debug(
    "Finished waiting for pipeline completion."
    f" run_id: {run_id},"
    f" status: {status},"
    f" namespace: {namespace}"
)

return resp
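
The wait logic above is a poll-until-terminal loop; a generic, self-contained sketch under the assumption that poll() returns a status string once the run finishes and None while it is still running:

import time

def wait_for_completion(poll, timeout, interval=5, expected_statuses=None):
    deadline = time.monotonic() + timeout
    status = 'unknown'
    while time.monotonic() < deadline:
        result = poll()
        if result is not None:
            status = result
            break
        time.sleep(interval)
    if expected_statuses and status not in expected_statuses:
        raise RuntimeError(f'run status {status} not in expected statuses')
    return status

# usage: wait_for_completion(lambda: 'Succeeded', timeout=60, expected_statuses=['Succeeded'])
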
async def update_run(
    request: Request,
    project: str,
    uid: str,
    iter: int = 0,
    db_session: Session = Depends(deps.get_db_session),
):
    data = None
    try:
        data = await request.json()
    except ValueError:
        log_and_raise(HTTPStatus.BAD_REQUEST, reason="bad JSON body")

    logger.debug(data)
    await run_in_threadpool(
        get_db().update_run, db_session, data, uid, project, iter=iter
    )
    logger.info("update run: {}".format(data))
    return {}
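
The parse-or-400 shape of update_run can be reproduced in a few lines of plain FastAPI; this sketch uses FastAPI's built-in HTTPException instead of MLRun's log_and_raise helper, and the route path is illustrative:

from http import HTTPStatus

from fastapi import FastAPI, HTTPException, Request

app = FastAPI()

@app.post('/run/{project}/{uid}')
async def update_run_sketch(request: Request, project: str, uid: str):
    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass, so this catches bad bodies
        raise HTTPException(status_code=HTTPStatus.BAD_REQUEST, detail='bad JSON body')
    return {}
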
log_mtime = crud.Logs.get_log_mtime(project, uid)
log_mtime_datetime = datetime.fromtimestamp(log_mtime, timezone.utc)
now = datetime.now(timezone.utc)
run = db.read_run(db_session, uid, project)
# default to now's ISO string so fromisoformat below always receives a string
last_update_str = run.get('status', {}).get('last_update', now.isoformat())
last_update = datetime.fromisoformat(last_update_str)
# this function verifies that logs were collected from runtime resources before they are deleted.
# it is called only after the runtime resource's run was verified not to be in a transient state,
# so the run's last update is assumed to be the final one; if the log file was modified after it,
# we consider all logs to have been collected
if log_mtime_datetime < last_update:
    store_log = True

if store_log:
    logger.debug('Storing runtime resource log before deletion')
    logs_from_k8s, _ = crud.Logs.get_log(
        db_session, project, uid, source=LogSources.K8S
    )
    crud.Logs.store_log(logs_from_k8s, project, uid, append=False)
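
The mtime comparison above is just timezone-aware datetime arithmetic; a tiny runnable illustration with made-up timestamps:

from datetime import datetime, timedelta, timezone

now = datetime.now(timezone.utc)
log_mtime_datetime = now - timedelta(minutes=5)   # log file written 5 minutes ago
last_update = now - timedelta(minutes=1)          # run last updated 1 minute ago

# the log predates the final update, so logs must be stored before deletion
store_log = log_mtime_datetime < last_update
assert store_log
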
async def store_artifact(
    request: Request,
    project: str,
    uid: str,
    key: str,
    tag: str = "",
    iter: int = 0,
    db_session: Session = Depends(deps.get_db_session),
):
    data = None
    try:
        data = await request.json()
    except ValueError:
        log_and_raise(HTTPStatus.BAD_REQUEST, reason="bad JSON body")

    logger.debug(data)
    await run_in_threadpool(
        get_db().store_artifact,
        db_session,
        key,
        data,
        uid,
        iter=iter,
        tag=tag,
        project=project,
    )
    return {}
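
run_in_threadpool (Starlette's helper, re-exported by FastAPI) is what keeps these endpoints async-safe around a synchronous DB layer; a small runnable sketch with a stand-in blocking call:

import asyncio
import time

from starlette.concurrency import run_in_threadpool

def blocking_store(key, data):
    # stands in for the synchronous get_db().store_artifact call above
    time.sleep(0.1)
    return key

async def main():
    # the event loop stays free while the blocking call runs in a worker thread
    result = await run_in_threadpool(blocking_store, 'model', {'kind': 'model'})
    print(result)

asyncio.run(main())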