Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_subscription_authorizer(self):
    """
    Check the error replies for subscribing to ``authorizer.topic``.

    Two situations are exercised:

    1. The session has not been authenticated yet: the subscribe event
       is answered with ``c.ERR_AUTH_REQUIRED``.
    2. The session has been authenticated via ``self._auth_session`` and
       ``aioresponses`` is active with no responses registered in this
       section: the subscribe event is answered with
       ``c.ERR_SERVICE_UNAVAILABLE``.
    """
    shark = SocketShark(TEST_CONFIG)
    await shark.prepare()
    client = MockClient(shark)
    session = client.session

    # Every attempt below sends the same client event; only the session
    # state and the HTTP mocking around it differ.
    subscribe_event = {
        'event': 'subscribe',
        'subscription': 'authorizer.topic',
    }

    # 1. Not authenticated yet.
    await session.on_client_event(dict(subscribe_event))
    assert client.log.pop() == {
        'event': 'subscribe',
        'subscription': 'authorizer.topic',
        'status': 'error',
        'error': c.ERR_AUTH_REQUIRED,
    }

    await self._auth_session(session)

    with aioresponses() as mock:
        # 2. Authorizer is unavailable.
        await session.on_client_event(dict(subscribe_event))
        assert client.log.pop() == {
            'event': 'subscribe',
            'subscription': 'authorizer.topic',
            'status': 'error',
            'error': c.ERR_SERVICE_UNAVAILABLE,
        }
async def subscribe(self, event):
    """
    Subscribes to the subscription.

    Steps, in order:

    1. Reject the request if the service requires authentication
       (``require_authentication`` in ``self.service_config``, treated
       as ``True`` when absent) and the session has no ``auth_info``.
    2. Reject the request if ``self.name`` is already present in
       ``self.session.subscriptions``.
    3. Await ``self.authorize_subscription()``.
    4. Register a provisional subscription with the service receiver.
    5. Await ``self.before_subscribe()`` and keep its result.
    6. Store this subscription on the session under ``self.name``.
    7. If ``self.should_deliver_message(result)`` is truthy, reply to the
       client through ``event.send_ok`` with ``result.get('data')``.

    :param event: the incoming subscribe event; only its ``send_ok``
        coroutine is used here.
    :raises EventError: with ``c.ERR_AUTH_REQUIRED`` or
        ``c.ERR_ALREADY_SUBSCRIBED`` for the two rejections above.
        NOTE(review): the awaited helpers may raise as well -- they are
        not visible from this block.
    """
    require_authentication = self.service_config.get(
        'require_authentication', True)

    if require_authentication and not self.session.auth_info:
        raise EventError(c.ERR_AUTH_REQUIRED)

    if self.name in self.session.subscriptions:
        raise EventError(c.ERR_ALREADY_SUBSCRIBED)

    await self.authorize_subscription()

    # The provisional subscription is registered before the
    # before_subscribe() hook runs.
    # NOTE(review): presumably so messages arriving during the hook are
    # not lost -- confirm against the service receiver.
    await self.shark.service_receiver.add_provisional_subscription(
        self.session, self.name)

    result = await self.before_subscribe()

    self.session.subscriptions[self.name] = self

    if self.should_deliver_message(result):
        await event.send_ok(result.get('data'))