Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@skip_on_windows
def test_path(self):
assert not (local.cwd / "../non_exist1N9").exists()
assert (local.cwd / ".." / "plumbum").is_dir()
# traversal
found = False
for fn in local.cwd / ".." / "plumbum":
if fn.name == "__init__.py":
assert fn.is_file()
found = True
assert found
# glob'ing
found = False
for fn in local.cwd / ".." // "*/*.rst":
if fn.name == "index.rst":
found = True
assert found
#!/usr/bin/env python
from plumbum import local, cli, FG
from plumbum.path.utils import delete
try:
from plumbum.cmd import twine
except ImportError:
twine = None
class BuildProject(cli.Application):
'Build and optionally upload. For help, see https://packaging.python.org/en/latest/distributing/#uploading-your-project-to-pypi'
upload = cli.Flag("upload", help = "If given, the artifacts will be uploaded to PyPI")
def main(self):
delete(local.cwd // "*.egg-info", "build", "dist")
local.python("setup.py", "sdist", "bdist_wheel")
delete(local.cwd // "*.egg-info", "build")
if self.upload:
if twine is None:
print("Twine not installed, cannot securely upload. Install twine.")
else:
twine['upload', 'dist/*tar.gz', 'dist/*.whl'] & FG
else:
print("Built. To upload, run:")
print(" twine upload dist/*tar.gz dist/*.whl")
def exit(self):
try:
self.cmd().exit()
except Pyro4.errors.ConnectionClosedError:
pass
def cmd(self):
return self._cmd
class TestApplication(cli.Application):
connections = cli.SwitchAttr(['-c'], int, default = 1, help = "Number of connections per instance" )
instances = cli.SwitchAttr(['-i'], int, default = 3, help = "Number of instances to spawn" )
delay = cli.SwitchAttr(['-d'], int , default = 1, help = "Delay in seconds for probing test results")
packet_size = cli.SwitchAttr(['-s'], int, default = 10 ** 3, help = "Size of websocket frame")
ws_server = cli.SwitchAttr(['-w'], str, default = "127.0.0.1:8080", help = "Websocket server host[:port]")
iteration = cli.SwitchAttr(['--iterations'], int, default = 10, help = "Number of test results probing iterations")
remotes = cli.SwitchAttr(['--remote'], str, list = True, default = [],
help = "remote user@address of the remote machine to run client process on")
@cli.switch('--python-path', str, help = "Alternative python interpitor path.")
def python_path(self, new_path):
global python_cmd
python_cmd = new_path
@cli.switch('--script-path', str, help = "Alternative worker script path")
def script_path(self, new_path):
global client_script_path
client_script_path = new_path
def main(self):
def calcTotalMem(mems):
return "{}K".format(reduce(lambda x, y: x + int(y.replace('K','')), mems, 0))
self._host = cmd & BG
uri = self._host.proc.stdout.readline().decode('ascii').strip()
logger.debug ("Process spawned, Pyro4 uri: {}".format(uri))
self._cmd = Pyro4.Proxy(uri)
def exit(self):
try:
self.cmd().exit()
except Pyro4.errors.ConnectionClosedError:
pass
def cmd(self):
return self._cmd
class TestApplication(cli.Application):
connections = cli.SwitchAttr(['-c'], int, default = 1, help = "Number of connections per instance" )
instances = cli.SwitchAttr(['-i'], int, default = 3, help = "Number of instances to spawn" )
delay = cli.SwitchAttr(['-d'], int , default = 1, help = "Delay in seconds for probing test results")
packet_size = cli.SwitchAttr(['-s'], int, default = 10 ** 3, help = "Size of websocket frame")
ws_server = cli.SwitchAttr(['-w'], str, default = "127.0.0.1:8080", help = "Websocket server host[:port]")
iteration = cli.SwitchAttr(['--iterations'], int, default = 10, help = "Number of test results probing iterations")
remotes = cli.SwitchAttr(['--remote'], str, list = True, default = [],
help = "remote user@address of the remote machine to run client process on")
@cli.switch('--python-path', str, help = "Alternative python interpitor path.")
def python_path(self, new_path):
global python_cmd
python_cmd = new_path
@cli.switch('--script-path', str, help = "Alternative worker script path")
def script_path(self, new_path):
global client_script_path
cleanups = []
def main(self):
del self.cleanups[:]
print ("hi this is geet main")
def cleanup(self, retcode):
self.cleanups.append(1)
print("geet cleaning up with rc = %s" % (retcode,))
@Geet.subcommand("add")
class GeetAdd(cli.Application):
def main(self, *files):
return "adding", files
@Geet.subcommand("commit")
class GeetCommit(cli.Application):
message = cli.Flag("-m", str)
def main(self):
if self.parent.debug:
return "committing in debug"
else:
return "committing"
def cleanup(self, retcode):
self.parent.cleanups.append(2)
print("geet commit cleaning up with rc = %s" % (retcode,))
class Sample(cli.Application):
DESCRIPTION = "A sample cli application"
DESCRIPTION_MORE = '''
ABC This is just a sample help text typed with a Dvorak keyboard.
@Geet.subcommand("commit")
class GeetCommit(cli.Application):
message = cli.Flag("-m", str)
def main(self):
if self.parent.debug:
return "committing in debug"
else:
return "committing"
def cleanup(self, retcode):
self.parent.cleanups.append(2)
print("geet commit cleaning up with rc = %s" % (retcode,))
class Sample(cli.Application):
DESCRIPTION = "A sample cli application"
DESCRIPTION_MORE = '''
ABC This is just a sample help text typed with a Dvorak keyboard.
Although this paragraph is not left or right justified
in source, we expect it to appear
formatted nicely on the output, maintaining the indentation of the first line.
DEF this one has a different indentation.
Let's test that list items are not combined as paragraphs.
- Item 1
GHI more text for item 1, which may be very very very very very very long and even more long and long and long to
prove that we can actually wrap list items as well.
- Item 2 and this is
some text for item 2
ws_log_file, conf_template, proxy_port, \
workers
import os
import time
mkdir_cmd = local["mkdir"]["-p"]
def getLinkFilename(link):
return link.split("#")[0].split("/")[-1]
def getLinkDir(link):
if "#" in link:
return os.path.join(ngx_dir, link.split("#")[1])
return os.path.join(ngx_dir, getLinkFilename(link).replace(".tar.gz", ""))
if local.cwd.split("/")[-1]!= "ngx_http_websocket_stat_module":
print("this script is supposed to be run from repo root dir")
exit(1)
this_dir = "../.."
wget_cmd = local["wget"]
untar_cmd = local["tar"]["xz", "-C", ngx_dir, "-f"]
files_cmd = local["ls"][download_dir]
make_cmd = local["make"]["-j4"]
rm_cmd = local["rm"]["-rf"]
nginx_dir = getLinkDir(links["nginx"])
nginx_cmd = local[os.path.join(nginx_dir, "objs/nginx")]["-p", ngx_dir, "-c", conf_file]
def download(links):
mkdir_cmd(download_dir)
for lib in links:
temp = setup_temp_dir()
# Generate IP addresses to be used by the nodes we will create.
ips = generate_ips()
# Create a standalone Redis node.
standalone_redis_ip = next(ips)
standalone_redis = RedisNode(standalone_redis_ip, REDIS_PORT)
# Create a Dynomite cluster.
dynomite_cluster = DynoCluster(args.request_file, ips)
with ExitStack() as stack:
# Make sure to change the working directory to the temp dir before running the
# tests.
stack.enter_context(local.cwd(temp))
# Launch the standalone Redis node and the dynomite cluster.
stack.enter_context(standalone_redis)
stack.enter_context(dynomite_cluster)
# Wait for a while for the above nodes to start.
sleep_with_animation(SETTLE_TIME, "Waiting for cluster to start")
# Run all the functional comparison tests.
comparison_test(standalone_redis, dynomite_cluster, False)
random_node = random.choice(dynomite_cluster.nodes)
stats_url = 'http://{}:{}/info'.format(random_node.ip, STATS_PORT)
json.loads(urlopen(stats_url).read().decode('ascii'))
def test_home(self):
mypath = local.env.home / 'some_simple_home_rc.ini'
assert not mypath.exists()
try:
with Config('~/some_simple_home_rc.ini') as conf:
conf['a'] = 'b'
assert mypath.exists()
mypath.unlink()
with Config(mypath) as conf:
conf['a'] = 'b'
assert mypath.exists()
mypath.unlink()
finally:
mypath.unlink()
def test_parallel(self):
m = Cluster(local, local)
import time
t = time.time()
ret = m["sleep"]("2")
assert len(ret) == 2
assert 2 <= time.time() - t < 4