Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_task_health_failed(self):
    """Run a task against ``UnhealthyHandler`` and expect it to end up TASK_FAILED.

    A ``SignalServer`` built from ``UnhealthyHandler`` supplies the ``health``
    port, and the executor is created with a ``HealthCheckerProvider`` status
    provider and 0.1s health-check intervals.  After the executor terminates,
    the driver must have recorded exactly three ``sendStatusUpdate`` calls,
    with the final one carrying ``mesos_pb.TASK_FAILED``.
    """
    driver = ProxyDriver()

    with SignalServer(UnhealthyHandler) as port, temporary_dir() as checkpoint_root:
        # Short intervals keep the test fast.
        job = MESOS_JOB(
            task=SLEEP60,
            health_check_config=HealthCheckConfig(initial_interval_secs=0.1,
                                                  interval_secs=0.1))
        _, executor = make_executor(
            driver,
            checkpoint_root,
            job,
            ports={'health': port},
            fast_status=True,
            status_providers=(HealthCheckerProvider(),))
        executor.terminated.wait()

    status_updates = driver.method_calls['sendStatusUpdate']
    assert len(status_updates) == 3

    # Each recorded call is indexed as [positional-args][first-arg].
    final_update = status_updates[-1][0][0]
    assert final_update.state == mesos_pb.TASK_FAILED
def test_on_context_exit(self):
# Exercises ParseContext.on_context_exit in two scenarios.
# NOTE(review): indentation was lost in this copy of the file and the test is
# truncated after the final `with open(...)` line -- restore from the original.

# Scenario 1: calling on_context_exit on a ParseContext that has only been
# constructed (parse() is never called here) is expected to raise ContextError.
with temporary_dir() as root_dir:
parse_context = ParseContext(create_buildfile(root_dir, 'a'))
with pytest.raises(parse_context.ContextError):
parse_context.on_context_exit(lambda: 37)
# Scenario 2: the BUILD file content below registers `leave_a_trail` through
# ParseContext.locate().on_context_exit(...) so that it writes '42' to a sibling
# file 'b', and asserts that 'b' does not exist yet at registration time.
with temporary_dir() as root_dir:
buildfile = create_buildfile(root_dir, 'a',
content=dedent("""
import os
from pants.base.parse_context import ParseContext
def leave_a_trail(file, contents=''):
with open(file, 'w') as b:
b.write(contents)
b_file = os.path.join(os.path.dirname(__file__), 'b')
ParseContext.locate().on_context_exit(leave_a_trail, b_file, contents='42')
assert not os.path.exists(b_file), 'Expected context exit action to be delayed.'
""").strip()
)
# 'b' must not exist before the BUILD file is parsed.
b_file = os.path.join(root_dir, 'a', 'b')
self.assertFalse(os.path.exists(b_file))
ParseContext(buildfile).parse()
# After parsing, 'b' is opened for reading; the body of this `with` is missing
# from this view (presumably it checks the contents are '42' -- TODO confirm).
with open(b_file, 'r') as b:
def test_build_files_scan_with_non_default_relpath_ignore(self):
    """Scan ``grandparent/parent`` while ignoring ``**/parent/child1``.

    The scan result must equal, in order, the four build files listed below.
    """
    # Build the expectation first so the create_buildfile calls still happen
    # before the scan, as in the single-expression form.
    expected_paths = (
        'grandparent/parent/BUILD',
        'grandparent/parent/BUILD.twitter',
        'grandparent/parent/child2/child3/BUILD',
        'grandparent/parent/child5/BUILD',
    )
    expected = OrderedSet([self.create_buildfile(path) for path in expected_paths])

    found = self.scan_buildfiles('grandparent/parent',
                                 build_ignore_patterns=['**/parent/child1'])

    self.assertEqual(expected, found)
).with_binaries(
foo_binary = pants(':foo_bin')
)
)
foo_bin = python_binary(
name = 'foo_bin',
entry_point = 'foo.bin.foo',
dependencies = [ pants(':foo_bin_dep') ]
)
foo_bin_dep = python_library(
name = 'foo_bin_dep'
)
assert SetupPy.minified_dependencies(foo) == OrderedSet([foo_bin, foo_bin_dep])
entry_points = dict(SetupPy.iter_entry_points(foo))
assert entry_points == {'foo_binary': 'foo.bin.foo'}
with self.run_execute(foo, recursive=False) as setup_py_command:
setup_py_command.run_one.assert_called_with(foo)
with self.run_execute(foo, recursive=True) as setup_py_command:
setup_py_command.run_one.assert_called_with(foo)
def collector(dep):
    """Return an ``OrderedSet`` whose only member is ``dep``."""
    singleton = OrderedSet([dep])
    return singleton
