Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from xpra.scripts.server import run_server
return run_server(error_cb, options, mode, script_file, args)
elif mode in ("attach", "detach", "screenshot", "version", "info", "control", "_monitor"):
return run_client(error_cb, options, args, mode)
elif mode in ("stop", "exit") and (supports_server or supports_shadow):
nox()
return run_stopexit(mode, error_cb, options, args)
elif mode == "list" and (supports_server or supports_shadow):
return run_list(error_cb, options, args)
elif mode in ("_proxy", "_proxy_start", "_shadow_start") and (supports_server or supports_shadow):
nox()
return run_proxy(error_cb, options, script_file, args, mode, defaults)
elif mode == "initenv":
from xpra.scripts.server import xpra_runner_shell_script, write_runner_shell_script
script = xpra_runner_shell_script(script_file, os.getcwd(), options.socket_dir)
dotxpra = DotXpra(options.socket_dir)
write_runner_shell_script(dotxpra, script, False)
return 0
else:
error_cb("invalid mode '%s'" % mode)
return 1
except KeyboardInterrupt, e:
sys.stderr.write("\ncaught %s, exiting\n" % repr(e))
return 128+signal.SIGINT
if child.poll() is None:
#only supported on win32 since Python 2.7
if hasattr(child, "terminate"):
child.terminate()
elif hasattr(os, "kill"):
os.kill(child.pid, signal.SIGTERM)
else:
raise Exception("cannot find function to kill subprocess")
except Exception, e:
print("error trying to stop ssh tunnel process: %s" % e)
conn = TwoFileConnection(child.stdin, child.stdout, abort_test, target=display_name, info=dtype, close_cb=stop_tunnel)
elif dtype == "unix-domain":
if not hasattr(socket, "AF_UNIX"):
return False, "unix domain sockets are not available on this operating system"
sockdir = DotXpra(display_desc["socket_dir"])
sock = socket.socket(socket.AF_UNIX)
sock.settimeout(SOCKET_TIMEOUT)
sockfile = sockdir.socket_path(display_desc["display"])
conn = _socket_connect(sock, sockfile, display_name, dtype)
elif dtype == "tcp":
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(SOCKET_TIMEOUT)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, TCP_NODELAY)
tcp_endpoint = (display_desc["host"], display_desc["port"])
conn = _socket_connect(sock, tcp_endpoint, display_name, dtype)
else:
assert False, "unsupported display type in connect: %s" % dtype
return conn
def pick_display(parser, opts, extra_args):
    """Work out which display to use from the positional arguments.

    With exactly one argument, that argument is parsed as the display name.
    With none, the socket directory is scanned and the single live server
    (if there is exactly one) is used.  Every other situation is reported
    through ``parser.error``.

    Returns whatever ``parse_display_name`` returns, or ``None`` when
    ``parser.error`` returns instead of aborting (presumably it is
    optparse's ``error``, which exits -- confirm against the caller).
    """
    argc = len(extra_args)
    if argc == 1:
        return parse_display_name(parser.error, opts, extra_args[0])
    if argc != 0:
        parser.error("too many arguments")
        return None

    # No display was given: look for a default among the known sockets.
    dotxpra = DotXpra(opts.socket_dir or get_default_socket_dir())
    live = []
    for state, display in dotxpra.sockets():
        if state is DotXpra.LIVE:
            live.append(display)

    if not live:
        if not LOCAL_SERVERS_SUPPORTED:
            parser.error("this installation does not support local servers, you must specify a remote display")
        parser.error("cannot find a live server to connect to")
        return None
    if len(live) > 1:
        parser.error("there are multiple servers running, please specify")
        return None
    return parse_display_name(parser.error, opts, live[0])
