Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_runsubproc_simple(xonsh_builtins, xonsh_execer):
    """Check that ``run_subproc`` captures the stdout of a one-word command.

    The environment held on ``xonsh_builtins`` is given a ``PATH`` made of
    the ``bin`` directory next to this test file followed by the directory
    of the running interpreter.  A ``pwd`` command (``PWD.BAT`` when
    ``ON_WINDOWS`` is set) is then run with ``captured='stdout'`` and its
    output, minus trailing whitespace, must equal this test file's
    directory.

    ``xonsh_execer`` is never referenced in the body; presumably it is
    requested only for the fixture's setup side effects -- confirm against
    the fixture definition.
    """
    test_dir = os.path.dirname(__file__)
    env = xonsh_builtins.__xonsh_env__

    # Local ``bin`` directory first, then the interpreter's own directory.
    env['PATH'] = os.pathsep.join(
        [os.path.join(test_dir, 'bin'), os.path.dirname(sys.executable)]
    )
    env['XONSH_ENCODING'] = 'utf8'
    env['XONSH_ENCODING_ERRORS'] = 'surrogateescape'
    env['XONSH_PROC_FREQUENCY'] = 1e-4

    command = 'pwd'
    if ON_WINDOWS:
        # PATHEXT is flattened into a single ';'-separated string here.
        env['PATHEXT'] = ';'.join(env['PATHEXT'])
        command = 'PWD.BAT'

    captured = run_subproc([[command]], captured='stdout')
    assert captured.rstrip() == test_dir
shell_kwargs["shell_type"] = "none"
elif not sys.stdin.isatty() and not args.force_interactive:
args.mode = XonshMode.script_from_stdin
shell_kwargs["shell_type"] = "none"
else:
args.mode = XonshMode.interactive
shell_kwargs["completer"] = True
shell_kwargs["login"] = True
env = start_services(shell_kwargs, args)
env["XONSH_LOGIN"] = shell_kwargs["login"]
if args.defines is not None:
env.update([x.split("=", 1) for x in args.defines])
env["XONSH_INTERACTIVE"] = args.force_interactive or (
args.mode == XonshMode.interactive
)
if ON_WINDOWS:
setup_win_unicode_console(env.get("WIN_UNICODE_CONSOLE", True))
return args
def parse_funcs(s, shell, sourcer=None, extra_args=()):
"""Parses the funcs portion of a string into a dict of callable foreign
function wrappers.
"""
m = FUNCS_RE.search(s)
if m is None:
return {}
g1 = m.group(1)
if ON_WINDOWS:
g1 = g1.replace(os.sep, os.altsep)
try:
namefiles = json.loads(g1.strip())
except json.decoder.JSONDecodeError as exc:
msg = (
"{0!r}\n\ncould not parse {1} functions:\n"
" s = {2!r}\n"
" g1 = {3!r}\n\n"
"Note: you may be seeing this error if you use zsh with "
"prezto. Prezto overwrites GNU coreutils functions (like echo) "
"with its own zsh functions. Please try disabling prezto."
)
warnings.warn(msg.format(exc, shell, s, g1), RuntimeWarning)
return {}
sourcer = DEFAULT_SOURCERS.get(shell, "source") if sourcer is None else sourcer
funcs = {}
"""Sets terminal title."""
env = builtins.__xonsh__.env # pylint: disable=no-member
term = env.get("TERM", None)
# Shells running in emacs sets TERM to "dumb" or "eterm-color".
# Do not set title for these to avoid garbled prompt.
if (term is None and not ON_WINDOWS) or term in [
"dumb",
"eterm-color",
"linux",
]:
return
t = env.get("TITLE")
if t is None:
return
t = self.prompt_formatter(t)
if ON_WINDOWS and "ANSICON" not in env:
kernel32.SetConsoleTitleW(t)
else:
with open(1, "wb", closefd=False) as f:
# prevent xonsh from answering interactive questions
# on the next command by writing the title
f.write("\x1b]0;{0}\x07".format(t).encode())
f.flush()
def executables_in(path):
    """Yield the entries of ``path`` that the current user could execute.

    The actual listing is delegated to ``_executables_in_windows`` when
    ``ON_WINDOWS`` is set and to ``_executables_in_posix`` otherwise.  A
    ``PermissionError`` raised while listing simply ends the iteration; any
    items already yielded stay yielded.
    """
    lister = _executables_in_windows if ON_WINDOWS else _executables_in_posix
    try:
        yield from lister(path)
    except PermissionError:
        # Unreadable location: stop quietly instead of propagating.
        return
