Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def oauth_token():
"""
OAuth2 server -- token endpoint (confidential clients only)
"""
# Always required parameters
grant_type = request.form.get('grant_type')
auth_client = current_auth.auth_client # Provided by @requires_client_login
scope = request.form.get('scope', '').split(' ')
# if grant_type == 'authorization_code' (POST)
code = request.form.get('code')
redirect_uri = request.form.get('redirect_uri')
# if grant_type == 'password' (POST)
username = request.form.get('username')
password = request.form.get('password')
# if grant_type == 'client_credentials'
buid = request.form.get('buid') or request.form.get(
'userid'
) # XXX: Deprecated userid parameter
# Validations 1: Required parameters
if not grant_type:
return oauth_token_error('invalid_request', _("Missing grant_type"))
# grant_type == 'refresh_token' is not supported. All tokens are permanent unless revoked
if grant_type not in ['authorization_code', 'client_credentials', 'password']:
return oauth_token_error('unsupported_grant_type')
# Validations 2: client scope
if grant_type == 'client_credentials':
# AuthClient data; user isn't part of it OR trusted client and automatic scope
def route_create_ensemble_workflow(ensemble):
dao = Ensembles(g.session)
e = dao.get_ensemble(g.user.username, ensemble)
name = request.form.get("name", None)
if name is None:
raise EMError("Specify ensemble workflow 'name'")
priority = request.form.get("priority", 0)
basedir = request.form.get("basedir")
if basedir is None:
raise EMError("Specify 'basedir' where plan command should be executed")
plan_command = request.form.get("plan_command")
if plan_command is None:
raise EMError("Specify 'plan_command' that should be executed to plan workflow")
dao.create_ensemble_workflow(e.id, name, basedir, priority, plan_command)
g.session.commit()
return api.json_created(url_for("route_get_ensemble_workflow", ensemble=ensemble, workflow=name))
def clear(self):
dag_id = request.form.get('dag_id')
task_id = request.form.get('task_id')
origin = request.form.get('origin')
dag = dagbag.get_dag(dag_id)
execution_date = request.form.get('execution_date')
execution_date = timezone.parse(execution_date)
confirmed = request.form.get('confirmed') == "true"
upstream = request.form.get('upstream') == "true"
downstream = request.form.get('downstream') == "true"
future = request.form.get('future') == "true"
past = request.form.get('past') == "true"
recursive = request.form.get('recursive') == "true"
only_failed = request.form.get('only_failed') == "true"
dag = dag.sub_dag(
task_regex=r"^{0}$".format(task_id),
include_downstream=downstream,
include_upstream=upstream)
end_date = execution_date if not future else None
start_date = execution_date if not past else None
return self._clear_dag_tis(dag, start_date, end_date, origin,
recursive=recursive, confirmed=confirmed, only_failed=only_failed)
def upload_file():
return str(len(request.form.get('pippo', 'boh'))) + request.form.get('pippo', 'boh')
return request.files['ufile'].read()
def create():
if request.method == 'POST': # if the method is 'POST' we know the form sent us to this page, so there is data to process.
title = request.form.get('title')
content = request.form.get('content')
post_id = len(posts)
posts[post_id] = {'id': post_id, 'title': title, 'content': content}
return redirect(url_for('post', post_id=post_id))
# if we are here it means we did not redirect to the post; so we must've loaded this using a GET.
# the browser always sends GET, so this means the user wants to load the page (as opposed to giving us data via the form).
# We can render the form.
return render_template('create.jinja2')
def register():
if request.method=='GET':
return render_template('register.html')
else:
stu_id = request.form.get('stu_id')
user = request.form.get('user')
password = request.form.get('password')
password1 = request.form.get('password1')
print(stu_id, user, password, password1)
if(password1 != password):
return u'两次输入密码不同,请检查'
else:
sql = "select * from STUDENT where STU_NO = '%s'" % stu_id
print(sql)
result = query.query(sql)
print(result)
if len(result) != 0:
return u'已经有这个用户了'
else:
sql = "INSERT INTO STUDENT VALUES('%s', 'SEX', '%s', 'COLLEGE', 'MAJOR', 'AD_YEAR', '%s', 'ID')" % (user, stu_id, password)
print(sql)
query.update(sql)
def __get_input_model_for_tx() -> AccountTransactionsInputModel:
""" Parse user input or create a blank input model """
model = AccountTransactionsInputModel()
if request.args:
# model.account_id = request.args.get('account')
model.period = request.args.get('period')
if request.form:
# read from request
model.account_id = request.form.get('account')
model.period = request.form.get('period')
return model
def home():
text = request.form.get('text', '') or request.args.get('text', '')
return render_template(
'home.html',
languages=sorted(
UI_LANGUAGES_MAP.items(),
key=lambda key_val: key_val[1],
),
text=text,
extra_html_classes='start-on-paste' if (text or 'text' in request.args) else '',
icon_extensions=ICON_EXTENSIONS,
max_upload_size=app.config['MAX_UPLOAD_SIZE'],
)
if request.method == 'GET':
invoice = getInvoice(session.book, id)
if invoice is None:
abort(404)
else:
return Response(json.dumps(invoice), mimetype='application/json')
elif request.method == 'POST':
customer_id = str(request.form.get('customer_id', ''))
currency = str(request.form.get('currency', ''))
date_opened = request.form.get('date_opened', None)
notes = str(request.form.get('notes', ''))
posted = request.form.get('posted', None)
posted_account_guid = str(request.form.get('posted_account_guid', ''))
posted_date = request.form.get('posted_date', '')
due_date = request.form.get('due_date', '')
posted_memo = str(request.form.get('posted_memo', ''))
posted_accumulatesplits = request.form.get('posted_accumulatesplits',
'')
posted_autopay = request.form.get('posted_autopay', '')
if posted == '1':
posted = 1
else:
posted = 0
if (posted_accumulatesplits == '1'
or posted_accumulatesplits == 'true'
def create_mod():
if not current_user:
return { 'error': True, 'reason': 'You are not logged in.' }, 401
if not current_user.public:
return { 'error': True, 'reason': 'Only users with public profiles may create mods.' }, 403
name = request.form.get('name')
short_description = request.form.get('short-description')
version = request.form.get('version')
ksp_version = request.form.get('ksp-version')
license = request.form.get('license')
ckan = request.form.get('ckan')
zipball = request.files.get('zipball')
# Validate
if not name \
or not short_description \
or not version \
or not ksp_version \
or not license \
or not zipball:
return { 'error': True, 'reason': 'All fields are required.' }, 400
# Validation, continued
if len(name) > 100 \
or len(short_description) > 1000 \
or len(license) > 128: