def test_get_error_permanent(self):
  self.requests.get('http://foo', stream=True, timeout=60).AndRaise(requests.TooManyRedirects)
  self.mox.ReplayAll()

  with pytest.raises(self.fetcher.PermanentError) as e:
    self.fetcher.fetch('http://foo',
                       self.listener,
                       chunk_size=Amount(1, Data.KB),
                       timeout=Amount(1, Time.MINUTES))
  self.assertTrue(e.value.response_code is None)
for chunk in self.expect_get('http://baz', chunk_size_bytes=1, timeout_secs=37):
  self.listener.recv_chunk(chunk)
  digest.update(chunk)
self.listener.finished()
digest.hexdigest().AndReturn('42')
self.response.close()
self.mox.ReplayAll()

checksum_listener = Fetcher.ChecksumListener(digest=digest)
self.fetcher.fetch('http://baz',
                   checksum_listener.wrap(self.listener),
                   chunk_size=Amount(1, Data.BYTES),
                   timeout=Amount(37, Time.SECONDS))
self.assertEqual('42', checksum_listener.checksum)
self.response.status_code = 200
self.response.headers = {'content-length': '11'}
self.listener.status(200, content_length=11)
self.response.iter_content(chunk_size=1024).AndReturn(['a', 'b'])
self.listener.recv_chunk('a')
self.listener.recv_chunk('b')
self.response.close()
self.mox.ReplayAll()

with pytest.raises(self.fetcher.Error):
  self.fetcher.fetch('http://foo',
                     self.listener,
                     chunk_size=Amount(1, Data.KB),
                     timeout=Amount(1, Time.MINUTES))
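# Illustrative sketch (assumption, not from the original source): the test fragments above
# exercise Fetcher.fetch, which per these snippets takes a URL, a listener, a chunk_size
# Amount of Data, and a timeout Amount of Time. A non-mocked call would look roughly like:
#
#   fetcher.fetch('http://example.com/artifact',
#                 listener,
#                 chunk_size=Amount(8, Data.KB),
#                 timeout=Amount(1, Time.MINUTES))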
def test_terminal_status_update(self):
  """Launcher reacts to terminated task by launching a new one."""
  self._cluster.num_nodes = 1
  launcher = MySQLClusterLauncher(
      self._driver,
      self._cluster,
      self._state_provider,
      self._zk_url,
      self._zk_client,
      self._framework_user,
      "./executor.pex",
      "cmd.sh",
      Amount(1, Time.SECONDS),
      "/etc/mysos/admin_keyfile.yml",
      self._scheduler_key)
  self._launchers.append(launcher)

  resources = create_resources(
      cpus=DEFAULT_TASK_CPUS,
      mem=DEFAULT_TASK_MEM,
      disk=DEFAULT_TASK_DISK,
      ports=set([10000]))
  self._offer.resources.extend(resources)

  task_id, _ = launcher.launch(self._offer)
  assert task_id == "mysos-cluster0-0"

  launched = self._driver.method_calls["launchTasks"]
  assert len(launched) == self._cluster.num_nodes
def setup_task(self, task, root, finished=False, corrupt=False):
  """Set up the checkpoint stream for the given task in the given checkpoint root, optionally
  finished and/or with a corrupt stream."""
  class FastTaskRunner(TaskRunner):
    COORDINATOR_INTERVAL_SLEEP = Amount(1, Time.MICROSECONDS)

  tr = FastTaskRunner(
      task=task,
      checkpoint_root=root,
      sandbox=os.path.join(root, 'sandbox', task.name().get()),
      clock=ThreadedClock(time.time()))
  with tr.control():
    # initialize checkpoint stream
    pass
  if finished:
    tr.kill()
  if corrupt:
    ckpt_file = TaskPath(root=root, task_id=tr.task_id).getpath('runner_checkpoint')
    with open(ckpt_file, 'w') as f:
      f.write("definitely not a valid checkpoint stream")
  return tr.task_id
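# Illustrative sketch (assumption, not from the original source): how a test might use
# setup_task above to build fixtures. The 'task' object and checkpoint 'root' are assumed
# to be produced elsewhere in the test module.
#
#   healthy_id = self.setup_task(task, root)                   # plain checkpoint stream
#   finished_id = self.setup_task(task, root, finished=True)   # stream for a killed task
#   corrupt_id = self.setup_task(task, root, corrupt=True)     # deliberately garbled stream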
return spawn_local(options.runner, jobname, config_file, **make_spawn_options(options))
def parse_package(option, _, value, parser):
  if value is None:
    return
  splits = value.split(':')
  if len(splits) != 3:
    raise OptionValueError('Expected package to be of the form role:name:version')
  setattr(parser.values, option.dest, tuple(splits))
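# Illustrative usage sketch (assumption, not from the original source): parse_package is an
# optparse callback, so it could be wired to a flag roughly as below. The flag name and dest
# are hypothetical; parse_package itself also relies on OptionValueError being imported from
# optparse elsewhere in its module.
from optparse import OptionParser

_example_parser = OptionParser()
_example_parser.add_option(
    '--package', dest='package', type='string', action='callback', callback=parse_package,
    help='Package of the form role:name:version')
_example_opts, _ = _example_parser.parse_args(['--package', 'www-data:mysos:0.1.0'])
assert _example_opts.package == ('www-data', 'mysos', '0.1.0')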
RUNTASK_INSTANCE_LIMIT = 25
RUNTASK_CPU_LIMIT = 50
RUNTASK_RAM_LIMIT = Amount(50, Data.GB)
RUNTASK_DISK_LIMIT = Amount(500, Data.GB)


def task_is_expensive(options):
  """A metric to determine whether or not we require the user to double-take."""
  if options.yes_i_really_want_to_run_an_expensive_job:
    return False

  errors = []
  if options.instances > RUNTASK_INSTANCE_LIMIT:
    errors.append('your task has more than %d instances (actual: %d)' % (
        RUNTASK_INSTANCE_LIMIT, options.instances))

  if options.instances * options.cpus > RUNTASK_CPU_LIMIT:
    errors.append('aggregate CPU is over %.1f cores (actual: %.1f)' % (
        RUNTASK_CPU_LIMIT, options.instances * options.cpus))
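# Illustrative sketch (assumption, not from the original source): exercising the limit checks
# above with a stand-in for the parsed options object. The fragment above is truncated, so
# what task_is_expensive ultimately returns for the expensive case is not shown here; this
# only demonstrates the attributes the checks rely on.
#
#   from collections import namedtuple
#   FakeOptions = namedtuple(
#       'FakeOptions', 'yes_i_really_want_to_run_an_expensive_job instances cpus')
#   task_is_expensive(FakeOptions(True, 100, 1.0))   # opt-out flag short-circuits to False
#   task_is_expensive(FakeOptions(False, 100, 1.0))  # trips both the instance and CPU limits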
def deadline(closure, timeout=Amount(150, Time.MILLISECONDS), daemon=False, propagate=False):
  """Run a closure with a timeout, raising an exception if the timeout is exceeded.

  args:
    closure - function to be run (e.g. functools.partial, or lambda)
  kwargs:
    timeout - in seconds, or Amount of Time [default: Amount(150, Time.MILLISECONDS)]
    daemon - booleanish indicating whether to daemonize the thread used to run the closure
             (otherwise, a timed-out closure can potentially exist beyond the life of the
             calling thread) [default: False]
    propagate - booleanish indicating whether to re-raise exceptions thrown by the closure
                [default: False]
  """
  if isinstance(timeout, Compatibility.numeric):
    pass
  elif isinstance(timeout, Amount) and isinstance(timeout.unit(), Time):
    timeout = timeout.as_(Time.SECONDS)
  else:
    raise ValueError('timeout must be either numeric or Amount of Time.')
  q = Queue(maxsize=1)

  class AnonymousThread(Thread):
    def __init__(self):
      super(AnonymousThread, self).__init__()
      self.daemon = bool(daemon)

    def run(self):
      try:
        result = closure()
      except Exception as e:
        if propagate:
          result = e
        else:
          # conform to standard behaviour of an exception being raised inside a Thread
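# Illustrative usage sketch (assumption, not from the original source): based only on the
# docstring above, deadline would typically be invoked along these lines. The closure names
# are hypothetical, and the exact exception raised when the timeout fires is not shown in
# this truncated fragment.
#
#   result = deadline(lambda: quick_computation(), timeout=Amount(1, Time.SECONDS))
#   deadline(functools.partial(slow_io, path), timeout=2.5, daemon=True, propagate=True)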
    self._DEFAULT_LOGGER.mode().get(),
    None,
    None)

logger = process.logger()
if logger is Empty:
  if self._process_logger_destination:
    destination = self._process_logger_destination
  if self._process_logger_mode:
    mode = self._process_logger_mode
else:
  destination = logger.destination().get()
  mode = logger.mode().get()

if mode == LoggerMode.ROTATE:
  size = Amount(self._DEFAULT_ROTATION.log_size().get(), Data.BYTES)
  backups = self._DEFAULT_ROTATION.backups().get()
  if logger is Empty:
    if self._rotate_log_size_mb:
      size = Amount(self._rotate_log_size_mb, Data.MB)
    if self._rotate_log_backups:
      backups = self._rotate_log_backups
  else:
    rotate = logger.rotate()
    if rotate is not Empty:
      size = Amount(rotate.log_size().get(), Data.BYTES)
      backups = rotate.backups().get()

return destination, mode, size, backups
def run(self):
  tasks = []
  now = time.time()

  # age: The time (in seconds) since the last task transition to/from ACTIVE/FINISHED
  # metadata_size: The size of the thermos checkpoint records for this task
  # log_size: The size of the stdout/stderr logs for this task's processes
  # data_size: The size of the sandbox of this task.
  TaskTuple = namedtuple('TaskTuple',
      'checkpoint_root task_id age metadata_size log_size data_size')

  for checkpoint_root, task_id in self.get_finished_tasks():
    collector = TaskGarbageCollector(checkpoint_root, task_id)

    age = Amount(int(now - collector.get_age()), Time.SECONDS)
    self.log('Analyzing task %s (age: %s)... ' % (task_id, age))
    metadata_size = Amount(sum(sz for _, sz in collector.get_metadata()), Data.BYTES)
    self.log(' metadata %.1fKB ' % metadata_size.as_(Data.KB))
    log_size = Amount(sum(sz for _, sz in collector.get_logs()), Data.BYTES)
    self.log(' logs %.1fKB ' % log_size.as_(Data.KB))
    data_size = Amount(sum(sz for _, sz in collector.get_data()), Data.BYTES)
    self.log(' data %.1fMB ' % data_size.as_(Data.MB))

    tasks.append(TaskTuple(checkpoint_root, task_id, age, metadata_size, log_size, data_size))

  gc_tasks = set()
  gc_tasks.update(task for task in tasks if task.age > self._max_age)
  self.log('After age filter: %s tasks' % len(gc_tasks))

  def total_gc_size(task):
    return sum([task.data_size,
state. These operations do not require knowledge of the underlying
task.

TaskRunnerHelper is sort of a mishmash of "checkpoint-only" operations and
the "Process Platform" stuff that started to get pulled into process.py

This really needs some hard design thought to see if it can be extracted out
even further.
"""
class Error(Exception): pass
class PermissionError(Error): pass

# Maximum drift between when the system says a task was forked and when we checkpointed
# its fork_time (used as a heuristic to determine a forked task is really ours instead of
# a task with coincidentally the same PID but just wrapped around.)
MAX_START_TIME_DRIFT = Amount(10, Time.SECONDS)

@staticmethod
def get_actual_user():
  import getpass, pwd
  try:
    pwd_entry = pwd.getpwuid(os.getuid())
  except KeyError:
    return getpass.getuser()
  return pwd_entry[0]

@staticmethod
def process_from_name(task, process_name):
  if task.has_processes():
    for process in task.processes():
      if process.name().get() == process_name:
        return process
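# Illustrative sketch (assumption, not from the original source): process_from_name is a
# static lookup helper on TaskRunnerHelper, so a caller might use it roughly as below. The
# task object and process name are hypothetical, and the fragment above is truncated, so
# the no-match return value is not shown here.
#
#   process = TaskRunnerHelper.process_from_name(task, 'install_packages')
#   if process is None:
#     ...  # the task defines no process by that name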