Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _send_event_wait(node, result_event, bc, sub):
if not result_event:
return (True, [])
# Wait for reply event.
tries = 0
while True:
msgs = sub.get(1, 1)
for msg in msgs:
(topic, event) = msg
ev = broker.zeek.Event(event)
args = ev.args()
logging.debug("broker: %s(%s) from node %s", result_event,
", ".join(args), node.name)
return (True, args)
tries += 1
if tries > config.Config.commtimeout:
logging.debug("broker: timeout during receive from node %s", node.name)
return (False, "time-out")
host = node.addr
endpoint = broker.Endpoint()
subscriber = endpoint.make_subscriber(topic)
status_subscriber = endpoint.make_status_subscriber(True)
endpoint.peer(host, node.getPort(), 1)
tries = 0
while True:
msgs = status_subscriber.get(1, 1)
for msg in msgs:
if isinstance(msg, broker.Status):
if msg.code() == broker.SC.PeerAdded:
ev = broker.zeek.Event(event, *args)
endpoint.publish(topic + "/" + repr(msg.context()), ev)
logging.debug("broker: %s(%s) to node %s", event,
", ".join(args), node.name)
return (True, endpoint, subscriber)
tries += 1
if tries > config.Config.commtimeout:
return (False, "time-out", None)