Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def show_collections(username):
    """Render one page of a user's collections.

    The user is looked up by ``username`` via ``first_or_404``. The page
    number comes from the ``page`` query argument (default 1) and the page
    size from the ``ALBUMY_PHOTO_PER_PAGE`` config key. ``Collect`` rows are
    ordered newest first and handed to ``user/collections.html``.
    """
    user = User.query.filter_by(username=username).first_or_404()
    page = request.args.get('page', 1, type=int)
    per_page = current_app.config['ALBUMY_PHOTO_PER_PAGE']
    # Pass page/per_page by keyword: Flask-SQLAlchemy 3.x made these
    # arguments keyword-only, and 2.x accepts the same names, so this form
    # works on both.
    pagination = (
        Collect.query.with_parent(user)
        .order_by(Collect.timestamp.desc())
        .paginate(page=page, per_page=per_page)
    )
    collects = pagination.items
    return render_template('user/collections.html', user=user, pagination=pagination, collects=collects)
def adc_api_workflow_show(workflow_id):
    """
    Show a single workflow.

    The ``state_only`` query argument (default ``'False'``) counts as true
    unless its lower-cased value is ``'false'`` or ``'0'``; the flag is
    forwarded to ``adc.workflow_show``.

    Return the workflow dict as a JSON response.
    """
    raw_flag = request.args.get('state_only', 'False')
    state_only = raw_flag.lower() not in {'false', '0'}
    return jsonify(adc.workflow_show(workflow_id, state_only=state_only))
def list(self):
    """
    List user groups.

    /group/

    Query arguments: ``page`` (1-based; 0, negative or absent selects the
    first page), ``size`` (default 10) and ``kw`` (keyword, default empty).

    :return: the result of ``self.list_json`` for the matching spaces
    """
    # type=int makes a non-numeric value fall back to the default instead of
    # letting int() raise ValueError out of the view.
    page = request.args.get('page', 0, type=int)
    # Convert the 1-based page from the client to a 0-based index and never
    # pass a negative index on to the model.
    page = max(page - 1, 0)
    size = request.args.get('size', 10, type=int)
    kw = request.values.get('kw', '')
    space_model = SpaceModel()
    space_list, count = space_model.list(page=page, size=size, kw=kw)
    return self.list_json(list=space_list, count=count, enable_create=permission.role_upper_owner())
# NOTE(review): this run of statements has no `def` header of its own in view
# and directly follows a `return`. It reuses `page`/`size` and passes the name
# `filter`, which is not assigned anywhere visible (so it would resolve to the
# builtin). Presumably leftover or truncated code -- confirm before relying on it.
# Fetch one page of spaces together with a total count.
group_model, count = SpaceModel().query_paginate(page=page, limit=size, filter_name_dict=filter)
# NOTE(review): nothing visible ever appends to `groups`.
groups = []
for group_info in group_model:
# Count the MemberModel rows whose group_id matches this entry; if this is the
# loop body, that is one count query per entry.
group_sub = MemberModel.query \
.filter_by(group_id=group_info.id) \
.count()
# Replace the model object with the result of to_json() and attach the count.
group_info = group_info.to_json()
group_info['users'] = group_sub
def getFrontendContent(**params):
    """Render the small-area locations view for the ``west``/``east`` areas.

    ``area`` is taken from ``params`` when present; otherwise a non-empty
    ``area`` query argument is copied into ``params``. Any other or missing
    area yields an empty string.
    """
    if 'area' not in params:
        requested_area = request.args.get('area', '')
        if requested_area != '':
            params['area'] = requested_area

    # Guard clause: only the two small-area views are rendered here.
    if params.get('area') not in ('west', 'east'):
        return ""

    return render_template(
        'frontend.locations_smallarea.html',
        cities=City.getCities(),
        alarmobjects=AlarmObject.getAlarmObjects(),
        alarmobjecttypes=AlarmObjectType.getAlarmObjectTypes(),
        frontendarea=params['area']
    )
def _get_print_url(event, theme=None, theme_override=False):
    """Build the print URL for ``event`` according to its type.

    ``theme`` is only forwarded as the ``view`` argument when
    ``theme_override`` is true. Conferences link to the timetable endpoint;
    meetings and lectures link to the event display endpoint. For meetings,
    the ``showDate``, ``showSession`` and ``detailLevel`` query arguments of
    the current request are carried over. Returns None for any other type.
    """
    view = theme if theme_override else None

    if event.type_ == EventType.conference:
        return url_for(u'timetable.timetable', event, print=u'1', view=view)

    if event.type_ == EventType.meeting:
        query = request.args
        show_date = query.get(u'showDate')
        show_session = query.get(u'showSession')
        detail_level = query.get(u'detailLevel')
        # The catch-all values are replaced by None before building the URL.
        # NOTE(review): u'contrinbution' looks like a typo for
        # u'contribution'; kept as-is because it changes which values are
        # dropped -- confirm against what the frontend actually sends.
        return url_for(
            u'events.display', event,
            showDate=None if show_date == u'all' else show_date,
            showSession=None if show_session == u'all' else show_session,
            detailLevel=None if detail_level in (u'all', u'contrinbution') else detail_level,
            print=u'1', view=view
        )

    if event.type_ == EventType.lecture:
        return url_for(u'events.display', event, print=u'1', view=view)

    return None
