Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:type mem_per_process: int
:param max_processes: optional maximum number of processes per test node
:type max_processes: int
:param rsync_max_processes: optional maximum number of rsync processes
:type rsync_max_processes: int
:param rsync_bandwidth_limit: optional bandwidth limit per rsync process in kilobytes per second
:type rsync_bandwidth_limit: int
:param config: pytest config object
:type config: pytest.Config
:return: `list` of test gateway specs for all test nodes which conform to the given requirements
in form ['1*ssh=//id=:', ...]
:rtype: list
"""
# pylint: disable=E1101
group = execnet.Group()
try:
if virtualenv_path:
n_m = NodeManager(config, specs=[])
virtualenv_path = os.path.relpath(virtualenv_path)
node_specs = []
node_caps = {}
root_dir = config.rootdir
nodes = list(unique_everseen(nodes))
print('Detected root dir: {0}'.format(root_dir))
rsync = RSync(
root_dir, chdir, includes=config.getini("rsyncdirs"),
jobs=rsync_max_processes or len(nodes),
bwlimit=rsync_bandwidth_limit,
bandwidth_limit=rsync_bandwidth_limit,
ssh_cipher=rsync_cipher,
**n_m.rsyncoptions)
def test_multichannel_receive_queue_for_two_subprocesses(self):
    """Check that one receive queue collects items from two popen gateways.

    Every gateway runs a snippet that sends its own PID. The two entries
    pulled from the shared queue must come from different channels and
    different gateways, and must carry different values.
    """
    # Source executed on each gateway; kept byte-for-byte as a runtime string.
    remote_source = (
        "\n"
        "import os\n"
        "channel.send(os.getpid())\n"
    )
    two_popen = execnet.Group(["popen"] * 2)
    multi = two_popen.remote_exec(remote_source)
    receive_queue = multi.make_receive_queue()

    first_channel, first_pid = receive_queue.get(timeout=10)
    second_channel, second_pid = receive_queue.get(timeout=10)

    assert first_channel != second_channel
    assert first_channel.gateway != second_channel.gateway
    assert first_pid != second_pid
    multi.waitclose()
# -*- coding: utf-8 -*-
import execnet
# Build one group and start a default gateway for each of the 4 CPUs.
group = execnet.Group()
for i in range(4):  # 4 CPUs
    group.makegateway()
def process_item(channel):
    """Serve multiply-by-ten tasks arriving on *channel* until told to stop.

    Sends the string "ready" once, then for every item received sleeps a
    random whole number of seconds (0, 1 or 2) and sends back ``item * 10``.
    Receiving ``None`` ends processing.

    :param channel: object supporting ``send(value)`` and iteration over
        incoming items
    :return: None
    """
    # Imports stay inside the function body, as in the original; presumably
    # the function is shipped to a remote interpreter and must be
    # self-contained -- confirm against the caller.
    import random
    import time

    channel.send("ready")
    for task in channel:
        if task is None:
            # Shutdown signal: nothing follows the loop, so just leave.
            return
        # Simulate work of varying duration before reporting the result.
        delay = random.randrange(3)
        time.sleep(delay)
        channel.send(task * 10)
def test_multichannel_send_each(self):
    """Check that ``send_each`` delivers one value to every channel.

    Both popen gateways receive 41, add one, and send the result back, so
    ``receive_each`` must yield 42 from each of them.
    """
    # Source executed on each gateway; kept byte-for-byte as a runtime string.
    remote_source = (
        "\n"
        "import os\n"
        "channel.send(channel.receive() + 1)\n"
    )
    pair = execnet.Group(["popen"] * 2)
    multi = pair.remote_exec(remote_source)

    multi.send_each(41)
    replies = multi.receive_each()

    assert replies == [42, 42]
def __init__(self, config, specs=None, defaultchdir="pyexecnetcache"):
    """Set up the execnet group and the normalized gateway specs.

    :param config: config object; its ``trace.get`` and ``getoption`` are
        used here
    :param specs: iterable of gateway specs (strings or ``execnet.XSpec``);
        when ``None`` they are obtained from ``self._getxspecs()``
    :param defaultchdir: ``chdir`` value assigned to every spec that has
        neither ``chdir`` nor ``popen`` set
    """
    self.config = config
    self.trace = self.config.trace.get("nodemanager")

    # Fall back to a freshly generated id when no test run uid was given.
    run_uid = self.config.getoption("testrunuid")
    self.testrunuid = uuid.uuid4().hex if run_uid is None else run_uid

    self.group = execnet.Group()

    raw_specs = self._getxspecs() if specs is None else specs
    self.specs = []
    for raw in raw_specs:
        # Accept both ready-made XSpec objects and anything XSpec can parse.
        xspec = raw if isinstance(raw, execnet.XSpec) else execnet.XSpec(raw)
        if not (xspec.chdir or xspec.popen):
            xspec.chdir = defaultchdir
        self.group.allocate_id(xspec)
        self.specs.append(xspec)

    # These run after self.specs is populated, matching the original order.
    self.roots = self._getrsyncdirs()
    self.rsyncoptions = self._getrsyncoptions()
    self._rsynced_specs = set()
def test_socket_installvia(self):
    """Check that a socket gateway can be installed via a popen gateway.

    A popen gateway with id ``p1`` is created first; the socket gateway is
    then requested with ``installvia=p1`` and must report the id ``s1`` and
    a truthy remote status.
    """
    group = execnet.Group()
    try:
        group.makegateway("popen//id=p1")
        gw = group.makegateway("socket//installvia=p1//id=s1")
        assert gw.id == "s1"
        assert gw.remote_status()
    finally:
        # Terminate even when gateway creation or an assertion fails, so a
        # failing run does not leave the gateways behind.
        group.terminate()
self._scores=data['scores']
self._landmarks=data['landmarks']
self._binary_score=(self._scores>=0)
# mark 70% as confident
self._confident=numpy.zeros_like(self._scores,dtype=numpy.bool)
self._confident[self._scores>numpy.percentile(self._scores[self._scores>=0],30,axis=0)]=True
self._confident[self._scores
def _make_gateway(self, hostname):
    """Create a fresh execnet group and open a gateway to *hostname*.

    The new group replaces ``self.group``. The gateway spec comes from
    ``self._make_connection_string(hostname)``, and the gateway is
    reconfigured with both ``py2str_as_py3str`` and ``py3str_as_py2str``
    set to ``False`` before being returned.

    :param hostname: value passed to ``self._make_connection_string``
    :return: the reconfigured gateway
    """
    self.group = execnet.Group()
    connection_spec = self._make_connection_string(hostname)
    gw = self.group.makegateway(connection_spec)
    gw.reconfigure(py2str_as_py3str=False, py3str_as_py2str=False)
    return gw