Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def redirector(request):
    """Answer every request with an ``HTTPFound`` redirect to ``/redirected``."""
    target = URL('/redirected')
    raise web.HTTPFound(location=target)
async def post(self):
    """Store a submitted guest-book entry, then redirect to ``/guest-book``.

    The posted form fields are copied into a plain dict, extended with the
    ``created_time``, ``date`` and ``html`` keys, and pushed onto the
    ``GuestBook`` key through ``self.redis``.

    Raises:
        web.HTTPFound: always, once the entry has been pushed.
    """
    form = await self.request.post()
    entry = dict({}, **form)

    created = str(time.time())
    entry['created_time'] = created
    entry['date'] = todate(created, '%b.%d')
    entry['html'] = render(entry['text'])

    await self.redis.lpush('GuestBook', entry, isdict=True, rem=True)
    raise web.HTTPFound('/guest-book')
async def save_paste(request):
    """Create a paste from the submitted form and redirect to its page.

    Reads ``title`` and ``body`` (default ``''``) from the POST data. When a
    non-empty title is present, a ``Paste`` with a fresh UUID is saved via
    ``request.app['db']`` and the client is redirected to ``/pastes/<uuid>``.

    Returns:
        An empty dict when nothing was saved (no form data, or no title).
        NOTE(review): presumably consumed as a template context by a
        decorator that is not visible here -- confirm.

    Raises:
        web.HTTPFound: after the paste has been saved.
    """
    post_data = await request.post()
    title = post_data.get('title') if post_data else None
    if not title:
        # TODO: show error msg
        return {}

    paste_obj = Paste(
        uuid=str(uuid4()),
        title=title,
        body=post_data.get('body', ''),
    )
    await paste_obj.save(request.app['db'])
    # Redirect to the paste page. The redirect is raised rather than
    # returned: aiohttp deprecates returning HTTP exceptions from handlers.
    raise web.HTTPFound('/pastes/{}'.format(paste_obj.uuid))
# Body fragment of a handler whose `def` line is outside this excerpt (it uses
# `self`, `await` and `return`). It registers `repository_name` and answers
# with a redirect: to the repository page on success, otherwise back to the
# settings page with a message.
is_known_repository = self.is_known(repository_name)
# Already registered: redirect to settings with an explanatory message.
# NOTE(review): `message` is interpolated into the query string as-is
# (spaces and quotes included) -- confirm it gets URL-encoded downstream.
if is_known_repository:
message = f"'{repository_name}' is already registered, look for it in the store."
return web.HTTPFound(
f"/hacsweb/{self.token}/settings?timestamp={time()}&message={message}"
)
# A blacklisted name is taken off the blacklist before it is registered.
if repository_name in self.common.blacklist:
self.common.blacklist.remove(repository_name)
await self.register_repository(repository_name, repository_type)
# Look the repository up by name; a non-None result is treated as success.
repository = self.get_by_name(repository_name)
if repository is not None:
return web.HTTPFound(
f"/hacsweb/{self.token}/repository/{repository.information.uid}?timestamp={time()}"
)
# Lookup returned None: redirect to settings with an error message.
# NOTE(review): this message contains newlines and `<br>` markup and is also
# placed into the URL unencoded -- confirm this is handled downstream.
message = f"""
Could not add '{repository_name}' with type '{repository_type}' at this time.<br>
If you used the correct type, check the log for more details."""
return web.HTTPFound(
f"/hacsweb/{self.token}/settings?timestamp={time()}&message={message}"
)
def signout(request):
    """Sign the user out and send them back to the referring page.

    Builds an ``HTTPFound`` redirect to the ``Referer`` header (or ``/`` when
    the header is absent or empty), overwrites the ``COOKIE_NAME`` cookie on
    it with ``'-deleted-'`` and ``max_age=0``, logs the sign-out and returns
    the redirect.
    """
    redirect_to = request.headers.get('Referer') or '/'
    response = web.HTTPFound(redirect_to)
    # Clear the user information stored in the cookie.
    response.set_cookie(COOKIE_NAME, '-deleted-', max_age=0, httponly=True)
    logging.info('user signed out')
    return response
async def redirect_handler(self, request: web.BaseRequest):
    """Redirect the client to the documentation at ``/api/doc``.

    Raises:
        web.HTTPFound: always; the incoming request is not inspected.
    """
    docs_location = "/api/doc"
    raise web.HTTPFound(docs_location)
# Body fragment of a login handler whose `def` line is outside this excerpt;
# `form` and `identity` are defined before the visible lines.
# `response` is the redirect handed back on a successful login; the final
# statement redirects to /auth/login instead.
response = web.HTTPFound('/manage')
# U2F branch: taken when U2F is enabled in config and the form has no
# '_method' field.
if config.admin['u2f'] and '_method' not in form:
users = await self.redis.get('Auth.U2F') or {}
# verify() yields a pair: the value stored back under users[identity], and
# a success flag.
users[identity], ok = await verify(users[identity], dict(await self.request.post()))
if ok:
await self.redis.set('Auth.U2F', users, many=False)
await remember(self.request, response, identity)
return response
# Password branch.
# NOTE(review): form['_method'] looks like it raises KeyError when '_method'
# is absent and U2F is disabled -- confirm the form always sends it.
elif form['_method'] == 'common':
# NOTE(review): `method` is not used anywhere in the visible lines.
method = check_method(form.get('email').lower())
# TODO: validate that the email address is legitimate
if await check_credentials(self.redis, identity, form.get('password')):
await remember(self.request, response, identity)
return response
return web.HTTPFound('/auth/login')
async def login(request):
    """Check the submitted credentials and start a fresh session.

    The ``(name, password)`` pair from the POST form is looked up in
    ``DATABASE``; its index there is stored as the session's ``user_id``.

    Returns:
        A redirect to the ``restricted`` route on success, or a 403
        ``No such user`` response when the pair is not in ``DATABASE``
        (which includes the case of a missing form field).
    """
    router = request.app.router
    form = await request.post()
    # .get() so that a missing field becomes a failed lookup (403) instead of
    # an unhandled KeyError.
    user_signature = (form.get('name'), form.get('password'))

    # actually implement business logic to check user credentials
    # Only the lookup is guarded, so a ValueError raised while setting up the
    # session is never misreported as "No such user".
    try:
        user_id = DATABASE.index(user_signature)
    except ValueError:
        return web.Response(text='No such user', status=HTTPStatus.FORBIDDEN)

    # Always use `new_session` during login to guard against
    # Session Fixation. See aiohttp-session#281
    session = await new_session(request)
    session['user_id'] = user_id
    return web.HTTPFound(router['restricted'].url_for())