Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@app.route('/api/filestat', methods=['GET'])
@app.catch_err
def get_filestat():
schema = request.args.get('schema', '')
path = request.args.get('path', '')
_, filename = path.split(path)
path = get_obj_path(schema, path)
if not path:
return app.json_error(
HTTPStatus.NOT_FOUND, path=path,
err='illegal path prefix or schema')
try:
stat = get_object_stat(path)
except FileNotFoundError as e:
def get_files():
    """Serve the content of a stored object as an HTTP response.

    Query parameters (read from ``request.args``):
        schema: forwarded to ``get_obj_path`` (default ``''``).
        path: requested object path, forwarded to ``get_obj_path``
            (default ``''``).
        size: converted to ``int`` and forwarded to ``get_object``
            (default ``0``).
        offset: converted to ``int`` and forwarded to ``get_object``
            (default ``0``).

    Returns:
        A ``Response`` carrying the object body, with a content type
        guessed from the resolved path (``application/octet-stream`` when
        no type can be guessed) and an ``x-suggested-filename`` header.
        A NOT_FOUND JSON error is returned when the path cannot be
        resolved, when ``get_object`` raises ``FileNotFoundError``, or when
        it returns ``None``.
    """
    args = request.args
    schema = args.get('schema', '')
    requested_path = args.get('path', '')
    size = int(args.get('size', '0'))
    offset = int(args.get('offset', '0'))

    # The previous ``path.split(path)`` split the string on itself, which
    # always produced an empty filename (and raised ValueError for an empty
    # path). Use the last '/'-separated component of the request instead.
    filename = requested_path.rpartition('/')[2]

    obj_path = get_obj_path(schema, requested_path)
    if not obj_path:
        # Report the path the client asked for; the resolved one is empty
        # at this point and carries no information.
        return app.json_error(
            HTTPStatus.NOT_FOUND, path=requested_path,
            err='illegal path prefix or schema')

    try:
        body = get_object(obj_path, size, offset)
    except FileNotFoundError as e:
        return app.json_error(
            HTTPStatus.NOT_FOUND, path=obj_path, err=str(e))
    if body is None:
        return app.json_error(HTTPStatus.NOT_FOUND, path=obj_path)

    ctype, _ = mimetypes.guess_type(obj_path)
    if not ctype:
        ctype = 'application/octet-stream'
    return Response(
        body, mimetype=ctype, headers={"x-suggested-filename": filename})
def update_run(project, uid):
    """Apply the JSON request body as an update to a run.

    The body is parsed with ``request.get_json(force=True)``; a
    ``ValueError`` during parsing yields a BAD_REQUEST JSON error. The
    ``iter`` query parameter (default ``'0'``) is converted to ``int`` and
    passed to ``app.db.update_run`` together with the body, ``uid`` and
    ``project``.

    Returns:
        ``jsonify(ok=True)`` on success, or the BAD_REQUEST error response.
    """
    try:
        payload = request.get_json(force=True)
    except ValueError:
        return app.json_error(HTTPStatus.BAD_REQUEST, reason='bad JSON body')

    app.logger.debug(payload)
    # Local is named ``iteration`` to avoid shadowing the builtin ``iter``;
    # the keyword passed to the DB layer is still ``iter``.
    iteration = int(request.args.get('iter', '0'))
    app.db.update_run(payload, uid, project, iter=iteration)
    app.app.logger.info('update run: {}'.format(payload))
    return jsonify(ok=True)
def log_path(project, uid) -> Path:
    """Return ``app.logs_dir / project / uid``."""
    project_dir = app.logs_dir / project
    return project_dir / uid
@app.route('/api/runs', methods=['DELETE'])
@app.catch_err
def del_runs():
    """Delete runs selected by the request's query parameters.

    Reads ``name``, ``project`` and ``state`` (each defaulting to ``''``),
    every ``label`` value, and ``days_ago`` (default ``'0'``, converted to
    ``int``), then forwards them positionally to ``app.db.del_runs``.

    Returns:
        ``jsonify(ok=True)``.
    """
    query = request.args
    run_name = query.get('name', '')
    run_project = query.get('project', '')
    run_labels = query.getlist('label')
    run_state = query.get('state', '')
    max_age_days = int(query.get('days_ago', '0'))

    app.db.del_runs(
        run_name, run_project, run_labels, run_state, max_age_days)
    return jsonify(ok=True)
@app.route('/api/projects', methods=['GET'])
@app.catch_err
def list_projects():
    """List the projects returned by ``app.db.list_projects()``.

    The ``full`` query parameter (default ``'no'``) is parsed with
    ``strtobool``. When it is true each non-string entry is converted with
    ``db2dict``; otherwise only its ``name`` attribute is returned. Entries
    that are already strings are passed through unchanged.

    Returns:
        ``jsonify(ok=True, projects=[...])``.
    """
    want_full = strtobool(request.args.get('full', 'no'))
    convert = db2dict if want_full else attrgetter('name')
    projects = [
        entry if isinstance(entry, str) else convert(entry)
        for entry in app.db.list_projects()
    ]
    return jsonify(ok=True, projects=projects)
# The route must declare the ``project`` and ``name`` URL variables: the view
# takes both as required positional parameters, and the previous rule
# ('/api//tag/') supplied neither.
@app.route('/api/<project>/tag/<name>', methods=['DELETE'])
@app.catch_err
def del_tag(project, name):
    """Delete a tag within a project.

    Args:
        project: project segment taken from the URL.
        name: tag name segment taken from the URL.

    Returns:
        ``jsonify(ok=True, project=..., name=..., count=...)`` where
        ``count`` is the value returned by ``app.db.del_tag``.
    """
    count = app.db.del_tag(project, name)
    return jsonify(ok=True, project=project, name=name, count=count)