Secure your code as it's written. Use Snyk Code to scan source code in minutes, with no build needed, and fix issues immediately.
class SubscribeEvent(SubscriptionEvent):
    """Event that hands itself to its subscription's ``subscribe`` coroutine."""

    async def process(self):
        """Run the inherited ``process`` step, then dispatch to ``subscribe``.

        Returns:
            ``True`` once both awaited calls have completed.
        """
        await super().process()
        # Looked up only after the inherited step has finished, so the
        # attribute access happens in the same order as a direct call would.
        handler = self.subscription.subscribe
        await handler(self)
        return True
class MessageEvent(SubscriptionEvent):
    """Event that hands itself to its subscription's ``message`` coroutine."""

    async def process(self):
        """Run the inherited ``process`` step, then dispatch to ``message``.

        Returns:
            ``True`` once both awaited calls have completed.
        """
        await super().process()
        # Looked up only after the inherited step has finished, so the
        # attribute access happens in the same order as a direct call would.
        handler = self.subscription.message
        await handler(self)
        return True
class UnsubscribeEvent(SubscriptionEvent):
    """Event that hands itself to its subscription's ``unsubscribe`` coroutine."""

    async def process(self):
        """Run the inherited ``process`` step, then dispatch to ``unsubscribe``.

        Returns:
            ``True`` once both awaited calls have completed.
        """
        await super().process()
        # Looked up only after the inherited step has finished, so the
        # attribute access happens in the same order as a direct call would.
        handler = self.subscription.unsubscribe
        await handler(self)
        return True
'subscription': self.subscription_name,
} if self.subscription_name else {})
async def process(self):
    """Validate the attached subscription and report success.

    Calls ``self.subscription.validate()`` synchronously (its result is
    not awaited or inspected) and then returns ``True``. Anything raised
    by ``validate`` propagates to the caller.

    Returns:
        Always ``True`` when ``validate`` returns normally.
    """
    subscription = self.subscription
    subscription.validate()
    return True
class SubscribeEvent(SubscriptionEvent):
    """Event that hands itself to its subscription's ``subscribe`` coroutine."""

    async def process(self):
        """Run the inherited ``process`` step, then dispatch to ``subscribe``.

        Returns:
            ``True`` once both awaited calls have completed.
        """
        await super().process()
        # Looked up only after the inherited step has finished, so the
        # attribute access happens in the same order as a direct call would.
        handler = self.subscription.subscribe
        await handler(self)
        return True
class MessageEvent(SubscriptionEvent):
    """Event that hands itself to its subscription's ``message`` coroutine."""

    async def process(self):
        """Run the inherited ``process`` step, then dispatch to ``message``.

        Returns:
            ``True`` once both awaited calls have completed.
        """
        await super().process()
        # Looked up only after the inherited step has finished, so the
        # attribute access happens in the same order as a direct call would.
        handler = self.subscription.message
        await handler(self)
        return True
class UnsubscribeEvent(SubscriptionEvent):
    """Event that hands itself to its subscription's ``unsubscribe`` coroutine."""

    async def process(self):
        """Run the inherited ``process`` step, then dispatch to ``unsubscribe``.

        Returns:
            ``True`` once both awaited calls have completed.
        """
        await super().process()
        # Looked up only after the inherited step has finished, so the
        # attribute access happens in the same order as a direct call would.
        handler = self.subscription.unsubscribe
        await handler(self)
        return True
async def send_error(self, error, data=None):
    """Delegate to the parent ``send_error``, adding the subscription name.

    Args:
        error: Passed through positionally to the parent ``send_error``.
        data: Passed through as the ``data`` keyword (defaults to ``None``).

    The parent receives ``extra_data={'subscription': <name>}`` when
    ``self.subscription_name`` is truthy, and an empty dict otherwise.
    """
    if self.subscription_name:
        extra = {'subscription': self.subscription_name}
    else:
        extra = {}
    await super().send_error(error, data=data, extra_data=extra)
async def send_ok(self, data=None):
    """Delegate to the parent ``send_ok``, adding the subscription name.

    Args:
        data: Passed through as the ``data`` keyword (defaults to ``None``).

    The parent receives ``extra_data={'subscription': <name>}`` when
    ``self.subscription_name`` is truthy, and an empty dict otherwise.
    """
    if self.subscription_name:
        extra = {'subscription': self.subscription_name}
    else:
        extra = {}
    await super().send_ok(data=data, extra_data=extra)
async def process(self):
    """Validate the attached subscription and report success.

    Calls ``self.subscription.validate()`` synchronously (its result is
    not awaited or inspected) and then returns ``True``. Anything raised
    by ``validate`` propagates to the caller.

    Returns:
        Always ``True`` when ``validate`` returns normally.
    """
    subscription = self.subscription
    subscription.validate()
    return True
class SubscribeEvent(SubscriptionEvent):
    """Event that hands itself to its subscription's ``subscribe`` coroutine."""

    async def process(self):
        """Run the inherited ``process`` step, then dispatch to ``subscribe``.

        Returns:
            ``True`` once both awaited calls have completed.
        """
        await super().process()
        # Looked up only after the inherited step has finished, so the
        # attribute access happens in the same order as a direct call would.
        handler = self.subscription.subscribe
        await handler(self)
        return True
class MessageEvent(SubscriptionEvent):
    """Event that hands itself to its subscription's ``message`` coroutine."""

    async def process(self):
        """Run the inherited ``process`` step, then dispatch to ``message``.

        Returns:
            ``True`` once both awaited calls have completed.
        """
        await super().process()
        # Looked up only after the inherited step has finished, so the
        # attribute access happens in the same order as a direct call would.
        handler = self.subscription.message
        await handler(self)
        return True
class UnsubscribeEvent(SubscriptionEvent):
async def process(self):
await super().process()