@app.route('/oauth/callback')
def get_oauth_callback():
    ''' Handle Github's OAuth callback after a user authorizes.

        Renders error-oauth.html when the callback carries an error, lacks
        the code or state argument, or names a state that is not stored in
        the session. Otherwise the temporary code is exchanged for an
        access token.

        http://developer.github.com/v3/oauth/#github-redirects-back-to-your-site
    '''
    if 'error' in request.args:
        return render_template('error-oauth.html', reason="you didn't authorize access to your account.")

    # Catch only the missing-key case: a bare except would also swallow
    # unrelated failures (and KeyboardInterrupt/SystemExit) and misreport them.
    try:
        code, state_id = request.args['code'], request.args['state']
    except KeyError:
        return render_template('error-oauth.html', reason='missing code or state in callback.')

    # KeyError covers both "no states stored in the session" and
    # "this state id is unknown". pop() makes each state single-use.
    try:
        state = session['states'].pop(state_id)
    except KeyError:
        return render_template('error-oauth.html', reason='state "%s" not found?' % state_id)

    #
    # Exchange the temporary code for an access token:
    # http://developer.github.com/v3/oauth/#parameters-1
    #
    data = dict(client_id=github_client_id, code=code, client_secret=github_client_secret)
    resp = post('https://github.com/login/oauth/access_token', urlencode(data),
                headers={'Accept': 'application/json'})
    auth = resp.json()
def authorized(remote_app=None):
    """Authorized handler callback.

    Aborts with 404 when ``remote_app`` has no registered handler. The
    ``state`` query argument must be present, must load through
    ``serializer`` and must match the current session id and
    ``remote_app``; on success the ``next`` URL it carries is stored for
    the session. A failed check aborts with 403 unless state checking is
    disabled via ``OAUTHCLIENT_STATE_ENABLED`` while the app runs in debug
    or testing mode.
    """
    if remote_app not in handlers:
        return abort(404)

    state_token = request.args.get('state')

    # Verify state parameter. Explicit checks are used instead of ``assert``
    # because assert statements are removed when Python runs with -O, which
    # would silently disable this verification.
    try:
        if not state_token:
            raise BadData('Missing state parameter.')
        # Checks authenticity and integrity of state and decodes the value.
        state = serializer.loads(state_token)
        # Verify that state is for this session, app and that next parameter
        # have not been modified.
        if not (state['sid'] == session.sid and state['app'] == remote_app):
            raise BadData('State does not match the current session or app.')
        # Store next URL
        set_session_next_url(remote_app, state['next'])
    except (AssertionError, BadData):
        # AssertionError is still caught so a failure raised that way by the
        # calls above is handled exactly as before.
        if current_app.config.get('OAUTHCLIENT_STATE_ENABLED', True) or (
                not (current_app.debug or current_app.testing)):
            abort(403)
@app.route('/snowplow/', methods=["GET"])
def track_event(path=None):
    """Forward a tracking request's query arguments as a signal.

    When the ``aid`` query argument is ``searchHub`` and there are query
    arguments, they are sent to the collection named by the
    ``FUSION_COLLECTION`` config key (default ``lucidfind``); otherwise a
    message is printed. A 1x1 pixel image is always returned.

    :param path: unused. The route as written supplies no URL variable, so
        it defaults to None to keep the view callable.
        NOTE(review): confirm whether the route rule was meant to capture
        a path segment.
    """
    # TODO: put in spam prevention
    app_id = request.args.get("aid")
    signal = request.args
    if app_id == "searchHub" and signal:
        coll_id = app.config.get("FUSION_COLLECTION", "lucidfind")
        backend.send_signal(coll_id, signal)
    else:
        # Single-argument call form: valid on both Python 2 and Python 3.
        print("Unable to send signal: app_id: {0}, signal: {1}".format(app_id, signal))
    # Snowplow requires you respond with a 1x1 pixel
    return send_from_directory(os.path.join(app.root_path, 'assets/img/'), 'onebyone.png')
def index():
    """Render the index page, optionally with puns for a requested word.

    A ``stats`` query argument logs the cache statistics of ``get_puns``.
    A ``word`` query argument is stripped of surrounding whitespace; when
    anything remains, puns are looked up for it. Otherwise both ``word`` and
    ``puns`` are passed to the template as None.
    """
    args = request.args
    if 'stats' in args:
        logger.info(get_puns.cache_info())

    word = None
    puns = None
    if 'word' in args:
        # An empty string after stripping is treated as "no word given".
        word = args.get('word').strip() or None
        if word is not None:
            puns = get_puns(word)

    return render_template('index.html', puns=puns, word=word)
def get_dag_runs():
# Collect formatted DagRun rows, optionally filtered by the `state`,
# `external_trigger` and `prefix` query arguments, ordered by execution date.
# NOTE(review): the visible portion never returns `dag_runs`; presumably the
# function continues past this view -- confirm.
dag_runs = []
session = settings.Session()
query = session.query(DagRun)
# Exact match on the run state when `state` is supplied.
if request.args.get('state') is not None:
query = query.filter(DagRun.state == request.args.get('state'))
# Only the literal strings 'true' and 'True' count as true; any other supplied
# value filters for external_trigger == False.
if request.args.get('external_trigger') is not None:
# query = query.filter(DagRun.external_trigger == (request.args.get('external_trigger') is True))
query = query.filter(DagRun.external_trigger == (request.args.get('external_trigger') in ['true', 'True']))
# Case-insensitive "starts with" match on run_id.
# NOTE(review): `%` and `_` inside the supplied prefix are not escaped, so they
# act as LIKE wildcards -- confirm this is acceptable.
if request.args.get('prefix') is not None:
query = query.filter(DagRun.run_id.ilike('{}%'.format(request.args.get('prefix'))))
runs = query.order_by(DagRun.execution_date).all()
for run in runs:
dag_runs.append(format_dag_run(run))
# NOTE(review): no try/finally is visible, so the session is not closed when
# the query or formatting raises.
session.close()