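These snippets are excerpted from mrjob's Hadoop runner and its test suite. Run on their own they would need roughly the following imports (module paths assumed from mrjob's source layout):

from io import BytesIO

from mock import patch  # or: from unittest.mock import patch

from mrjob.conf import combine_dicts
from mrjob.fs.hadoop import HadoopFilesystem
from mrjob.hadoop import HadoopJobRunner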
def test_infer_from_hadoop_bin_realpath(self):
    with patch('posixpath.realpath', return_value='/ha/do/op/bin'):
        self.runner = HadoopJobRunner(hadoop_bin=['/usr/bin/hadoop'])
        self.mock_paths.append('/ha/do/op/hadoop-streaming.jar')

        self.assertEqual(self.runner._find_hadoop_streaming_jar(),
                         '/ha/do/op/hadoop-streaming.jar')
def test_dont_infer_from_usr_local_bin_hadoop(self):
    self.runner = HadoopJobRunner(hadoop_bin=['/usr/local/bin/hadoop'])
    self.mock_paths.append('/usr/local/hadoop-streaming.jar')

    self.assertEqual(self.runner._find_hadoop_streaming_jar(), None)
def setUp(self):
    # the streaming-jar search looks in directories named by
    # environment variables, such as $HADOOP_PREFIX, and also guesses
    # based on the path of the Hadoop binary
    self.mock_hadoop_dirs = []

    def mock_get_hadoop_version():
        return self.mock_hadoop_version

    def mock_hadoop_dirs_method():
        return (d for d in self.mock_hadoop_dirs)

    self.start(patch('mrjob.hadoop.HadoopJobRunner.get_hadoop_version',
                     side_effect=mock_get_hadoop_version))
    self.start(patch('mrjob.hadoop.HadoopJobRunner._hadoop_dirs',
                     side_effect=mock_hadoop_dirs_method))

    self.runner = HadoopJobRunner()
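Taken together, these fixtures pin down the behavior under test: _find_hadoop_streaming_jar() walks a set of candidate Hadoop directories and returns the path of the first streaming jar it finds, or None. A minimal sketch of that search, written as a free function with an assumed hadoop_dirs argument (not mrjob's actual implementation):

import os
import posixpath

def find_hadoop_streaming_jar(hadoop_dirs):
    # walk each candidate Hadoop dir; the first file that looks like
    # a streaming jar wins
    for hadoop_dir in hadoop_dirs:
        for dirpath, _, filenames in os.walk(hadoop_dir):
            for filename in sorted(filenames):
                if (filename.startswith('hadoop-streaming') and
                        filename.endswith('.jar')):
                    return posixpath.join(dirpath, filename)
    return None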
def test_dont_infer_from_bin_hadoop(self):
    self.runner = HadoopJobRunner(hadoop_bin=['/bin/hadoop'])
    self.mock_paths.append('/hadoop-streaming.jar')

    self.assertEqual(self.runner._find_hadoop_streaming_jar(), None)
def test_hadoop_runner_cluster_mode(self):
    runner = HadoopJobRunner(spark_deploy_mode='cluster')

    self.assertEqual(runner._logs_needed_to_pick_error('streaming'),
                     ('step', 'history', 'task'))
    self.assertEqual(runner._logs_needed_to_pick_error('spark'),
                     ('step', 'task'))
def setUp(self):
    super(StreamingArgsTestCase, self).setUp()
    self.runner = HadoopJobRunner(
        hadoop_bin='hadoop', hadoop_streaming_jar='<streaming jar>',
        mr_job_script='my_job.py', stdin=BytesIO())
    self.runner._add_job_files_for_upload()

    # stub out the pieces of the command line that other code computes;
    # the angle-bracket placeholders make them easy to spot in assertions
    self.start(patch.object(self.runner, '_upload_args',
                            return_value=['<upload args>']))
    self.start(patch.object(self.runner, '_hadoop_args_for_step',
                            return_value=['<hadoop args for step>']))
    self.start(patch.object(self.runner, '_step_input_uris',
                            return_value=['<step input uris>']))
    self.start(patch.object(self.runner, '_step_output_uri',
                            return_value='<step output uri>'))
    self.start(patch.object(HadoopFilesystem, 'get_hadoop_version',
                            return_value='2.7.1'))
    self.runner._script_path = 'my_job.py'
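The self.start(...) helper isn't shown in these excerpts; in mrjob's test base classes it starts a patcher and registers its teardown. A sketch of what such a helper typically looks like (assumed, not copied from mrjob):

def start(self, patcher):
    # begin patching now, and make sure the patch is undone even if
    # the test fails partway through
    mock_obj = patcher.start()
    self.addCleanup(patcher.stop)
    return mock_obj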
def test_get_hadoop_version(self):
    runner = HadoopJobRunner()
    self.assertEqual(runner.get_hadoop_version(), '1.2.0')
@classmethod
def _default_opts(cls):
    return combine_dicts(
        super(HadoopJobRunner, cls)._default_opts(),
        dict(
            hadoop_tmp_dir='tmp/mrjob',
        )
    )
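combine_dicts() here is mrjob.conf.combine_dicts, which merges dicts left to right with values from later dicts taking precedence; that is what lets a runner subclass layer its defaults over the base runner's. A quick illustration with invented option values:

from mrjob.conf import combine_dicts

defaults = combine_dicts(
    dict(hadoop_tmp_dir='tmp', check_input_paths=True),  # base defaults
    dict(hadoop_tmp_dir='tmp/mrjob'),                    # subclass override
)
# values from dicts later in the argument list win
assert defaults == dict(hadoop_tmp_dir='tmp/mrjob', check_input_paths=True)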
"""Make a runner based on command-line arguments, so we can
launch this job on EMR, on Hadoop, or locally.
:rtype: :py:class:`mrjob.runner.MRJobRunner`
"""
# have to import here so that we can still run the MRJob
# without importing boto
from mrjob.emr import EMRJobRunner
from mrjob.hadoop import HadoopJobRunner
from mrjob.local import LocalMRJobRunner
if self.options.runner == 'emr':
return EMRJobRunner(**self.emr_job_runner_kwargs())
elif self.options.runner == 'hadoop':
return HadoopJobRunner(**self.hadoop_job_runner_kwargs())
elif self.options.runner == 'inline':
raise ValueError("inline is not supported in the multi-lingual"
" launcher.")
else:
# run locally by default
return LocalMRJobRunner(**self.local_job_runner_kwargs())
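As a usage sketch (MyLauncher is an assumed stand-in for whatever launcher subclass defines this method), the conventional mrjob pattern is to drive the runner as a context manager so its temp dirs get cleaned up; '-r hadoop' is the standard flag for selecting the Hadoop runner:

launcher = MyLauncher(args=['-r', 'hadoop', 'input.txt'])
with launcher.make_runner() as runner:
    runner.run()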