Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def go():
subscriber = yield from aiozmq.rpc.serve_pubsub(
Handler(), subscribe='topic', bind='tcp://*:*')
subscriber_addr = next(iter(subscriber.transport.bindings()))
publisher = yield from aiozmq.rpc.connect_pubsub(
connect=subscriber_addr)
yield from publisher.publish('topic').remote_func(1, 2)
subscriber.close()
publisher.close()
async def forward(self):
""" Forward log events to the aggregator, using method names as the topics. """
if not self.ready:
self.loop, self.buffer = asyncio.get_running_loop(), asyncio.Queue()
publisher = await aiozmq.rpc.connect_pubsub(connect=self.frontend)
try:
while True:
method, event = await self.buffer.get()
self.buffer.task_done()
await publisher.publish(method).log(event)
finally:
publisher.close()
def setup_pubsub(self):
redis = yield from aioredis.create_redis(
(settings.REDIS_HOST, settings.REDIS_PORT)
)
if self.role == 'stores':
bind_addr = settings.SUBSCRIBER_ENDPOINTS[self.role]
else:
bind_addr = 'tcp://{host}:*'.format(host=settings.INTERNAL_HOST)
self.subscriber = yield from aiozmq.rpc.serve_pubsub(
self.handler, subscribe='',
bind=bind_addr,
log_exceptions=True)
subscriber_addr = list(self.subscriber.transport.bindings())[0]
self.publisher = yield from aiozmq.rpc.connect_pubsub()
if self.role == 'storeclient':
self.publisher.transport.connect(
settings.SUBSCRIBER_ENDPOINTS['stores'])
_key = 'SUBSCRIBER_REGISTERED_{}'.format(subscriber_addr)
ret = 0
yield from redis.set(_key, ret)
while ret != b'1':
yield from self.publisher.publish(
'register_sub'
).register_sub(
subscriber_addr, _key
)
ret = yield from redis.get(_key)
yield from asyncio.sleep(0.01)
self.lock.release()
redis.close()