def tty():
    """Return the ``tty`` module, or ``None`` when ``ON_WINDOWS`` is set.

    The module is loaded through ``importlib.import_module`` at call time
    rather than by a top-level ``import`` statement.
    """
    if not ON_WINDOWS:
        return importlib.import_module("tty")
    return None
def detect_xpip_alias():
    """Work out how pip should be invoked for the running interpreter.

    Returns
    -------
    callable or list of str
        * When ``sys.executable`` is missing or empty: a callable accepting
          ``(args, stdin=None)`` that returns the 3-tuple
          ``("", <error message>, 1)`` (presumably stdout, stderr and return
          code -- confirm against the alias machinery).
        * Otherwise the command ``[sys.executable, "-m", "pip"]``, prefixed
          with ``"sudo"`` when not on Windows and the directory holding the
          interpreter is not writable by the current user.
    """
    if not getattr(sys, "executable", None):
        return lambda args, stdin=None: (
            "",
            "Sorry, unable to run pip on your system (missing sys.executable)",
            1,
        )

    basecmd = [sys.executable, "-m", "pip"]
    try:
        # XXX: Does windows have an installation mode that requires UAC?
        needs_sudo = not ON_WINDOWS and not os.access(
            os.path.dirname(sys.executable), os.W_OK
        )
    except Exception:
        # Something freaky happened, fall back to something that'll
        # probably work: the plain command.
        needs_sudo = False

    return ["sudo"] + basecmd if needs_sudo else basecmd
def default_xonshrc(env):
"""Creates a new instance of the default xonshrc tuple."""
xcdrc = os.path.join(xonsh_config_dir(env), "rc.xsh")
if ON_WINDOWS:
dxrc = (
os.path.join(os_environ["ALLUSERSPROFILE"], "xonsh", "xonshrc"),
xcdrc,
os.path.expanduser("~/.xonshrc"),
)
else:
dxrc = ("/etc/xonshrc", xcdrc, os.path.expanduser("~/.xonshrc"))
# Check if old config file exists and issue warning
old_config_filename = xonshconfig(env)
if os.path.isfile(old_config_filename):
print(
"WARNING! old style configuration ("
+ old_config_filename
+ ") is no longer supported. "
+ "Please migrate to xonshrc."
)
("Python", "{}.{}.{}".format(*PYTHON_VERSION_INFO)),
("PLY", ply.__version__),
("have readline", is_readline_available()),
("prompt toolkit", ptk_version() or None),
("shell type", env.get("SHELL_TYPE")),
("pygments", pygments_version()),
("on posix", bool(ON_POSIX)),
("on linux", bool(ON_LINUX)),
]
)
if ON_LINUX:
data.append(("distro", linux_distro()))
data.extend(
[
("on darwin", ON_DARWIN),
("on windows", ON_WINDOWS),
("on cygwin", ON_CYGWIN),
("on msys2", ON_MSYS),
("is superuser", is_superuser()),
("default encoding", DEFAULT_ENCODING),
("xonsh encoding", env.get("XONSH_ENCODING")),
("encoding errors", env.get("XONSH_ENCODING_ERRORS")),
]
)
formatter = _xonfig_format_json if ns.json else _xonfig_format_human
s = formatter(data)
return s
r, w = pty.openpty() if use_tty else os.pipe()
_safe_pipe_properties(w, use_tty=use_tty)
last.stdout = safe_open(w, "w")
_safe_pipe_properties(r, use_tty=use_tty)
last.captured_stdout = safe_open(r, "r")
# set standard error
if last.stderr is not None:
pass
elif captured == "object":
r, w = os.pipe()
last.stderr = safe_open(w, "w")
last.captured_stderr = safe_open(r, "r")
elif builtins.__xonsh__.stderr_uncaptured is not None:
last.stderr = builtins.__xonsh__.stderr_uncaptured
last.captured_stderr = last.stderr
elif ON_WINDOWS and not callable_alias:
last.universal_newlines = True
last.stderr = None # must truly stream on windows
else:
r, w = pty.openpty() if use_tty else os.pipe()
_safe_pipe_properties(w, use_tty=use_tty)
last.stderr = safe_open(w, "w")
_safe_pipe_properties(r, use_tty=use_tty)
last.captured_stderr = safe_open(r, "r")
# redirect stdout to stderr, if we should
if isinstance(last.stdout, int) and last.stdout == 2:
# need to use private interface to avoid duplication.
last._stdout = last.stderr
# redirect stderr to stdout, if we should
if callable_alias and last.stderr == subprocess.STDOUT:
last._stderr = last.stdout
last.captured_stderr = last.captured_stdout