Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def list_artifacts(
project: str = config.default_project,
name: str = None,
tag: str = None,
labels: List[str] = Query([], alias='label'),
db_session: Session = Depends(deps.get_db_session),
):
artifacts = get_db().list_artifacts(db_session, name, project, tag, labels)
return {
"artifacts": artifacts,
}
def _tag_objects(db_session, data, project, name):
objs = []
for typ, query in data.items():
cls = table2cls(typ)
if cls is None:
err = f"unknown type - {typ}"
log_and_raise(HTTPStatus.BAD_REQUEST, reason=err)
# {"name": "bugs"} -> [Function.name=="bugs"]
db_query = [getattr(cls, key) == value for key, value in query.items()]
# TODO: Change _query to query?
# TODO: Not happy about exposing db internals to API
objs.extend(db_session.query(cls).filter(*db_query))
get_db().tag_objects(db_session, objs, project, name)
return objs
def get_run_db_instance(db_session: Session):
db = get_db()
if isinstance(db, SQLDB):
run_db = SQLRunDB(db.dsn, db_session, db.get_projects_cache())
else:
run_db = db.db
run_db.connect()
return run_db
def delete_runtime(
kind: str,
label_selector: str = None,
force: bool = False,
db_session: Session = Depends(deps.get_db_session),
):
if kind not in RuntimeKinds.runtime_with_handlers():
log_and_raise(
status.HTTP_400_BAD_REQUEST, kind=kind, err='Invalid runtime kind'
)
runtime_handler = get_runtime_handler(kind)
runtime_handler.delete_resources(get_db(), db_session, label_selector, force)
return Response(status_code=status.HTTP_204_NO_CONTENT)
reason="bad JSON, need to include function/url and task objects",
)
# TODO: block exec for function["kind"] in ["", "local] (must be a
# remote/container runtime)
response = None
try:
if function and not url:
fn = new_function(runtime=function)
else:
if "://" in url:
fn = import_function(url=url)
else:
project, name, tag, hash_key = parse_function_uri(url)
runtime = get_db().get_function(
db_session, name, project, tag, hash_key
)
if not runtime:
log_and_raise(
HTTPStatus.BAD_REQUEST,
reason="runtime error: function {} not found".format(url),
)
fn = new_function(runtime=runtime)
if function:
fn2 = new_function(runtime=function)
for attr in [
"volumes",
"volume_mounts",
"env",
"resources",
"image_pull_policy",
"replicas",
]:
val = getattr(fn2.spec, attr, None)
if val:
setattr(fn.spec, attr, val)
run_db = get_run_db_instance(db_session)
fn.set_db_connection(run_db, True)
logger.info("func:\n{}".format(fn.to_yaml()))
# fn.spec.rundb = "http://mlrun-api:8080"
schedule = data.get("schedule")
if schedule:
args = (task,)
job_id = get_scheduler().add(schedule, fn, args)
get_db().store_schedule(db_session, data)
response = {"schedule": schedule, "id": job_id}
else:
run = fn.run(task, watch=False)
if run:
response = run.to_dict()
except Exception as err:
logger.error(traceback.format_exc())
log_and_raise(HTTPStatus.BAD_REQUEST, reason="runtime error: {}".format(err))
logger.info("response: %s", response)
return {
"data": response,
}
def get_function(
project: str,
name: str,
tag: str = "",
hash_key="",
db_session: Session = Depends(deps.get_db_session)):
func = get_db().get_function(db_session, name, project, tag, hash_key)
return {
"func": func,
}
def get_project(name: str, db_session: Session = Depends(deps.get_db_session)):
project = get_db().get_project(db_session, name)
if not project:
log_and_raise(error=f"project {name!r} not found")
project.users = [u.name for u in project.users]
return {
"project": project,
}
def add_project(
project: schemas.ProjectCreate, db_session: Session = Depends(deps.get_db_session)
):
project_id = get_db().add_project(db_session, project.dict())
return {
"id": project_id,
"name": project.name,
}
def del_runs(
project: str = None,
name: str = None,
labels: List[str] = Query([], alias='label'),
state: str = None,
days_ago: int = 0,
db_session: Session = Depends(deps.get_db_session)):
get_db().del_runs(db_session, name, project, labels, state, days_ago)
return {}