Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@qi.bind()
def disconnectSubscribers(self):
""" disconnect all subscribers from callbacks """
qi.info(self.serviceName, "DISCONNECTING SUBSCRIBERS")
if self.subscribeToggle:
for event in self.subscribers.keys():
future = qi.async(self.disconnectSubscriber, event, delay = 0)
future.wait(1000) # add a timeout to avoid deadlock
if not future.isFinished():
qi.error(self.serviceName, "disconnectSubscribers", "Failed disconnecting %s subscribers" % event)
self.subscribeToggle = False
@qi.bind(qi.Dynamic, qi.AnyArguments)
def special(self, *args):
print("args:", args)
@qi.bind()
def connectSubscribers(self):
""" generate & connect all subscribers to callbacks """
if not self.subscribeToggle:
for event in self.subscribers.keys():
self.subscribers[event]["subscriber"] = self.memory.subscriber(event)
self.subscribers[event]["uid"] = self.subscribers[event]["subscriber"].signal.connect(self.subscribers[event]["callback"])
self.subscribeToggle = True