Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_home(self):
    """Config must create its file both from a ``~``-prefixed string and from a path object.

    Each round writes one key inside the ``with`` block, checks that the
    file exists once the block has been left, and deletes the file again.
    """
    rc_file = local.env.home / 'some_simple_home_rc.ini'
    assert not rc_file.exists()
    try:
        # First round: tilde string; second round: the path object itself.
        for target in ('~/some_simple_home_rc.ini', rc_file):
            with Config(target) as conf:
                conf['a'] = 'b'
            assert rc_file.exists()
            rc_file.unlink()
    finally:
        # Final cleanup attempt so a failed assertion does not leave the file behind.
        rc_file.unlink()
def test_parallel(self):
    """Run ``sleep 2`` on a two-member cluster and check results and wall time.

    Two results are expected, and the elapsed time must lie in [2, 4)
    seconds -- below what two back-to-back 2-second sleeps would take.
    """
    import time

    cluster = Cluster(local, local)
    started = time.time()
    results = cluster["sleep"]("2")
    assert len(results) == 2
    elapsed = time.time() - started
    assert 2 <= elapsed < 4
def test_sshpass(self):
    """Password-authenticated SSH login against a throwaway local account.

    Creates the user ``testuser`` (home created under ``/tmp``), sets its
    password through ``passwd --stdin``, opens an ``SshMachine`` to
    localhost with that password and checks that ``pwd`` reports
    ``/tmp/testuser``. The account is removed in the ``finally`` clause,
    including when the test bails out early.
    """
    with local.as_root():
        local["useradd"]("-m", "-b", "/tmp", "testuser")

    try:
        with local.as_root():
            try:
                (local["passwd"] << "123456")("--stdin", "testuser")
            except ProcessExecutionError:
                # some versions of passwd don't support --stdin, nothing to do in this case
                # logging.warn is a deprecated alias; logging.warning is the supported spelling.
                logging.warning("passwd failed")
                return

        with SshMachine("localhost", user="testuser", password="123456") as rem:
            assert rem["pwd"]().strip() == "/tmp/testuser"
    finally:
        # Always delete the account (and its home, via -r) created above.
        with local.as_root():
            local["userdel"]("-r", "testuser")
