Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
assert client.log.pop() == {
'event': 'subscribe',
'subscription': 'invalid',
'status': 'error',
'error': c.ERR_INVALID_SUBSCRIPTION_FORMAT,
}
await session.on_client_event({
'event': 'subscribe',
'subscription': 'invalid.topic'
})
assert client.log.pop() == {
'event': 'subscribe',
'subscription': 'invalid.topic',
'status': 'error',
'error': c.ERR_INVALID_SERVICE,
}
assert not client.log
def validate(self):
if not self.service or not self.topic:
raise EventError(c.ERR_INVALID_SUBSCRIPTION_FORMAT)
if self.service_config is None:
raise EventError(c.ERR_INVALID_SERVICE)