# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_debug_printout(self, opt_store_class, alias, opts):
    """Return the debug log output emitted while constructing an
    option store.

    :param opt_store_class: option-store class to instantiate
    :param alias: runner alias passed to the constructor
    :param opts: option dict passed to the constructor
    :return: everything logged to ``mrjob.runner`` at debug level,
             as a string
    """
    log_output = StringIO()
    with no_handlers_for_logger():
        log_to_stream('mrjob.runner', log_output, debug=True)
        # the debug printout happens as a side effect of construction
        opt_store_class(alias, opts, [])
    return log_output.getvalue()
def maybe_terminate_quietly(self, stdout=None, **kwargs):
    """Call ``_maybe_terminate_clusters()`` with test defaults filled in,
    capturing anything it prints.

    :param stdout: optional file-like object to receive output; a throwaway
                   ``StringIO`` is used when omitted
    :param kwargs: forwarded to ``_maybe_terminate_clusters()``
    :return: whatever ``_maybe_terminate_clusters()`` returns
    """
    # fill in defaults without clobbering caller-supplied values
    kwargs.setdefault('conf_paths', [])
    kwargs.setdefault('now', self.now)

    # fixed settings for the mock test environment
    kwargs['cloud_tmp_dir'] = 's3://my_bucket/'
    kwargs['cloud_fs_sync_secs'] = 0
    kwargs['max_mins_locked'] = 1

    # don't print anything out: swap in a capture object for sys.stdout
    saved_stdout = sys.stdout
    sys.stdout = stdout if stdout is not None else StringIO()
    try:
        return _maybe_terminate_clusters(**kwargs)
    finally:
        # always restore the real stdout, even if termination raises
        sys.stdout = saved_stdout
def test_cleanup_options(self):
    """Deprecated *_SCRATCH cleanup option names are renamed to the
    *_TMP names, and a deprecation warning is logged for each."""
    log_output = StringIO()
    with no_handlers_for_logger('mrjob.runner'):
        log_to_stream('mrjob.runner', log_output)
        opts = RunnerOptionStore(
            'inline',
            dict(cleanup=['LOCAL_SCRATCH', 'REMOTE_SCRATCH'],
                 cleanup_on_failure=['JOB_FLOW', 'SCRATCH']),
            [])

    # old names should be translated to their modern equivalents
    self.assertEqual(opts['cleanup'], ['LOCAL_TMP', 'CLOUD_TMP'])

    warnings = log_output.getvalue()
    self.assertIn(
        'Deprecated cleanup option LOCAL_SCRATCH has been renamed'
        ' to LOCAL_TMP', warnings)
    self.assertIn(
        'Deprecated cleanup option REMOTE_SCRATCH has been renamed'
        ' to CLOUD_TMP', warnings)
def test_exclude(self):
    """-x/--exclude should omit clusters tagged my_key=my_value from
    the report."""
    for cluster in CLUSTERS:
        self.add_mock_emr_cluster(cluster)

    main(['-q', '--no-conf', '-x', 'my_key,my_value'])

    # fetch captured stdout once instead of re-reading it per assertion,
    # and count lines with splitlines() rather than building a throwaway
    # StringIO just to iterate it
    output = self.stdout.getvalue()

    # two clusters carry the excluded tag, so two fewer lines appear
    self.assertEqual(len(output.splitlines()), len(CLUSTERS_BY_ID) - 2)
    self.assertNotIn('j-COMPLETED', output)
    self.assertNotIn('j-RUNNING1STEP', output)
