Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_dataitem(url, secrets=None, db=None):
    """Return the mlrun data item object addressed by a path/url.

    :param url:     path/url of the data item
    :param secrets: secrets handed to the StoreManager
    :param db:      run DB object; when falsy, one is obtained through
                    ``get_run_db().connect()``
    :return: whatever ``StoreManager.object(url=url)`` returns
    """
    if not db:
        # No DB supplied by the caller: fall back to the default run DB.
        db = get_run_db().connect()
    store_manager = StoreManager(secrets, db=db)
    return store_manager.object(url=url)
self,
workflow_id,
timeout=60 * 60,
expected_statuses=None,
notifiers: RunNotifications = None,
):
status = ''
if timeout:
logger.info('waiting for pipeline run completion')
run_info = wait_for_pipeline_completion(
workflow_id, timeout=timeout, expected_statuses=expected_statuses
)
if run_info:
status = run_info['run'].get('status')
mldb = get_run_db().connect(self._secrets)
runs = mldb.list_runs(project=self.name, labels=f'workflow={workflow_id}')
had_errors = 0
for r in runs:
if r['status'].get('state', '') == 'error':
had_errors += 1
text = f'Workflow {workflow_id} finished'
if had_errors:
text += f' with {had_errors} errors'
if status:
text += f', status={status}'
if notifiers:
notifiers.push(text, runs)
return status, had_errors, text
def _get_artifact_mngr(self):
    """Return the artifact manager, creating and caching it on first use.

    The manager is stored in ``self._artifact_mngr``; it is built from a run
    DB connection and a StoreManager that both receive ``self._secrets``.
    """
    if not self._artifact_mngr:
        run_db = get_run_db().connect(self._secrets)
        store_manager = StoreManager(self._secrets, run_db)
        self._artifact_mngr = ArtifactManager(store_manager, run_db)
    return self._artifact_mngr
def state(self):
    """Look up this run in the run DB and return its ``status.state``.

    :return: the value found at ``status.state`` in the stored run
             (``'unknown'`` is passed to ``get_in`` as the default), or
             ``None`` when ``read_run`` returns nothing truthy
    """
    run_db = get_run_db().connect()
    stored_run = run_db.read_run(
        uid=self.metadata.uid,
        project=self.metadata.project,
        iter=self.metadata.iteration,
    )
    if not stored_run:
        return None
    return get_in(stored_run, 'status.state', 'unknown')
and merge it with the user provided parameters
:param api_path: location/url of mlrun api service
:param artifact_path: path/url for storing experiment artifacts
:param project: default project name
:return actual artifact path/url, can be used to create subpaths per task,
e.g.: artifact_path = set_environment()
data_subpath = os.path.join(artifact_path, 'data')
"""
mlconf.dbpath = mlconf.dbpath or api_path
if not mlconf.dbpath:
raise ValueError('DB/API path was not detected, please specify its address')
# check connectivity and load remote defaults
get_run_db().connect()
if api_path:
environ['MLRUN_DBPATH'] = mlconf.dbpath
mlconf.default_project = project or mlconf.default_project
if not mlconf.artifact_path and not artifact_path:
raise ValueError('please specify a valid artifact_path')
if artifact_path.startswith('./'):
artifact_path = path.abspath(artifact_path)
elif not artifact_path.startswith('/') and '://' not in artifact_path:
raise ValueError('artifact_path must refer to an absolute path'
' or a valid url')
mlconf.artifact_path = artifact_path or mlconf.artifact_path
return mlconf.artifact_path
def logs(self, watch=True, db=None):
    """Print this run's logs and return the state reported by the run DB.

    :param watch: forwarded to ``db.watch_log`` when the DB kind is ``'http'``;
                  not used for other DB kinds
    :param db:    run DB object; when falsy, one is obtained through
                  ``get_run_db().connect()``
    :return: the state returned by the DB, or ``None`` when no DB is available
    """
    db = db or get_run_db().connect()
    if not db:
        print('DB is not configured, cannot show logs')
        return None

    if db.kind != 'http':
        # Non-http DBs return the state together with the log body,
        # which is decoded here before printing.
        state, text = db.get_log(self.metadata.uid, self.metadata.project)
        if text:
            print(text.decode())
    else:
        # The http DB handles the log output itself and only returns the state.
        state = db.watch_log(self.metadata.uid, self.metadata.project, watch=watch)

    if state:
        print('final state: {}'.format(state))
    return state
def logs(uid, project, offset, db, watch):
    """Get or watch task logs"""
    # Prefer the DB path given by the caller; otherwise use the configured one.
    run_db = get_run_db(db or mlconf.dbpath).connect()

    if run_db.kind != 'http':
        # Non-http DBs return the state together with the log body,
        # which is decoded here before printing.
        state, text = run_db.get_log(uid, project, offset=offset)
        if text:
            print(text.decode())
    else:
        # Only the http DB receives the watch flag.
        state = run_db.watch_log(uid, project, watch=watch, offset=offset)

    if state:
        print('final state: {}'.format(state))
artifact_path.replace('{{run.uid}}', '{{workflow.uid}}')
if artifact_path and '{{run.project}}' in artifact_path:
if not project:
raise ValueError(
'project name must be specified with this'
+ f' artifact_path template {artifact_path}'
)
artifact_path.replace('{{run.project}}', project)
if not artifact_path:
raise ValueError('artifact path was not specified')
namespace = namespace or mlconf.namespace
arguments = arguments or {}
if remote or url:
mldb = get_run_db(url).connect()
if mldb.kind != 'http':
raise ValueError(
'run pipeline require access to remote api-service'
', please set the dbpath url'
)
id = mldb.submit_pipeline(
pipeline,
arguments,
experiment=experiment,
run=run,
namespace=namespace,
ops=ops,
artifact_path=artifact_path,
)
else:
def get_object(url, secrets=None, size=None, offset=0, db=None):
    """Return the body of the mlrun data item addressed by a path/url.

    :param url:     path/url of the data item
    :param secrets: secrets handed to the StoreManager
    :param size:    passed as the first argument to the data item's ``get``
    :param offset:  passed as the second argument to the data item's ``get``
    :param db:      run DB object; when falsy, one is obtained through
                    ``get_run_db().connect()``
    :return: whatever the data item's ``get(size, offset)`` returns
    """
    if not db:
        # No DB supplied by the caller: fall back to the default run DB.
        db = get_run_db().connect()
    data_item = StoreManager(secrets, db=db).object(url=url)
    return data_item.get(size, offset)