def test_explicit_zone_beats_environment(self):
    with save_current_environment():
        os.environ['CLOUDSDK_COMPUTE_ZONE'] = 'us-west1-b'
        runner = DataprocJobRunner(zone='europe-west1-a')

    self.assertEqual(runner._opts['zone'], 'europe-west1-a')
def test_dont_take_down_cluster_on_failure(self):
    runner1 = DataprocJobRunner(conf_paths=[])
    runner1._launch_cluster()
    cluster_id = runner1._cluster_id

    mr_job = MRTwoStepJob(['-r', 'dataproc', '-v',
                           '--cluster-id', cluster_id])
    mr_job.sandbox()

    self.mock_jobs_succeed = False

    with mr_job.make_runner() as runner2:
        self.assertIsInstance(runner2, DataprocJobRunner)
        self.assertRaises(StepFailedException, runner2.run)

        # the cluster we joined should still exist after the failed run
        cluster2 = runner2._get_cluster(runner2._cluster_id)
@contextmanager  # contextlib.contextmanager; this helper yields mock_dict to the caller
def _test_mode(self, mode):
    r = DataprocJobRunner(conf_paths=[])

    with patch.multiple(r,
                        _cleanup_cluster=mock.DEFAULT,
                        _cleanup_job=mock.DEFAULT,
                        _cleanup_local_tmp=mock.DEFAULT,
                        _cleanup_logs=mock.DEFAULT,
                        _cleanup_cloud_tmp=mock.DEFAULT) as mock_dict:
        r.cleanup(mode=mode)
        yield mock_dict
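# A minimal usage sketch (an assumption, not part of the original snippet):
# a cleanup test can use _test_mode() as a context manager and inspect which
# patched cleanup helpers were invoked for a given mode. Exactly which
# helpers run depends on runner state; local/cloud tmp cleanup is invoked
# for mode 'ALL'.
def test_cleanup_all(self):
    with self._test_mode('ALL') as mock_dict:
        self.assertTrue(mock_dict['_cleanup_local_tmp'].called)
        self.assertTrue(mock_dict['_cleanup_cloud_tmp'].called)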
def assert_new_tmp_bucket(self, location, **runner_kwargs):
    """Assert that if we create a DataprocJobRunner with the given keyword
    args, it'll create a new tmp bucket with the given location
    constraint.
    """
    existing_buckets = set(self.mock_gcs_fs)

    runner = DataprocJobRunner(conf_paths=[], **runner_kwargs)
    runner._upload_local_files()

    bucket_name, path = parse_gcs_uri(runner._cloud_tmp_dir)
    runner._upload_local_files()

    self.assertTrue(bucket_name.startswith('mrjob-'))
    self.assertNotIn(bucket_name, existing_buckets)
    self.assertEqual(path, 'tmp/')

    current_bucket = runner.fs.gcs.get_bucket(bucket_name)
    self.assertEqual(current_bucket.location, location.upper())

    # Verify that we set up bucket lifecycle rules of 28-day retention
    first_lifecycle_rule = list(current_bucket.lifecycle_rules)[0]
    self.assertEqual(first_lifecycle_rule['action'], dict(type='Delete'))
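# A minimal usage sketch (hypothetical test name, not from the original
# snippet): a location test calls the helper above with the region it
# expects the newly created tmp bucket to use as its location constraint.
def test_explicit_region(self):
    self.assert_new_tmp_bucket('us-west1', region='us-west1')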
def test_zone_beats_region(self):
    runner = DataprocJobRunner(region='europe-west1',
                               zone='europe-west1-a')

    self.assertTrue(self.log.warning.called)
    self.assertEqual(runner._opts['region'], None)
    self.assertEqual(runner._opts['zone'], 'europe-west1-a')
def test_gcs_cat(self):
    self.put_gcs_multi({
        'gs://walrus/one': b'one_text',
        'gs://walrus/two': b'two_text',
        'gs://walrus/three': b'three_text',
    })

    runner = DataprocJobRunner(cloud_tmp_dir='gs://walrus/tmp',
                               conf_paths=[])

    self.assertEqual(list(runner.fs.cat('gs://walrus/one')), [b'one_text'])
def test_bootstrap_mrjob_uses_python_bin(self):
    # use all the bootstrap options
    runner = DataprocJobRunner(conf_paths=[],
                               bootstrap_mrjob=True,
                               python_bin=['anaconda'])

    runner._add_bootstrap_files_for_upload()

    self.assertIsNotNone(runner._master_bootstrap_script_path)

    with open(runner._master_bootstrap_script_path, 'r') as f:
        content = f.read()

    self.assertIn('sudo anaconda -m compileall -q -f', content)
def __init__(self, **kwargs):
    """:py:class:`~mrjob.dataproc.DataprocJobRunner` takes the same
    arguments as :py:class:`~mrjob.runner.MRJobRunner`, plus some
    additional options which can be defaulted in
    :ref:`mrjob.conf <mrjob.conf>`.
    """
    super(DataprocJobRunner, self).__init__(**kwargs)

    # check for library support
    if google is None:
        raise ImportError(
            'You must install google-cloud-logging and '
            'google-cloud-storage to connect to Dataproc')

    # Dataproc requires a master and >= 2 core instances.
    # num_core_instances refers ONLY to the number of CORE instances and
    # does NOT include the required master instance. In other words, the
    # minimum cluster size is 3 machines: 1 master and 2
    # "num_core_instances" workers.
    if self._opts['num_core_instances'] < _DATAPROC_MIN_WORKERS:
        raise DataprocException(
            'Dataproc expects at LEAST %d workers' % _DATAPROC_MIN_WORKERS)
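# A minimal usage sketch (an assumption, not part of the original snippet):
# the constructor above rejects clusters whose core-instance count is below
# Dataproc's minimum of 2 workers, regardless of the master node.
from mrjob.dataproc import DataprocException, DataprocJobRunner

try:
    DataprocJobRunner(conf_paths=[], num_core_instances=1)
except DataprocException as ex:
    print(ex)  # e.g. "Dataproc expects at LEAST 2 workers"

# 1 master + 2 core instances satisfies the minimum cluster size
runner = DataprocJobRunner(conf_paths=[], num_core_instances=2)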