def test_its_not_very_quiet(self):
    # With a capture buffer supplied, the expected report lines should be
    # printed (compared as sets, so line order doesn't matter) and the idle
    # clusters should actually be terminated.
    stdout = StringIO()
    self.maybe_terminate_quietly(stdout=stdout, max_mins_idle=0.6)
    self.assertEqual(set(stdout.getvalue().splitlines()),
                     set(self.EXPECTED_STDOUT_LINES))
    # should have actually terminated clusters
    # NOTE(review): this list literal is unterminated in this chunk -- the
    # closing `])` appears to have been lost; confirm against the full file.
    self.assertEqual(self.ids_of_terminated_clusters(), [
        'j-CUSTOM_DONE_AND_IDLE',
        'j-DEBUG_ONLY',
        'j-DONE_AND_IDLE',
        'j-DONE_AND_IDLE_4_X',
        'j-HADOOP_DEBUGGING',
        'j-IDLE_AND_EXPIRED',
        'j-IDLE_AND_FAILED',
        'j-PENDING_BUT_IDLE',
        'j-POOLED',
def test_io_error(self):
    """_ls_logs() should log a warning and yield nothing (not raise)
    when the filesystem's ls() fails with an IOError."""
    self.mock_paths = [IOError()]

    with no_handlers_for_logger('mrjob.logs.ls'):
        log_output = StringIO()
        log_to_stream('mrjob.logs.ls', log_output)

        # the error is swallowed; no log entries are produced
        self.assertEqual(
            list(_ls_logs(self.mock_fs, '/path/to/logs')), [])

    # ls() was attempted exactly once, and the failure was logged
    self.mock_fs.ls.assert_called_once_with('/path/to/logs')
    self.assertIn("couldn't ls() /path/to/logs", log_output.getvalue())
def setUp(self):
    """Patch out sys.exit and capture stdout before each test."""
    super(PrintHelpTestCase, self).setUp()
    # keep --help from actually exiting the test process
    self.exit = self.start(patch('sys.exit'))
    # capture anything printed to stdout for later inspection
    self.stdout = self.start(patch('sys.stdout', StringIO()))
def test_its_not_very_quiet(self):
    # With a capture buffer supplied, the expected report lines should be
    # printed (compared as sets, so line order doesn't matter) and the idle
    # clusters should actually be terminated. Uses the older max_hours_idle
    # option rather than max_mins_idle.
    stdout = StringIO()
    self.maybe_terminate_quietly(
        stdout=stdout, max_hours_idle=0.01)
    self.assertEqual(set(stdout.getvalue().splitlines()),
                     set(self.EXPECTED_STDOUT_LINES))
    # should have actually terminated clusters
    # NOTE(review): this list literal is unterminated in this chunk -- the
    # closing `])` appears to have been lost; confirm against the full file.
    self.assertEqual(self.ids_of_terminated_clusters(), [
        'j-DEBUG_ONLY',
        'j-DONE_AND_IDLE',
        'j-DONE_AND_IDLE_4_X',
        'j-HADOOP_DEBUGGING',
        'j-IDLE_AND_EXPIRED',
        'j-IDLE_AND_FAILED',
        'j-PENDING_BUT_IDLE',
        'j-POOLED',
def test_thread(self):
    # A single log4j line with a thread name in parentheses should parse
    # into one record with the thread field populated.
    lines = StringIO(
        '2015-08-22 00:46:18,411 INFO amazon.emr.metrics.MetricsSaver'
        ' (main): Thread 1 created MetricsLockFreeSaver 1\n')
    # NOTE(review): the assertEqual call and list literal are unterminated
    # in this chunk -- the closing `])` appears to have been lost; confirm
    # against the full file.
    self.assertEqual(
        list(_parse_hadoop_log4j_records(lines)), [
            dict(
                caller_location='',
                level='INFO',
                logger='amazon.emr.metrics.MetricsSaver',
                message='Thread 1 created MetricsLockFreeSaver 1',
                num_lines=1,
                start_line=0,
                thread='main',
                timestamp='2015-08-22 00:46:18,411',
            )
def test_multiline_message(self):
lines = StringIO(
'2015-08-22 00:47:35,323 INFO org.apache.hadoop.mapreduce.Job'
' (main): Counters: 54\r\n'
' File System Counters\r\n'
' FILE: Number of bytes read=83\r\n')
self.assertEqual(
list(_parse_hadoop_log4j_records(lines)), [
dict(
caller_location='',
level='INFO',
logger='org.apache.hadoop.mapreduce.Job',
# strip \r's, no trailing \n
message=('Counters: 54\n'
' File System Counters\n'
' FILE: Number of bytes read=83'),
num_lines=3,