Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
which only checks for preconfigured fields.
These checks include
- there is room for our job in the cluster (clusters top out at
256 steps)
- the cluster does not have a running step
:param emr_client: a boto3 EMR client. See
:py:meth:`~mrjob.emr.EMRJobRunner.make_emr_client`
:param cluster: EMR cluster dict to check if we are able to join.
:param num_steps: The number of steps this job requires.
:return: -1 on failure or num_steps_in_cluster on success
"""
steps = list(_boto3_paginate(
'Steps',
emr_client, 'list_steps', ClusterId=cluster['Id']))
if self._opts['release_label']:
max_steps = _4_X_MAX_STEPS
else:
image_version = cluster.get('RunningAmiVersion', '')
max_steps = map_version(image_version, _IMAGE_VERSION_TO_MAX_STEPS)
# don't add more steps than EMR will allow/display through the API
if len(steps) + num_steps > max_steps:
log.debug(' no room for our steps')
return -1
# in rare cases, cluster can be WAITING *and* have incomplete
# steps. We could just check for PENDING steps, but we're
emr_client = EMRJobRunner(**runner_kwargs).make_emr_client()
# if --max-days-ago is set, only look at recent jobs
created_after = None
if max_days_ago is not None:
created_after = now - timedelta(days=max_days_ago)
# use _DELAY to sleep 1 second after each API call (see #1091). Could
# implement some sort of connection wrapper for this if it becomes more
# generally useful.
list_clusters_kwargs = dict(_delay=_DELAY)
if created_after is not None:
list_clusters_kwargs['CreatedAfter'] = created_after
for cluster_summary in _boto3_paginate(
'Clusters', emr_client, 'list_clusters', **list_clusters_kwargs):
cluster_id = cluster_summary['Id']
cluster = emr_client.describe_cluster(ClusterId=cluster_id)['Cluster']
sleep(_DELAY)
cluster['Steps'] = list(reversed(list(_boto3_paginate(
'Steps', emr_client, 'list_steps',
ClusterId=cluster_id, _delay=_DELAY))))
yield cluster
def _get_job_steps(emr_client, cluster_id, job_key):
    """Collect the steps on a cluster that belong to one mrjob run.

    Steps are matched by name: a step belongs to the run when its
    ``Name`` starts with *job_key*. Pagination stops as soon as a
    non-matching step is seen after at least one match, so the
    remaining pages of the step listing are not requested.

    :param emr_client: a boto3 EMR client. See
                       :py:meth:`~mrjob.emr.EMRJobRunner.make_emr_client`
    :param cluster_id: ID of EMR cluster to fetch steps from. See
                       :py:meth:`~mrjob.emr.EMRJobRunner.get_cluster_id`
    :param job_key: Unique key for a mrjob run. See
                    :py:meth:`~mrjob.runner.MRJobRunner.get_job_key`
    :return: list of matching step dicts, in the reverse of the order
             in which the API listed them (presumably oldest first,
             if the API lists newest first -- confirm against the
             ``list_steps`` documentation)
    """
    matching = []

    listed_steps = _boto3_paginate(
        'Steps', emr_client, 'list_steps', ClusterId=cluster_id)

    for candidate in listed_steps:
        if not candidate['Name'].startswith(job_key):
            if matching:
                # a job's steps are contiguous in the listing, so the
                # first foreign step after a match ends the run
                break
            # haven't reached this job's steps yet; keep scanning
            continue

        matching.append(candidate)

    # flip the local list so callers get the opposite of API order
    matching.reverse()
    return matching
again.
:param invalid_clusters: A set of clusters with an invalid setup; thus
we skip these clusters.
:param locked_clusters: A set of clusters managed by the callee that
are in a "locked" state.
:param num_steps: The number of steps this job requires.
:return: list of tuples of (cluster_id, num_steps_in_cluster)
"""
emr_client = self.make_emr_client()
req_pool_hash = self._pool_hash()
# list of (sort_key, cluster_id, num_steps)
key_cluster_steps_list = []
for cluster_summary in _boto3_paginate(
'Clusters', emr_client, 'list_clusters',
ClusterStates=['WAITING']):
cluster_id = cluster_summary['Id']
# this may be a retry due to locked clusters
if cluster_id in invalid_clusters or cluster_id in locked_clusters:
log.debug(' excluded')
continue
# if we haven't seen this cluster before then check the setup
cluster, instance_sort_key = valid_clusters.get(cluster_id,
(None, None,))
if cluster is None:
cluster = emr_client.describe_cluster(
ClusterId=cluster_id)['Cluster']
instance_sort_key = self._compare_cluster_setup(
if not matches:
log.debug(' subnet mismatch')
return
collection_type = cluster.get('InstanceCollectionType',
'INSTANCE_GROUP')
instance_sort_key = None
if self._opts['instance_fleets']:
if collection_type != 'INSTANCE_FLEET':
log.debug(' does not use instance fleets')
return
actual_fleets = list(_boto3_paginate(
'InstanceFleets', emr_client, 'list_instance_fleets',
ClusterId=cluster_id))
req_fleets = self._opts['instance_fleets']
instance_sort_key = _instance_fleets_satisfy(
actual_fleets, req_fleets)
else:
if collection_type != 'INSTANCE_GROUP':
log.debug(' does not use instance groups')
return
# check memory and compute units, bailing out if we hit
# an instance with too little memory
actual_igs = list(_boto3_paginate(
'InstanceGroups', emr_client, 'list_instance_groups',
if max_mins_idle is None:
max_mins_idle = _DEFAULT_MAX_MINS_IDLE
runner = EMRJobRunner(**kwargs)
emr_client = runner.make_emr_client()
num_starting = 0
num_bootstrapping = 0
num_done = 0
num_idle = 0
num_pending = 0
num_running = 0
# include RUNNING to catch clusters with PENDING jobs that
# never ran (see #365).
for cluster_summary in _boto3_paginate(
'Clusters', emr_client, 'list_clusters',
ClusterStates=['WAITING', 'RUNNING']):
cluster_id = cluster_summary['Id']
# check if cluster is done
if _is_cluster_done(cluster_summary):
num_done += 1
continue
# check if cluster is starting
if _is_cluster_starting(cluster_summary):
num_starting += 1
continue
# check if cluster is bootstrapping