def show_final_state(display):
    """Report whether the xpra server at ``display`` has shut down.

    The server state is polled up to six times, pausing half a second
    between polls while the server is still reported live.  Polling stops
    as soon as the server is reported dead.

    ``opts`` is not a parameter; it is a free variable resolved from the
    surrounding scope.

    Returns 0 if the server has exited, 1 in every other case.
    """
    sockdir = DotXpra(opts.socket_dir)
    max_polls = 6
    for attempt in range(max_polls):
        final_state = sockdir.server_state(display)
        if final_state is DotXpra.DEAD:
            # Already gone: no point probing the socket again.
            break
        # Only wait when another poll follows: sleeping after the last poll
        # would delay the report without the state ever being re-checked.
        if final_state is DotXpra.LIVE and attempt < max_polls - 1:
            time.sleep(0.5)
    if final_state is DotXpra.DEAD:
        print("xpra at %s has exited." % display)
        return 0
    elif final_state is DotXpra.UNKNOWN:
        print("How odd... I'm not sure what's going on with xpra at %s" % display)
        return 1
    elif final_state is DotXpra.LIVE:
        print("Failed to shutdown xpra at %s" % display)
        return 1
    else:
        # Not one of the three states handled above.
        assert False, "invalid state: %s" % final_state
        return 1
if child.poll() is None:
#only supported on win32 since Python 2.7
if hasattr(child, "terminate"):
child.terminate()
elif hasattr(os, "kill"):
os.kill(child.pid, signal.SIGTERM)
else:
raise Exception("cannot find function to kill subprocess")
except Exception, e:
print("error trying to stop ssh tunnel process: %s" % e)
conn = TwoFileConnection(child.stdin, child.stdout, abort_test, target=display_name, info=dtype, close_cb=stop_tunnel)
elif dtype == "unix-domain":
if not hasattr(socket, "AF_UNIX"):
return False, "unix domain sockets are not available on this operating system"
sockdir = DotXpra(display_desc["socket_dir"])
sock = socket.socket(socket.AF_UNIX)
sock.settimeout(SOCKET_TIMEOUT)
sockfile = sockdir.socket_path(display_desc["display"])
conn = _socket_connect(sock, sockfile, display_name, dtype)
elif dtype == "tcp":
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(SOCKET_TIMEOUT)
tcp_endpoint = (display_desc["host"], display_desc["port"])
conn = _socket_connect(sock, tcp_endpoint, display_name, dtype)
else:
assert False, "unsupported display type in connect: %s" % dtype
return conn
def pick_display(parser, opts, extra_args):
    """Choose the display to operate on from the positional arguments.

    One argument: it is handed to ``parse_display_name``.  No argument: the
    sockets under ``opts.sockdir`` (or the default socket directory) are
    listed and the only live server, if there is exactly one, is used.
    Anything else is reported through ``parser.error``.

    Note that this variant passes the parser object itself (not its
    ``error`` method) to ``parse_display_name``.

    Returns the result of ``parse_display_name``, or ``None`` if
    ``parser.error`` returns rather than aborting (presumably it exits, as
    optparse's does -- confirm against the caller).
    """
    given = len(extra_args)
    if given > 1:
        parser.error("too many arguments")
        return None
    if given == 1:
        return parse_display_name(parser, opts, extra_args[0])

    # Nothing specified: pick a default server.
    dotxpra = DotXpra(opts.sockdir or get_default_socket_dir())
    candidates = [name
                  for (status, name) in dotxpra.sockets()
                  if status is DotXpra.LIVE]
    found = len(candidates)
    if found == 1:
        return parse_display_name(parser, opts, candidates[0])
    if found == 0:
        parser.error("cannot find a live server to connect to")
    else:
        parser.error("there are multiple servers running, please specify")
    return None
def pick_display(parser, opts, extra_args):
    """Select the display named on the command line, or a default one.

    A single positional argument is parsed as the display name.  When no
    argument is supplied, the socket directory is scanned for live servers
    and the one found is used, provided it is the only one.  All other
    cases are reported via ``parser.error``.

    Returns the value produced by ``parse_display_name``; ``None`` is
    returned only if ``parser.error`` comes back instead of aborting
    (presumably it exits, like optparse's -- confirm against the caller).
    """
    count = len(extra_args)
    if count > 1:
        parser.error("too many arguments")
        return None
    if count == 1:
        return parse_display_name(parser.error, opts, extra_args[0])

    # No explicit display: fall back to whatever is running locally.
    base_dir = opts.socket_dir or get_default_socket_dir()
    known = DotXpra(base_dir).sockets()
    running = [disp for (status, disp) in known if status is DotXpra.LIVE]

    if len(running) == 1:
        return parse_display_name(parser.error, opts, running[0])
    if running:
        parser.error("there are multiple servers running, please specify")
        return None
    # Nothing live at all; give the more specific hint first when local
    # servers are not supported by this installation.
    if not LOCAL_SERVERS_SUPPORTED:
        parser.error("this installation does not support local servers, you must specify a remote display")
    parser.error("cannot find a live server to connect to")
    return None
