Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_shark_init(self):
shark = SocketShark(TEST_CONFIG)
loop = asyncio.get_event_loop()
loop.run_until_complete(shark.prepare())
loop.run_until_complete(shark.shutdown())
async def test_rate_limit(self):
"""
Make sure SocketShark retries 429 responses appropriately.
"""
http_retry_config = TEST_CONFIG.copy()
http_retry_config['HTTP']['tries'] = 2
http_retry_config['HTTP']['wait'] = 1
shark = SocketShark(http_retry_config)
await shark.prepare()
client = MockClient(shark)
session = client.session
subscription = 'simple_before_subscribe.topic'
conf = TEST_CONFIG['SERVICES']['simple_before_subscribe']
with aioresponses() as mock:
mock.post(conf['before_subscribe'], status=429, headers={
'X-Rate-Limit-Reset': '0.2',
})
mock.post(conf['before_subscribe'], payload={
'status': 'ok',
'data': {},
})
# Respond to ping late
msg = await ws.receive()
assert msg.type == aiohttp.WSMsgType.PING
await asyncio.sleep(0.1)
ws.pong(msg.data)
# Ensure we get disconnected
msg = await ws.receive()
assert msg.type == aiohttp.WSMsgType.CLOSE
await ws.close()
await aiosession.close()
asyncio.ensure_future(shark.shutdown())
shark = SocketShark(TEST_CONFIG)
asyncio.ensure_future(task())
shark.start()
assert msg.type == aiohttp.WSMsgType.TEXT
data = json.loads(msg.data)
assert data == {
'event': 'message',
'subscription': subscription,
'data': {'baz': 'new'},
}
redis.close()
await ws2.close()
await aiosession.close()
asyncio.ensure_future(shark.shutdown())
shark = SocketShark(TEST_CONFIG)
asyncio.ensure_future(task())
shark.start()
async def test_subscription_needs_auth(self):
"""
For safety reasons, subscriptions require authentication by default.
"""
shark = SocketShark(TEST_CONFIG)
client = MockClient(shark)
session = client.session
await session.on_client_event({
'event': 'subscribe',
'subscription': 'empty.topic',
})
assert client.log.pop() == {
'event': 'subscribe',
'subscription': 'empty.topic',
'status': 'error',
'error': c.ERR_AUTH_REQUIRED,
}
assert not client.log
'subscription': 'ws_test.hello',
'status': 'ok',
}
mock.start()
mock.post(conf['on_unsubscribe'], payload={})
asyncio.ensure_future(shark.shutdown())
msg = await ws.receive()
assert msg.type == aiohttp.WSMsgType.CLOSE
await ws.close()
await aiosession.close()
shark = SocketShark(TEST_CONFIG)
asyncio.ensure_future(task())
shark.start()
mock.stop()
requests = mock.requests[('POST', conf['on_unsubscribe'])]
assert len(requests) == 1
assert requests[0].kwargs['json'] == {
'subscription': 'ws_test.hello'}
async def test_throttle(self):
shark = SocketShark(TEST_CONFIG)
await shark.prepare()
client = MockClient(shark)
session = client.session
subscription = 'simple.topic'
await session.on_client_event({
'event': 'subscribe',
'subscription': subscription,
})
assert client.log.pop() == {
'event': 'subscribe',
'subscription': subscription,
'status': 'ok',
}
def test_websocket(self):
shark = SocketShark(TEST_CONFIG)
done = False
async def task():
nonlocal done
# Wait until backend is ready.
await asyncio.sleep(0.1)
aiosession = aiohttp.ClientSession()
mock = aioresponses()
conf = TEST_CONFIG['SERVICES']['ws_test']
async with aiosession.ws_connect(self.ws_url) as ws:
await ws.send_str(json.dumps({
'event': 'subscribe',
'event': 'subscribe',
'subscription': 'simple.hello',
'status': 'ok',
}
shark.redis.close()
msg = await ws1.receive()
assert msg.type == aiohttp.WSMsgType.CLOSE
msg = await ws2.receive()
assert msg.type == aiohttp.WSMsgType.CLOSE
await aiosession.close()
shark = SocketShark(TEST_CONFIG)
asyncio.ensure_future(task())
shark.start()
await aiosession.close()
# Wait until backend learns about the disconnected WebSocket.
await asyncio.sleep(0.1)
mock.stop()
requests = mock.requests[('POST', conf['on_unsubscribe'])]
assert len(requests) == 1
assert requests[0].kwargs['json'] == {
'subscription': 'ws_test.hello'}
await shark.shutdown()
done = True
shark = SocketShark(TEST_CONFIG)
asyncio.ensure_future(task())
shark.start()
assert done