Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
await session.on_client_event(None)
assert client.log.pop() == {
'status': 'error',
'error': c.ERR_INVALID_EVENT,
}
await session.on_client_event('hello')
assert client.log.pop() == {
'status': 'error',
'error': c.ERR_INVALID_EVENT,
}
await session.on_client_event({})
assert client.log.pop() == {
'status': 'error',
'error': c.ERR_INVALID_EVENT,
}
await session.on_client_event({'event': None})
assert client.log.pop() == {
'status': 'error',
'error': c.ERR_INVALID_EVENT,
}
await session.on_client_event({'event': ''})
assert client.log.pop() == {
'status': 'error',
'event': '',
'error': c.ERR_EVENT_NOT_FOUND,
}
await session.on_client_event({'event': 'hello'})
async def full_process(self):
msg = {
'status': 'error',
'error': c.ERR_INVALID_EVENT,
}
await self.session.send(msg)
return False
async def consumer_handler(self):
try:
ping_handler = asyncio.ensure_future(self.ping_handler())
try:
while True:
event = await self.websocket.recv()
try:
data = json.loads(event)
except json.decoder.JSONDecodeError:
self.session.log.warn('received invalid json')
await self.send({
"status": "error",
"error": c.ERR_INVALID_EVENT,
})
else:
await self.session.on_client_event(data)
except websockets.ConnectionClosed:
await self.session.on_close()
ping_handler.cancel()
except Exception:
self.session.log.exception('unhandled error in consumer handler')