Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
})
assert client.log.pop() == {
'event': 'subscribe',
'subscription': 'simple.topic',
'status': 'ok',
}
await session.on_client_event({
'event': 'subscribe',
'subscription': 'simple.topic',
})
assert client.log.pop() == {
'event': 'subscribe',
'subscription': 'simple.topic',
'status': 'error',
'error': c.ERR_ALREADY_SUBSCRIBED,
}
await session.on_client_event({
'event': 'message',
'subscription': 'simple.invalid',
'data': {'foo': 'bar'},
})
assert client.log.pop() == {
'status': 'error',
'subscription': 'simple.invalid',
'event': 'message',
'error': c.ERR_SUBSCRIPTION_NOT_FOUND,
}
await session.on_client_event({
'event': 'unsubscribe',
async def subscribe(self, event):
    """
    Subscribe the session to this subscription and acknowledge the event.

    The steps run strictly in this order: authentication check, duplicate
    check, authorization, provisional registration with the service
    receiver, the ``before_subscribe`` hook, recording the subscription on
    the session, the optional OK reply, and confirmation with the service
    receiver.

    Raises:
        EventError: with ``c.ERR_AUTH_REQUIRED`` when the service config's
            ``require_authentication`` value (``True`` when absent) is
            truthy and the session has no ``auth_info``; with
            ``c.ERR_ALREADY_SUBSCRIBED`` when this subscription's name is
            already present in the session's subscriptions.
    """
    # Authentication is enforced unless the service config disables it.
    auth_needed = self.service_config.get('require_authentication', True)
    if auth_needed:
        if not self.session.auth_info:
            raise EventError(c.ERR_AUTH_REQUIRED)

    # A session may hold a given subscription name only once.
    already_subscribed = self.name in self.session.subscriptions
    if already_subscribed:
        raise EventError(c.ERR_ALREADY_SUBSCRIBED)

    await self.authorize_subscription()

    # Register provisionally before the before_subscribe hook runs.
    add_provisional = self.shark.service_receiver.add_provisional_subscription
    await add_provisional(self.session, self.name)

    outcome = await self.before_subscribe()

    # Record the subscription on the session before replying to the client.
    self.session.subscriptions[self.name] = self

    if self.should_deliver_message(outcome):
        payload = outcome.get('data')
        await event.send_ok(payload)

    # Looked up afresh rather than reused from before the awaits above.
    confirm = self.shark.service_receiver.confirm_subscription
    await confirm(self.session, self.name)