Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def get_user(uid):
    """Return the ``jsonify``-wrapped result of ``_get_user`` for ``uid``.

    The ``method`` query argument is read from ``request.args`` and passed
    through to ``_get_user`` unchanged, together with the request itself.
    """
    user = await _get_user(request, uid, request.args.get('method'))
    return jsonify(user)
async def get(self):
    """Return the value of the ``moo`` query argument of the current request."""
    # Subscript access (not ``.get``): the lookup is mandatory, no fallback.
    query_args = request.args
    return query_args['moo']
async def login():
    """Begin the OAuth2 authorization flow and redirect to the provider.

    The space-separated ``scope`` query argument (default
    ``'identify guilds'``) is split into a list and used to create the
    OAuth session. The ``state`` value returned alongside the authorization
    URL is stored in the session under ``'oauth2_state'``.
    """
    requested_scopes = request.args.get('scope', 'identify guilds').split(' ')
    oauth_session = oauth.make_session(scope=requested_scopes)
    target_url, oauth_state = oauth_session.authorization_url(
        webapp.AUTHORIZATION_BASE_URL
    )
    # Stored for later use — presumably checked in the OAuth callback
    # handler, which is not visible here.
    session['oauth2_state'] = oauth_state
    return redirect(target_url)
{
"conf_name": "Oskars Testperson",
"conf_no": 14506
}
]
}
.. rubric:: Example
::
curl -v -X GET -H "Content-Type: application/json" \\
"http://localhost:5001/lyskom/conferences/?name=osk%20t&want-confs=false"
"""
name = request.args['name']
want_pers = get_bool_arg_with_default(request.args, 'want-pers', True)
want_confs = get_bool_arg_with_default(request.args, 'want-confs', True)
try:
lookup = await g.ksession.lookup_name(name, want_pers, want_confs)
confs = [ dict(conf_no=t[0], conf_name=t[1]) for t in lookup ]
return jsonify(dict(conferences=confs))
except komerror.Error as ex:
return error_response(400, kom_error=ex)
def _get_connection_id_from_request():
    """Return the connection id sent with the current request, if any.

    The ``HTTPKOM_CONNECTION_HEADER`` request header takes precedence. If
    the header is absent, a query parameter of the same name is used
    instead.

    Returns:
        The connection id, or ``None`` when neither the header nor the
        query parameter is present.
    """
    if HTTPKOM_CONNECTION_HEADER in request.headers:
        return request.headers[HTTPKOM_CONNECTION_HEADER]

    # Work-around for allowing the connection id to be sent as
    # query parameter. This is needed to be able to show images
    # (text body) by creating an img tag.
    # TODO: This should be considered unsafe, because it would
    # expose the connection id if the user would copy the link.
    return request.args.get(HTTPKOM_CONNECTION_HEADER)
async def data(self, db_info, export=False):
limit = request.args.get('_size', ROWS_LIMIT) if not export else -1
rowid = not (request.args.get('_rowid') == 'hide') and not export
total = not (request.args.get('_total') == 'hide') and not export
sort = request.args.get('_sort')
sort_desc = request.args.get('_sort_desc')
offset = request.args.get('_offset') if not export else 0
# get filter arguments, like column__exact=xxx
filters = []
for key, value in request.args.items():
if not key.startswith('_') and '__' in key:
filters.append((key, value))
cols = 'rowid, *' if rowid else '*'
sql = 'SELECT {} FROM [{}]'.format(cols, db_info['table_name'])
sql, params = self.add_filters_to_sql(sql, filters)
if sort:
async def get_films():
    """Return, as JSON, the films released after a given year.

    The threshold is read from the ``year.gt`` query argument and defaults
    to ``2000``. The response maps each ``film_id`` to a dict holding that
    film's ``release_year`` and ``title``.
    """
    # Query-string values arrive as ``str``; coerce to ``int`` so the bound
    # parameter has the same type as the integer default. If the value
    # cannot be converted, ``get`` falls back to the default.
    # NOTE(review): assumes ``release_year`` is an integer-typed column —
    # confirm against the schema.
    minimal_year = request.args.get('year.gt', 2000, type=int)
    films = {}
    async with current_app.pool.acquire() as connection:
        # The cursor is iterated inside an explicit transaction.
        async with connection.transaction():
            async for film in connection.cursor(
                """SELECT film_id, release_year, title
                     FROM film
                    WHERE release_year > $1""",
                minimal_year,
            ):
                films[film['film_id']] = {
                    'release_year': film['release_year'],
                    'title': film['title'],
                }
    return jsonify(films)
async def get(self):
app.logger.debug('* Starting ParseView.get')
url = request.args.get('url')
encoding = request.args.get('encoding')
if not url:
raise APIError('Missing url query string variable.', status=400)
if not validators.url(url):
raise APIError('Malformed url parameter.', status=400)
urlhash = get_hash(url)
if not already_exists(urlhash):
try:
storage = app.config['DB_ROOT_DIR']
await self.do_parse(url=url,
urlhash=urlhash,
encoding=encoding,
storage=storage,
logger=app.logger,
sniff_limit=app.config.get('CSV_SNIFF_LIMIT'),
"no_of_unread": null,
"unread_texts": null
},
...
]
}
.. rubric:: Example
::
curl -v -X GET "http://localhost:5001/lyskom/persons/14506/memberships/?unread=true"
"""
unread = get_bool_arg_with_default(request.args, 'unread', False)
passive = get_bool_arg_with_default(request.args, 'passive', False)
first = int(request.args.get('first', 0))
no_of_memberships = int(request.args.get('no-of-memberships', 100))
memberships, has_more = await g.ksession.get_memberships(
pers_no, first, no_of_memberships, unread, passive)
return jsonify(has_more=has_more, memberships=await to_dict(memberships, g.ksession))
async def get_task_result():
    """Report the status (and solution, if available) of a task.

    The task is identified by the ``task_id`` query argument and looked up
    in the module-level ``tasks`` mapping.

    Returns:
        A ``Response`` with mimetype ``text/json`` whose body is one of:

        * a plain-text message when ``task_id`` is missing or empty;
        * ``{"error": "invalid task_id"}`` when the id is unknown;
        * ``{"task_id": ..., "status": ...}``, plus ``"solution"`` when
          the task entry contains one.
    """
    task_id = request.args.get("task_id")
    if not task_id:
        # The body here is plain text, not JSON; kept as-is so existing
        # clients see the same response shape. Only the argument name in
        # the message was corrected to match the real parameter.
        result = "Missing required argument `task_id`"
    elif task_id not in tasks:
        result = json.dumps({"error": "invalid task_id"})
    else:
        task = tasks[task_id]
        response = {"task_id": task_id, "status": task["status"]}
        if "solution" in task:
            response["solution"] = task["solution"]
        result = json.dumps(response)
    return Response(result, mimetype="text/json")