Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def current_ctx(app, path):
    """Yield ``current`` bound to a fake request context built for ``path``.

    A ``ScopeBuilder`` is created from ``path``; the first element of its
    ``get_data()`` result is passed, together with ``app``, to
    ``current._init_`` to set up a ``FakeRequestContext``. The token returned
    by ``_init_`` is handed back to ``current._close_`` when the consumer is
    done.

    NOTE(review): presumably wrapped by ``contextlib.contextmanager`` or used
    as a yield-style fixture; no decorator is visible here -- confirm.

    Args:
        app: Application object forwarded to ``current._init_``.
        path: Value passed to ``ScopeBuilder``.

    Yields:
        The ``current`` object, with the fake context active.
    """
    builder = ScopeBuilder(path)
    token = current._init_(FakeRequestContext, app, builder.get_data()[0])
    try:
        yield current
    finally:
        # Runs even when an exception is raised into the generator at the
        # ``yield`` or the generator is closed early, so the context is
        # never left open.
        current._close_(token)
def current_ctx(path, app=None):
    """Yield ``current`` bound to a fake request context built for ``path``.

    A ``ScopeBuilder`` is created from ``path``; the first element of its
    ``get_data()`` result is passed, together with ``app``, to
    ``current._init_`` to set up a ``FakeRequestContext``. The token returned
    by ``_init_`` is handed back to ``current._close_`` when the consumer is
    done.

    NOTE(review): presumably wrapped by ``contextlib.contextmanager`` or used
    as a yield-style fixture; no decorator is visible here -- confirm.

    Args:
        path: Value passed to ``ScopeBuilder``.
        app: Optional application object forwarded to ``current._init_``
            (``None`` when omitted).

    Yields:
        The ``current`` object, with the fake context active.
    """
    builder = ScopeBuilder(path)
    token = current._init_(FakeRequestContext, app, builder.get_data()[0])
    try:
        yield current
    finally:
        # Runs even when an exception is raised into the generator at the
        # ``yield`` or the generator is closed early, so the context is
        # never left open.
        current._close_(token)
async def handle_request(self, scope, receive, send):
    """Run ``pre_handler`` inside a ``WSContext`` and always close the context.

    The context is opened with ``current._init_`` before ``pre_handler`` is
    awaited and closed with ``current._close_`` afterwards, whatever the
    outcome. ``asyncio.CancelledError`` and ``Exception`` raised by
    ``pre_handler`` are swallowed here after being logged (see below); other
    exceptions propagate once the context has been closed.

    Args:
        scope: Mapping consulted for the ``'emt._flow_cancel'`` flag and
            forwarded to the context and to ``pre_handler``.
        receive: Forwarded to the context and to ``pre_handler``.
        send: Forwarded to the context and to ``pre_handler``.
    """
    ctx_token = current._init_(WSContext, self.app, scope, receive, send)
    try:
        await self.pre_handler(scope, receive, send)
    except asyncio.CancelledError:
        # Cancellation is logged only when the scope does not carry a truthy
        # 'emt._flow_cancel' entry; a flagged cancellation stays silent.
        if not scope.get('emt._flow_cancel', False):
            self.app.log.exception('Application exception:')
    except Exception:
        self.app.log.exception('Application exception:')
        # http = HTTP(
        #     500, await self.error_handler(), headers=response.headers)
    finally:
        # Close in ``finally`` so the context is released even when an
        # exception not handled above escapes this coroutine.
        current._close_(ctx_token)
async def handle_request(self, scope, receive, send):
    """Run ``pre_handler`` inside a ``RequestContext`` and send its response.

    The response returned by ``pre_handler`` is sent after the request
    context has been closed. If ``pre_handler`` raises, the failure is
    logged and a 500 ``HTTP`` response built from ``self.error_handler()``
    is sent instead. A cancellation whose scope carries a truthy
    ``'emt._flow_cancel'`` entry ends the request without sending anything.

    The context is closed exactly once on every path, including when
    building the 500 response itself fails.

    Args:
        scope: Mapping consulted for the ``'emt._flow_cancel'`` flag and
            forwarded to the context, ``pre_handler`` and ``http.send``.
        receive: Forwarded to ``pre_handler``.
        send: Forwarded to ``pre_handler`` and ``http.send``.
    """
    ctx_token = current._init_(RequestContext, self.app, scope)
    try:
        try:
            http = await self.pre_handler(scope, receive, send)
        except asyncio.CancelledError:
            if scope.get('emt._flow_cancel', False):
                # Flagged cancellation: nothing to send. The ``finally``
                # below still closes the context before returning.
                return
            self.app.log.exception('Application exception:')
            http = HTTP(
                500, await self.error_handler(), headers=response.headers)
        except Exception:
            self.app.log.exception('Application exception:')
            http = HTTP(
                500, await self.error_handler(), headers=response.headers)
    finally:
        # Single close site: runs before the response is sent, on the early
        # return, and when the error handler itself raises.
        current._close_(ctx_token)
    # TODO: timeout from app config/response
    await asyncio.wait_for(http.send(scope, send), None)