Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def main(argv):
if len(sys.argv) > 1:
cmd = sys.argv[1]
else:
cmd = 'bash'
term_manager = SingleTermManager(shell_command=[cmd])
handlers = [
(r"/websocket", TermSocket,
{'term_manager': term_manager}),
(r"/", TerminalPageHandler),
(r'/(.*)', tornado.web.StaticFileHandler,
{'path': '.'}),
(r'/*/(.*)', tornado.web.StaticFileHandler,
{'path': '.'}),
]
nb_command = [sys.executable, '-m', 'notebook', '--no-browser',
'--NotebookApp.allow_origin="*"']
nb_server = subprocess.Popen(nb_command, stderr=subprocess.STDOUT,
stdout=subprocess.PIPE)
# wait for notebook server to start up
while 1:
line = nb_server.stdout.readline().decode('utf-8').strip()
def add_terminal(self, route, command, workdir=None, env=None):
assert route.startswith('/')
if self.app is None:
raise Exception("we don't support init_app yet")
term_manager = SingleTermManager(shell_command=command)
wrapped_app = tornado.wsgi.WSGIContainer(self.app)
handlers = [
("{}/websocket".format(self.url_prefix),
TermSocket, {'term_manager': term_manager}),
(route,
IndexHandler, {'url_prefix': self.url_prefix}),
("{}/(.*)".format(self.url_prefix),
tornado.web.StaticFileHandler, {'path': '.'}),
("/(.*)",
tornado.web.FallbackHandler, {'fallback': wrapped_app}),
]
self.tornado_app = tornado.web.Application(handlers)
from tornado import web
import terminado
from notebook._tz import utcnow
from ..base.handlers import IPythonHandler
from ..base.zmqhandlers import WebSocketMixin
class TerminalHandler(IPythonHandler):
"""Render the terminal interface."""
@web.authenticated
def get(self, term_name):
self.write(self.render_template('terminal.html',
ws_path="terminals/websocket/%s" % term_name))
class TermSocket(WebSocketMixin, IPythonHandler, terminado.TermSocket):
def origin_check(self):
"""Terminado adds redundant origin_check
Tornado already calls check_origin, so don't do anything here.
"""
return True
def get(self, *args, **kwargs):
if not self.get_current_user():
raise web.HTTPError(403)
return super(TermSocket, self).get(*args, **kwargs)
def on_message(self, message):
super(TermSocket, self).on_message(message)
self.application.settings['terminal_last_activity'] = utcnow()
def __init__(self, *args, **kwargs):
app_static_handlers = []
for spec in get_installed_app_static_specs():
s_url = r"/static/{}/(.*)".format(spec[0])
s_dir = os.path.join(spec[1],'static')
app_static_handlers.append(
(s_url, tornado.web.StaticFileHandler, {'path': s_dir})
)
term_manager = SingleTermManager(shell_command=['bash'])
self.term_manager = term_manager
term_url = [(r"/terminal/a/term", TermSocket,
{'term_manager': term_manager})]
handlers = [
(r"/static/core/(.*)", tornado.web.StaticFileHandler, {'path': STATIC_DIR}),
] + app_static_handlers + URL_SCHEMA + term_url
settings = dict(
project_dir=PROJECT_DIR,
static_dir=STATIC_DIR,
login_url=global_settings.LOGIN_URL,
cookie_secret = global_settings.COOKIE_SECRET,
debug = global_settings.DEBUG,
xsrf_cookies=True,
ui_methods=ui_methods,
)
import sandstone.lib.decorators
from sandstone.lib.handlers.base import BaseHandler
from terminado import TermSocket
class AuthTermSocket(TermSocket,BaseHandler):
@sandstone.lib.decorators.authenticated
def get(self, *args, **kwargs):
return super(AuthTermSocket, self).get(*args, **kwargs)