Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@Handler.on_event('websocket.receive')
async def event_receive(self, scope, receive, send, event):
    """Queue the payload of an incoming websocket message.

    The event's ``bytes`` payload is put on ``scope['emt.wsqueue']`` when it
    is present (not ``None``); otherwise the ``text`` payload is used.

    Returns ``_event_looper`` (defined elsewhere in this module).
    """
    payload = event.get('bytes')
    # Compare against None rather than relying on truthiness: an empty binary
    # frame (b'') is a valid message and must not fall through to 'text',
    # which would be None or missing for a binary-mode message.
    if payload is None:
        payload = event['text']
    await scope['emt.wsqueue'].put(payload)
    return _event_looper
@Handler.on_event('http.disconnect')
async def event_disconnect(self, scope, receive, send, event):
    """Handle ``http.disconnect``: do nothing and return ``None``."""
    return None
@Handler.on_event('websocket.disconnect')
async def event_disconnect(self, scope, receive, send, event):
    """Handle ``websocket.disconnect``: do nothing and return ``None``."""
    return None
@Handler.on_event('lifespan.shutdown')
async def event_shutdown(self, scope, receive, send, event):
    """Acknowledge ``lifespan.shutdown`` by sending the completion message.

    Returns ``None`` (no explicit return value).
    """
    acknowledgement = {'type': 'lifespan.shutdown.complete'}
    await send(acknowledgement)
@Handler.on_event('lifespan.startup')
async def event_startup(self, scope, receive, send, event):
    """Acknowledge ``lifespan.startup`` by sending the completion message.

    Returns ``_event_looper`` (defined elsewhere in this module).
    """
    acknowledgement = {'type': 'lifespan.startup.complete'}
    await send(acknowledgement)
    return _event_looper
@Handler.on_event('http.request')
async def event_request(self, scope, receive, send, event):
    """Feed one ``http.request`` body chunk into the request input buffer.

    Appends the event's body to ``scope['emt.input']`` and, when the event
    does not announce further chunks (``more_body`` missing or falsy), calls
    ``set_complete()`` on that buffer.

    Returns ``_event_looper`` (defined elsewhere in this module).
    """
    # The ASGI spec makes 'body' optional with a default of b'', so read it
    # with a default instead of indexing, which raised KeyError on omission.
    scope['emt.input'].append(event.get('body', b''))
    if not event.get('more_body', False):
        scope['emt.input'].set_complete()
    return _event_looper