Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_5():
import qi
import time
print("ici")
client = qi.Client("flds")
client.connect(sys.argv[1])
server = qi.Server("blam")
server.connect(sys.argv[1])
#time.sleep(1)
#server.advertise_service("moule:::", moule)
server.advertise_service("poutre::i:is", poutre)
#time.sleep(1)
print("before client.call")
#ret = client.call("moule:::")
ret = client.call("poutre::i:is", 41, "caca")
print("called result:", ret)
time.sleep(1)
def test_4():
import qi
client = qi.Client("toto")
client.connect(sys.argv[1])
ret = client.call("master.listServices::{ss}:")
for k,v in ret.iteritems():
print(k, "=", v)
def test_5():
import qi
import time
print("ici")
client = qi.Client("flds")
client.connect(sys.argv[1])
server = qi.Server("blam")
server.connect(sys.argv[1])
#time.sleep(1)
#server.advertise_service("moule:::", moule)
server.advertise_service("poutre::i:is", poutre)
#time.sleep(1)
print("before client.call")
#ret = client.call("moule:::")
ret = client.call("poutre::i:is", 41, "caca")
print("called result:", ret)
time.sleep(1)
fut = met(*args, _async = True)
fut.add_callback(self.do_reply(idm))
except (AttributeError, RuntimeError) as exc:
self.reply(idm, 'error', str(exc))
def on_close(self):
self.qim.close()
self.qim = None
print("[%d] Disconnected" % (self.sid))
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: %s SD_URL" % sys.argv[0])
sys.exit(1)
QI_APP = qi.Application()
URL = sys.argv[1]
ROUTER = tornadio2.router.TornadioRouter(QiMessagingHandler)
SOCK_APP = tornado.web.Application(
ROUTER.urls,
socket_io_port = 8002
)
HTTP_APP = tornado.web.Application(
[(r'/(socket.io.min.js)', tornado.web.StaticFileHandler, {'path': "./"}),
(r'/(qimessaging.js)', tornado.web.StaticFileHandler, {'path': "./"}),
(r'/(jquery.min.js)', tornado.web.StaticFileHandler, {'path': "./"})]
)
def main(args):
if len(args) != 3:
print('Usage: %s URL DIR\n\t create AL specialized proxy for each live AL module, in DIR.' % args[0])
return
sd_url = args[1]
prefix = args[2]
abver = '.'.join(sys.version.split('.', 2)[0:2])
me = os.path.dirname(os.path.abspath(__file__))
lpath = me + '/../lib/python'+ abver + '/site-packages'
sys.path.append(lpath)
from qi import Session
session = Session()
session.connect(sd_url)
services = session.services(0)
names = map(lambda x: x[0], services)
print(names)
for s in names:
if s[0] == '_':
continue
fname = s.lower() + 'proxy'
paths = os.path.join(prefix, "alproxies", fname + '.h') + ',' + os.path.join(prefix, "src", fname + '.cpp')
cmd = ['idl.py', sd_url + '/' + s, '--output-mode=alproxy', '-o', paths]
os.system(' '.join(cmd))
def __init__( self ):
#Initialization
NaoqiNode.__init__( self, self.NODE_NAME )
from distutils.version import LooseVersion
if self.get_version() < LooseVersion('2.0.0'):
rospy.loginfo('The NAOqi version is inferior to 2.0, hence no log bridge possible')
exit(0)
rospy.init_node( self.NODE_NAME )
# the log manager is only avaiable through a session (NAOqi 2.0)
import qi
self.session = qi.Session()
self.session.connect("tcp://%s:%s" % (self.pip, self.pport))
self.logManager = self.session.service("LogManager")
self.listener = self.logManager.getListener()
self.listener.onLogMessage.connect(onMessageCallback)
rospy.loginfo('Logger initialized')
def main(session):
remote = Remote(session)
remote.run()
return
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--ip", type=str, default="10.9.45.11",
help="Robot IP address. On robot or Local Naoqi: use '127.0.0.1'.")
parser.add_argument("--port", type=int, default=9559,
help="Naoqi port number")
args = parser.parse_args()
session = qi.Session()
try:
session.connect("tcp://" + args.ip + ":" + str(args.port))
except RuntimeError:
print ("Can't connect to Naoqi at ip \"" + args.ip + "\" on port " + str(args.port) +".\n"
"Please check your script arguments. Run with -h option for help.")
sys.exit(1)
main(session)
activity.logger.error(msg)
else:
print msg
finally:
qiapp.stop()
qi.async(activity.on_start).addCallback(handle_on_start_done)
# Run the QiApplication, which runs until someone calls qiapp.stop()
qiapp.run()
finally:
# Cleanup
if hasattr(activity, "on_stop"):
# We need a qi.async call so that if the class is single threaded,
# it will wait for callbacks to be finished.
qi.async(activity.on_stop).wait()
if service_id:
qiapp.session.unregisterService(service_id)
service_id = qiapp.session.registerService(service_name, activity)
if hasattr(activity, "on_start"):
def handle_on_start_done(on_start_future):
"Custom callback, for checking errors"
if on_start_future.hasError():
try:
msg = "Error in on_start(), stopping application: %s" \
% on_start_future.error()
if hasattr(activity, "logger"):
activity.logger.error(msg)
else:
print msg
finally:
qiapp.stop()
qi.async(activity.on_start).addCallback(handle_on_start_done)
# Run the QiApplication, which runs until someone calls qiapp.stop()
qiapp.run()
finally:
# Cleanup
if hasattr(activity, "on_stop"):
# We need a qi.async call so that if the class is single threaded,
# it will wait for callbacks to be finished.
qi.async(activity.on_stop).wait()
if service_id:
qiapp.session.unregisterService(service_id)
@qi.bind()
def disconnectSubscribers(self):
""" disconnect all subscribers from callbacks """
qi.info(self.serviceName, "DISCONNECTING SUBSCRIBERS")
if self.subscribeToggle:
for event in self.subscribers.keys():
future = qi.async(self.disconnectSubscriber, event, delay = 0)
future.wait(1000) # add a timeout to avoid deadlock
if not future.isFinished():
qi.error(self.serviceName, "disconnectSubscribers", "Failed disconnecting %s subscribers" % event)
self.subscribeToggle = False