def test_get_error_permanent(self):
    """A ``TooManyRedirects`` from ``requests.get`` surfaces as ``PermanentError``.

    The mocked ``requests.get`` is recorded to raise
    ``requests.TooManyRedirects``; ``fetch`` must then raise the fetcher's
    ``PermanentError`` with ``response_code`` set to ``None``.
    """
    url = 'http://foo'

    # Record phase: the GET the fetcher is expected to issue, and its failure.
    self.requests.get(url, stream=True, timeout=60).AndRaise(requests.TooManyRedirects)
    self.mox.ReplayAll()

    with pytest.raises(self.fetcher.PermanentError) as raised:
        self.fetcher.fetch(
            url,
            self.listener,
            chunk_size=Amount(1, Data.KB),
            timeout=Amount(1, Time.MINUTES))

    # The exception info is only populated once the ``raises`` block exits.
    response_code = raised.value.response_code
    self.assertTrue(response_code is None)
for chunk in self.expect_get('http://baz', chunk_size_bytes=1, timeout_secs=37):
self.listener.recv_chunk(chunk)
digest.update(chunk)
self.listener.finished()
digest.hexdigest().AndReturn('42')
self.response.close()
self.mox.ReplayAll()
checksum_listener = Fetcher.ChecksumListener(digest=digest)
self.fetcher.fetch('http://baz',
checksum_listener.wrap(self.listener),
chunk_size=Amount(1, Data.BYTES),
timeout=Amount(37, Time.SECONDS))
self.assertEqual('42', checksum_listener.checksum)
self.response.status_code = 200
self.response.headers = {'content-length': '11'}
self.listener.status(200, content_length=11)
self.response.iter_content(chunk_size=1024).AndReturn(['a', 'b'])
self.listener.recv_chunk('a')
self.listener.recv_chunk('b')
self.response.close()
self.mox.ReplayAll()
with pytest.raises(self.fetcher.Error):
self.fetcher.fetch('http://foo',
self.listener,
chunk_size=Amount(1, Data.KB),
timeout=Amount(1, Time.MINUTES))
def test_terminal_status_update(self):
    """Launcher reacts to terminated task by launching a new one.

    The portion of the test visible here covers the initial launch only: a
    one-node cluster accepts an offer with the default task resources and
    port 10000, the task id must be ``mysos-cluster0-0``, and the driver must
    have recorded as many ``launchTasks`` calls as the cluster has nodes.
    """
    self._cluster.num_nodes = 1

    launcher = MySQLClusterLauncher(
        self._driver,
        self._cluster,
        self._state_provider,
        self._zk_url,
        self._zk_client,
        self._framework_user,
        "./executor.pex",
        "cmd.sh",
        Amount(1, Time.SECONDS),
        "/etc/mysos/admin_keyfile.yml",
        self._scheduler_key)
    self._launchers.append(launcher)

    # Give the offer the default task resources and a single port.
    offered = create_resources(
        cpus=DEFAULT_TASK_CPUS,
        mem=DEFAULT_TASK_MEM,
        disk=DEFAULT_TASK_DISK,
        ports=set([10000]))
    self._offer.resources.extend(offered)

    task_id, _ = launcher.launch(self._offer)
    assert task_id == "mysos-cluster0-0"

    launch_calls = self._driver.method_calls["launchTasks"]
    assert len(launch_calls) == self._cluster.num_nodes
def setup_task(self, task, root, finished=False, corrupt=False):
    """Create a runner checkpoint stream for ``task`` under ``root``.

    :param task: task to build the runner for; ``task.name().get()`` names
        its sandbox directory under ``<root>/sandbox``.
    :param root: checkpoint root directory.
    :param finished: when true, the runner is killed after the stream is
        initialized.
    :param corrupt: when true, the ``runner_checkpoint`` file is overwritten
        with text that is not a valid checkpoint stream.
    :return: the ``task_id`` of the runner that was created.
    """
    class FastTaskRunner(TaskRunner):
        COORDINATOR_INTERVAL_SLEEP = Amount(1, Time.MICROSECONDS)

    sandbox_dir = os.path.join(root, 'sandbox', task.name().get())
    runner = FastTaskRunner(
        task=task,
        checkpoint_root=root,
        sandbox=sandbox_dir,
        clock=ThreadedClock(time.time()))

    # Entering and leaving the control context is what initializes the
    # checkpoint stream; nothing needs to happen inside it.
    with runner.control():
        pass

    if finished:
        runner.kill()

    if corrupt:
        checkpoint_path = TaskPath(root=root, tr=runner.task_id).getpath('runner_checkpoint')
        with open(checkpoint_path, 'w') as stream:
            stream.write("definitely not a valid checkpoint stream")

    return runner.task_id