def make_nginx(links):
    """
    Change into the nginx source directory and start assembling the
    ``./configure`` command line for it.

    ``links`` is indexed with the keys "nginx", "pcre", "zlib" and
    "openssl"; each value is handed to ``getLinkDir`` / ``getLinkFilename``.
    NOTE(review): presumably those values are download links and the helpers
    derive local file / directory names from them -- confirm at the helpers.
    NOTE(review): this excerpt stops inside the ``./configure`` argument
    list; the remaining flags and whatever executes ``conf_cmd`` are not
    visible here.
    """
    nginx_fn = getLinkFilename(links["nginx"])
    nginx_dir = getLinkDir(links["nginx"])
    # Switch the working directory so the relative "./configure" below is looked up there.
    local.cwd.chdir(nginx_dir)
    # The pcre / zlib / openssl locations are joined onto this_dir before being passed on.
    conf_cmd = local["./configure"]["--with-pcre=" + os.path.join(this_dir, getLinkDir(links["pcre"])),
        "--with-zlib=" + os.path.join(this_dir, getLinkDir(links["zlib"])),
        "--with-http_stub_status_module",
        "--with-openssl=" + os.path.join(this_dir, getLinkDir(links["openssl"])),
        "--add-module=../nginx_upstream_check_module-master",
        "--with-http_ssl_module",
        "--with-file-aio",
        "--with-http_addition_module",
        "--with-http_auth_request_module",
        "--with-http_dav_module",
        "--with-http_degradation_module",
        "--with-http_flv_module",
        "--with-http_gunzip_module",
        "--with-http_gzip_static_module",
        "--with-http_random_index_module",
        "--with-http_realip_module",
def __init__(self, remote_machine, server_class="rpyc.utils.server.ThreadedServer",
             extra_setup="", python_executable=None):
    """
    Stage a server script plus a copy of rpyc in a temporary directory on
    ``remote_machine`` and select the Python command to use with it.

    :param remote_machine: machine object; must offer ``tempdir()`` and
                           command lookup by indexing
    :param server_class: dotted name, split at its last dot into the module
                         and class names substituted into ``SERVER_SCRIPT``
    :param extra_setup: text substituted for ``$EXTRA_SETUP$`` in the script
    :param python_executable: a ``BoundCommand`` (used as is), any other
                              truthy value (looked up on ``remote_machine``),
                              or a falsy value for the fallback branch

    NOTE(review): this excerpt ends right after ``cmd = None`` in the
    fallback branch; how ``major`` / ``minor`` / ``cmd`` are used afterwards
    is not visible here.
    """
    self.proc = None
    self.tun = None
    self.remote_machine = remote_machine
    self._tmpdir_ctx = None
    # presumably the directory of the locally installed rpyc package -- confirm `.up()` semantics
    rpyc_root = local.path(rpyc.__file__).up()
    self._tmpdir_ctx = remote_machine.tempdir()
    # The context manager is entered by hand; no matching __exit__ call appears in this excerpt.
    tmp = self._tmpdir_ctx.__enter__()
    copy(rpyc_root, tmp / "rpyc")
    script = (tmp / "deployed-rpyc.py")
    modname, clsname = server_class.rsplit(".", 1)
    # Fill the three placeholders of the script template.
    script.write(SERVER_SCRIPT.replace("$MODULE$", modname).replace(
        "$SERVER$", clsname).replace("$EXTRA_SETUP$", extra_setup))
    if isinstance(python_executable, BoundCommand):
        cmd = python_executable
    elif python_executable:
        cmd = remote_machine[python_executable]
    else:
        # Version numbers of the interpreter running this code.
        major = sys.version_info[0]
        minor = sys.version_info[1]
        cmd = None
"""
Miscellaneous utility functions to deal with the git repository
"""
import pathlib
from plumbum import local
# Handle on the `git` executable, obtained through plumbum's `local`.
GIT = local['git']
# Raw bytes of `.git/index`, located by taking `.parent` four times from this
# file's path; read once, when the module is imported.
_GIT_INDEX = pathlib.Path(__file__).parent.parent.parent.parent.joinpath('.git', 'index').read_bytes()
def in_git(p):
    """
    Returns true if the given path is under version control.

    The check is a plain byte search: ``str(p)`` is encoded as ASCII
    (characters that cannot be encoded are dropped) and looked for inside
    the index bytes held in ``_GIT_INDEX``.
    """
    needle = str(p).encode('ascii', 'ignore')
    return needle in _GIT_INDEX
def head_commit_sha():
    """
    Returns the commit id of HEAD, as printed by ``git show --format=%h -q HEAD``
    with surrounding whitespace removed.
    """
    raw_output = GIT("show", "--format=%h", "-q", "HEAD")
    return str(raw_output).strip()
def do_signal(raised_exc):
    """Signal process ``pid`` if ``raised_exc`` is still the recorded ``LAST_ERROR``.

    When ``LAST_ERROR`` is a different object, nothing is signalled; the
    ``raiser`` is stopped if ``use_concurrent_loop`` is set.
    """
    global LAST_ERROR
    if LAST_ERROR is raised_exc:
        _logger.info("Raising %s in main thread", type(LAST_ERROR))
        local.cmd.kill("-%d" % REGISTERED_SIGNAL, pid)
        return

    _logger.debug("MainThread took the exception - we're done here")
    if use_concurrent_loop:
        raiser.stop()
if __name__ == "__main__":
    from plumbum import local
    from plumbum.cmd import ls, date, sleep

    # Combine three commands with `&` and print what running the combination returns.
    c = ls & date & sleep[1]
    print(c())

    # Same combination plus `sleep -z`; the run is required to raise.
    c = ls & date & sleep[1] & sleep["-z"]
    try:
        c()
    except ProcessExecutionError as ex:
        print(ex)
    else:
        assert False

    # One command dispatched through a three-member cluster.
    clst = Cluster(local, local, local)
    print(clst["ls"]())

    # This works fine
    print(local.session().run("echo $$"))

    # this does not
    ret, stdout, stderr = clst.session().run("echo $$")
    print(ret)
    # Three distinct PIDs are expected, one per cluster member.
    ret = list(map(int, stdout))
    assert len(set(ret)) == 3
def get_test_runner():
    """Look up a test-runner command among ``tox``, ``nosetests`` and ``py.test``.

    Every candidate is probed; a candidate that raises ``CommandNotFound``
    is skipped, and a later hit overwrites an earlier one. Returns the
    command object of the last candidate found, or ``None`` if none was.
    """
    # NOTE(review): the listing this came from had no indentation; the
    # `return` is taken to follow the loop rather than sit inside it -- confirm.
    found = None
    for candidate in ('tox', 'nosetests', 'py.test'):
        try:
            found = local[candidate]
        except CommandNotFound:
            pass
    return found
def is_running(process):
    """Search the output of ``ps asw`` for ``process``.

    ``process`` is used as a regular-expression pattern. Returns the match
    object of the first hit, or ``None`` when the pattern does not occur.
    """
    listing = local['ps']['asw']()
    return re.search(process, listing)