Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
path = request.args.get('path', '')
size = int(request.args.get('size', '0'))
offset = int(request.args.get('offset', '0'))
_, filename = path.split(path)
path = get_obj_path(schema, path)
if not path:
return app.json_error(
HTTPStatus.NOT_FOUND, path=path,
err='illegal path prefix or schema')
try:
body = get_object(path, size, offset)
except FileNotFoundError as e:
return app.json_error(HTTPStatus.NOT_FOUND, path=path, err=str(e))
if body is None:
return app.json_error(HTTPStatus.NOT_FOUND, path=path)
ctype, _ = mimetypes.guess_type(path)
if not ctype:
ctype = 'application/octet-stream'
return Response(
body, mimetype=ctype, headers={"x-suggested-filename": filename})
def get_filestat():
schema = request.args.get('schema', '')
path = request.args.get('path', '')
_, filename = path.split(path)
path = get_obj_path(schema, path)
if not path:
return app.json_error(
HTTPStatus.NOT_FOUND, path=path,
err='illegal path prefix or schema')
try:
stat = get_object_stat(path)
except FileNotFoundError as e:
return app.json_error(HTTPStatus.NOT_FOUND, path=path, err=str(e))
ctype, _ = mimetypes.guess_type(path)
if not ctype:
ctype = 'application/octet-stream'
return jsonify(ok=True, size=stat.size,
modified=stat.modified,
mimetype=ctype)
def build_function():
try:
data = request.get_json(force=True)
except ValueError:
return app.json_error(HTTPStatus.BAD_REQUEST, reason='bad JSON body')
app.logger.info('build_function:\n{}'.format(data))
function = data.get('function')
with_mlrun = strtobool(data.get('with_mlrun', 'on'))
ready = False
try:
fn = new_function(runtime=function)
fn.set_db_connection(app.db)
fn.save(versioned=False)
ready = build_runtime(fn, with_mlrun)
fn.save(versioned=False)
app.logger.info('Fn:\n %s', fn.to_yaml())
except Exception as err:
app.logger.error(traceback.format_exc())
def store_artifact(project, uid, key):
try:
data = request.get_json(force=True)
except ValueError:
return app.json_error(HTTPStatus.BAD_REQUEST, reason='bad JSON body')
app.logger.debug(data)
tag = request.args.get('tag', '')
iter = int(request.args.get('iter', '0'))
app.db.store_artifact(key, data, uid, iter=iter, tag=tag, project=project)
return jsonify(ok=True)
def get_project(name):
project = app.db.get_project(name)
if not project:
return app.json_error(error=f'project {name!r} not found')
resp = {
'name': project.name,
'description': project.description,
'owner': project.owner,
'source': project.source,
'users': [u.name for u in project.users],
}
return jsonify(ok=True, project=resp)
def get_log(project, uid):
size = int(request.args.get('size', '-1'))
offset = int(request.args.get('offset', '0'))
out = b''
log_file = log_path(project, uid)
if log_file.exists():
with log_file.open('rb') as fp:
fp.seek(offset)
out = fp.read(size)
status = ''
else:
data = app.db.read_run(uid, project)
if not data:
return app.json_error(
HTTPStatus.NOT_FOUND, project=project, uid=uid)
status = get_in(data, 'status.state', '')
if app.k8s:
pods = app.k8s.get_logger_pods(uid)
if pods:
pod, new_status = list(pods.items())[0]
new_status = new_status.lower()
# TODO: handle in cron/tracking
if new_status != 'pending':
resp = app.k8s.logs(pod)
if resp:
out = resp.encode()[offset:]
if status == 'running':
now = now_date().isoformat()
def store_run(project, uid):
try:
data = request.get_json(force=True)
except ValueError:
return app.json_error(HTTPStatus.BAD_REQUEST, reason='bad JSON body')
app.logger.debug(data)
iter = int(request.args.get('iter', '0'))
app.db.store_run(data, uid, project, iter=iter)
app.app.logger.info('store run: {}'.format(data))
return jsonify(ok=True)
def start_function():
try:
data = request.get_json(force=True)
except ValueError:
return app.json_error(HTTPStatus.BAD_REQUEST, reason='bad JSON body')
app.logger.info('start_function:\n{}'.format(data))
url = data.get('functionUrl')
if not url:
return app.json_error(
HTTPStatus.BAD_REQUEST,
reason='runtime error: functionUrl not specified',
)
project, name, tag = parse_function_uri(url)
runtime = app.db.get_function(name, project, tag)
if not runtime:
return app.json_error(
HTTPStatus.BAD_REQUEST,
reason='runtime error: function {} not found'.format(url),
)
fn = new_function(runtime=runtime)
resource = runtime_resources_map.get(fn.kind)
if 'start' not in resource:
return app.json_error(
def tag_objects(project, name):
try:
data: dict = request.get_json(force=True)
except ValueError:
return app.json_error(HTTPStatus.BAD_REQUEST, reason='bad JSON body')
objs = []
for typ, query in data.items():
cls = table2cls(typ)
if cls is None:
err = f'unknown type - {typ}'
return app.json_error(HTTPStatus.BAD_REQUEST, reason=err)
# {'name': 'bugs'} -> [Function.name=='bugs']
db_query = [
getattr(cls, key) == value for key, value in query.items()
]
# TODO: Change _query to query?
# TODO: Not happy about exposing db internals to API
objs.extend(app.db.session.query(cls).filter(*db_query))
app.db.tag_objects(objs, project, name)
return jsonify(ok=True, project=project, name=name, count=len(objs))