Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
fut = met(*args, _async = True)
fut.add_callback(self.do_reply(idm))
except (AttributeError, RuntimeError) as exc:
self.reply(idm, 'error', str(exc))
def on_close(self):
self.qim.close()
self.qim = None
print("[%d] Disconnected" % (self.sid))
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: %s SD_URL" % sys.argv[0])
sys.exit(1)
QI_APP = qi.Application()
URL = sys.argv[1]
ROUTER = tornadio2.router.TornadioRouter(QiMessagingHandler)
SOCK_APP = tornado.web.Application(
ROUTER.urls,
socket_io_port = 8002
)
HTTP_APP = tornado.web.Application(
[(r'/(socket.io.min.js)', tornado.web.StaticFileHandler, {'path': "./"}),
(r'/(qimessaging.js)', tornado.web.StaticFileHandler, {'path': "./"}),
(r'/(jquery.min.js)', tornado.web.StaticFileHandler, {'path': "./"})]
)
if debug_robot:
sys.argv.extend(["--qi-url", debug_robot])
qi_url = debug_robot
else:
raise RuntimeError("No robot, not running.")
qiapp = None
sys.argv[0] = str(sys.argv[0])
# In versions bellow 2.3, look for --qi-url in the arguemnts and call accordingly the Application
if qi_url and hasattr(qi, "__version__") and LooseVersion(qi.__version__) < LooseVersion("2.3"):
qiapp = qi.Application(url="tcp://"+qi_url+":9559")
# In versions greater than 2.3 the ip can simply be passed through argv[0]
else:
# In some environments sys.argv[0] has unicode, which qi rejects
qiapp = qi.Application()
qiapp.start()
return qiapp
qi_url = args.qi_url
elif not is_on_robot():
print "no --qi-url parameter given; interactively getting debug robot."
debug_robot = get_debug_robot()
if debug_robot:
sys.argv.extend(["--qi-url", debug_robot])
qi_url = debug_robot
else:
raise RuntimeError("No robot, not running.")
qiapp = None
sys.argv[0] = str(sys.argv[0])
# In versions bellow 2.3, look for --qi-url in the arguemnts and call accordingly the Application
if qi_url and hasattr(qi, "__version__") and LooseVersion(qi.__version__) < LooseVersion("2.3"):
qiapp = qi.Application(url="tcp://"+qi_url+":9559")
# In versions greater than 2.3 the ip can simply be passed through argv[0]
else:
# In some environments sys.argv[0] has unicode, which qi rejects
qiapp = qi.Application()
qiapp.start()
return qiapp
def create_session():
application = qi.Application([NaoqiApp.__name__, "--qi-url={}".format(config.NAOQI_URL)])
try: application.start()
except RuntimeError as e:
raise RuntimeError("Couldn't connect to robot, check if robot IP is set correctly in pepper/config.py"
"\n\tOriginal Error: {}".format(e))
return application.session
def create_session(url):
"""
Create Qi Session with Pepper/Nao Robot
Parameters
----------
url: str
Returns
-------
session: qi.Session
"""
application = qi.Application([NAOqiBackend.__name__, "--qi-url={}".format(url)])
try: application.start()
except RuntimeError as e:
raise RuntimeError("Couldn't connect to robot @ {}\n\tOriginal Error: {}".format(url, e))
return application.session
return
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--ip", type=str, default="10.9.45.11",
help="Robot IP address. On robot or Local Naoqi: use \
'127.0.0.1'.")
parser.add_argument("--port", type=int, default=9559,
help="Naoqi port number")
args = parser.parse_args()
# Initialize qi framework.
connection_url = "tcp://" + args.ip + ":" + str(args.port)
app = qi.Application(["RemoteVoice",
"--qi-url=" + connection_url])
remote_voice = RemoteVoice(app)
remote_voice.run()
def main():
app = qi.Application(url='tcp://169.254.254.250:9559')
app.start()
session = app.session
myAnimation = MyAnimation()
session.registerService("MyAnimation", myAnimation)
app.run()