Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def go():
    """Publish one pub/sub RPC call to a locally served subscriber.

    Serves ``Handler()`` as a pub/sub service subscribed to ``'topic'`` on
    ``tcp://*:*``, connects a publisher to the first endpoint the service
    reports as bound, and publishes ``remote_func(1, 2)`` on ``'topic'``.

    This is a generator-based coroutine (it uses ``yield from``); it must be
    driven by an event loop or another coroutine.

    Both services are closed before the coroutine finishes, including when
    connecting or publishing raises, so the bound socket is not leaked.
    """
    subscriber = yield from aiozmq.rpc.serve_pubsub(
        Handler(), subscribe='topic', bind='tcp://*:*')
    publisher = None
    try:
        # The bind address uses wildcards, so the concrete endpoint has to
        # be read back from the transport before the publisher can connect.
        subscriber_addr = next(iter(subscriber.transport.bindings()))
        publisher = yield from aiozmq.rpc.connect_pubsub(
            connect=subscriber_addr)
        yield from publisher.publish('topic').remote_func(1, 2)
    finally:
        # Same close order as before (subscriber first, then publisher),
        # but now also executed on the error path.
        # NOTE(review): callers that need to know shutdown has completed
        # may additionally want ``yield from <service>.wait_closed()`` --
        # confirm against the aiozmq version in use.
        subscriber.close()
        if publisher is not None:
            publisher.close()
def setup_pubsub(self):
redis = yield from aioredis.create_redis(
(settings.REDIS_HOST, settings.REDIS_PORT)
)
if self.role == 'stores':
bind_addr = settings.SUBSCRIBER_ENDPOINTS[self.role]
else:
bind_addr = 'tcp://{host}:*'.format(host=settings.INTERNAL_HOST)
self.subscriber = yield from aiozmq.rpc.serve_pubsub(
self.handler, subscribe='',
bind=bind_addr,
log_exceptions=True)
subscriber_addr = list(self.subscriber.transport.bindings())[0]
self.publisher = yield from aiozmq.rpc.connect_pubsub()
if self.role == 'storeclient':
self.publisher.transport.connect(
settings.SUBSCRIBER_ENDPOINTS['stores'])
_key = 'SUBSCRIBER_REGISTERED_{}'.format(subscriber_addr)
ret = 0
yield from redis.set(_key, ret)
while ret != b'1':
yield from self.publisher.publish(
'register_sub'
).register_sub(
subscriber_addr, _key