Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_no_script_and_no_steps(self):
    """A runner built with no arguments has no script path and no steps.

    Calling ``run()`` on it must raise ``ValueError``, and the logger's
    ``warning`` method must not have been called along the way.
    """
    bare_runner = InlineMRJobRunner()

    # Neither a script nor any steps were supplied.
    self.assertEqual(bare_runner._script_path, None)
    self.assertEqual(bare_runner._steps, [])

    with self.assertRaises(ValueError):
        bare_runner.run()

    # self.log is presumably a mock installed by the fixture -- verify
    # against the enclosing class's setUp.
    self.assertFalse(self.log.warning.called)
def test_empty(self):
    """Job key of a runner with no script names 'no_script' and the user.

    The key returned by ``get_job_key()`` is matched against
    ``_JOB_KEY_RE``; group 1 must be ``'no_script'`` and group 2 must equal
    ``getpass.getuser()``.
    """
    runner = InlineMRJobRunner(conf_paths=[])
    job_key = runner.get_job_key()
    match = _JOB_KEY_RE.match(job_key)

    # Without this guard a malformed key surfaces as an AttributeError on
    # None.group() (a test *error*) rather than a readable test *failure*.
    self.assertIsNotNone(
        match, 'job key %r does not match _JOB_KEY_RE' % (job_key,))

    self.assertEqual(match.group(1), 'no_script')
    self.assertEqual(match.group(2), getpass.getuser())
def _test_cleanup_after_with_statement(self, mode, should_exist):
    """Check whether the runner's local tmp dir survives its ``with`` block.

    :param mode: value passed to the runner as its ``cleanup`` option.
    :param should_exist: whether the local tmp dir is expected to still
                         exist once the ``with`` block has been exited.
    """
    tmp_dir = None

    with InlineMRJobRunner(cleanup=mode, conf_paths=[]) as runner:
        tmp_dir = runner._get_local_tmp_dir()
        # while the runner is active, the directory must be present
        self.assertTrue(os.path.exists(tmp_dir))

    # exiting the with: block is what triggers cleanup, so only now can
    # we observe the effect of *mode*
    still_there = os.path.exists(tmp_dir)
    self.assertEqual(still_there, should_exist)
def test_extra_kwargs_in_mrjob_conf_okay(self):
    """Options loaded from ``self.path`` keep 'setup' but not 'qux'.

    ``self.path`` is presumably a config file written by the fixture that
    contains both a ``setup`` entry and an unrecognized ``qux`` entry --
    confirm against the enclosing class's setUp.
    """
    runner = InlineMRJobRunner(conf_paths=[self.path])
    opts = runner._opts

    self.assertEqual(opts['setup'], ['echo foo'])
    # 'qux' must not show up among the runner's options
    self.assertNotIn('qux', opts)
def test_option_debug_printout(self):
    """Constructing a runner logs its options at debug level.

    ``mrjob.runner.log`` is patched, a runner is created with
    ``owner='dave'``, and the first positional argument of every
    ``log.debug`` call is collected; the option name and value must both
    appear (in repr form) somewhere in that text.
    """
    log = self.start(patch('mrjob.runner.log'))

    InlineMRJobRunner(owner='dave')

    # one line per debug() call, using only its first positional argument
    debug_lines = [
        args[0] + '\n' for args, _kwargs in log.debug.call_args_list
    ]
    debug = ''.join(debug_lines)

    self.assertIn("'owner'", debug)
    self.assertIn("'dave'", debug)
def test_missing_dir(self):
    """``_dir_archive_path`` raises OSError for the path ``<tmp_dir>/archive``.

    The test never creates that path itself; it is presumably absent from
    the fixture's temporary directory -- confirm against setUp.
    """
    runner = InlineMRJobRunner()
    missing_path = os.path.join(self.tmp_dir, 'archive')

    with self.assertRaises(OSError):
        runner._dir_archive_path(missing_path)
def test_nonexistent_dir(self):
    """``_create_dir_archive`` raises OSError for ``<tmp_dir>/nonexistent``.

    The test never creates that directory itself; it is presumably absent
    from the fixture's temporary directory -- confirm against setUp.
    """
    runner = InlineMRJobRunner()
    target_dir = os.path.join(self.tmp_dir, 'nonexistent')

    with self.assertRaises(OSError):
        runner._create_dir_archive(target_dir)
def test_auto_owner(self):
    """With ``USER`` set to 'mcp', the job key's owner part is 'mcp'.

    The key returned by ``get_job_key()`` is matched against
    ``_JOB_KEY_RE``; group 1 must be ``'no_script'`` and group 2 must be
    ``'mcp'``.
    """
    # os.environ is process-global: remember the previous value so this
    # test cannot leak its fake user into tests that run afterwards.
    previous_user = os.environ.get('USER')
    os.environ['USER'] = 'mcp'
    try:
        # NOTE(review): if the owner is derived via getpass.getuser(),
        # LOGNAME takes precedence over USER; presumably the fixture
        # clears the environment first -- confirm against setUp.
        runner = InlineMRJobRunner(conf_paths=[])
        job_key = runner.get_job_key()
        match = _JOB_KEY_RE.match(job_key)

        # Without this guard a malformed key surfaces as an AttributeError
        # on None.group() rather than a readable test failure.
        self.assertIsNotNone(
            match, 'job key %r does not match _JOB_KEY_RE' % (job_key,))

        self.assertEqual(match.group(1), 'no_script')
        self.assertEqual(match.group(2), 'mcp')
    finally:
        if previous_user is None:
            os.environ.pop('USER', None)
        else:
            os.environ['USER'] = previous_user
def test_uri(self):
    """``_dir_archive_path`` accepts a URI and names the archive after it.

    For ``'s3://bucket/stuff'`` the returned path's basename must be
    ``'stuff.tar.gz'``.
    """
    runner = InlineMRJobRunner()

    # URIs are not checked for existence or for being directories
    archive_name = os.path.basename(
        runner._dir_archive_path('s3://bucket/stuff'))

    self.assertEqual(archive_name, 'stuff.tar.gz')