def run_list(parser, opts, extra_args):
    """Print the xpra sessions found under ``opts.sockdir``.

    Each session is written to stdout as a tab-indented line giving its
    state and display.  For sessions in the DEAD state the socket file is
    unlinked; when that succeeds the line is suffixed with " (cleaned up)",
    and an ``OSError`` from the removal is ignored.

    Any positional argument is rejected through ``parser.error``.

    Returns 1 when no session is found, 0 otherwise.
    """
    # The listing must run without gtk having been imported.
    assert "gtk" not in sys.modules
    if extra_args:
        parser.error("too many arguments for mode")

    emit = sys.stdout.write
    dotxpra = DotXpra(opts.sockdir)
    sessions = dotxpra.sockets()
    if not sessions:
        emit("No xpra sessions found\n")
        return 1

    emit("Found the following xpra sessions:\n")
    for status, name in sessions:
        emit("\t%s session at %s" % (status, name))
        if status is DotXpra.DEAD:
            removed = True
            try:
                os.unlink(dotxpra.socket_path(name))
            except OSError:
                # Best effort only: leave the stale socket in place.
                removed = False
            if removed:
                emit(" (cleaned up)")
        emit("\n")
    return 0
def pick_display(parser, opts, extra_args):
    """Determine the display to use from the positional arguments.

    Exactly one argument is parsed as the display name.  With no argument,
    the socket directory is listed and the sole live server is chosen.
    Zero or several live servers, or more than one argument, are reported
    through ``parser.error``.

    Returns the result of ``parse_display_name``, or ``None`` if
    ``parser.error`` returns instead of aborting (presumably it exits, as
    optparse's does -- confirm against the caller).
    """
    nargs = len(extra_args)
    if nargs == 1:
        return parse_display_name(parser.error, opts, extra_args[0])
    if nargs != 0:
        parser.error("too many arguments")
        return None

    # Pick a default server from the live local sockets.
    dotxpra = DotXpra(opts.socket_dir or get_default_socket_dir())
    alive = []
    for status, name in dotxpra.sockets():
        if status is DotXpra.LIVE:
            alive.append(name)

    if not alive:
        parser.error("cannot find a live server to connect to")
        return None
    if len(alive) != 1:
        parser.error("there are multiple servers running, please specify")
        return None
    return parse_display_name(parser.error, opts, alive[0])
def show_final_state(display):
    """Report whether the xpra server at ``display`` has shut down.

    The server state is polled up to six times, pausing half a second
    between polls while the server is still reported live.  Polling stops
    as soon as the server is reported dead.

    ``opts`` is not a parameter; it is a free variable resolved from the
    surrounding scope.

    Returns 0 if the server has exited, 1 in every other case.
    """
    sockdir = DotXpra(opts.sockdir)
    max_polls = 6
    for attempt in range(max_polls):
        final_state = sockdir.server_state(display)
        if final_state is DotXpra.DEAD:
            # Already gone: no point probing the socket again.
            break
        # Only wait when another poll follows: sleeping after the last poll
        # would delay the report without the state ever being re-checked.
        if final_state is DotXpra.LIVE and attempt < max_polls - 1:
            time.sleep(0.5)
    if final_state is DotXpra.DEAD:
        print("xpra at %s has exited." % display)
        return 0
    elif final_state is DotXpra.UNKNOWN:
        print("How odd... I'm not sure what's going on with xpra at %s" % display)
        return 1
    elif final_state is DotXpra.LIVE:
        print("Failed to shutdown xpra at %s" % display)
        return 1
    else:
        # Not one of the three states handled above.
        assert False, "invalid state: %s" % final_state
        return 1