Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _boto3_today():
    """Return a datetime for the very start (00:00:00) of the current day.

    The year, month, day and time zone are copied from the value
    returned by :py:func:`_boto3_now`; all time-of-day fields are zero.
    """
    current = _boto3_now()
    return datetime(
        year=current.year,
        month=current.month,
        day=current.day,
        tzinfo=current.tzinfo,
    )
def create_fake_clusters(self):
# Pin one reference time for this method, truncated to whole seconds;
# ago() below measures every offset from it.
self.now = _boto3_now().replace(microsecond=0)
# NOTE(review): presumably registers an empty mock S3 bucket named
# 'my_bucket' -- confirm against add_mock_s3_data()
self.add_mock_s3_data({'my_bucket': {}})
# create a timestamp the given number of *hours*, *minutes*, etc.
# in the past
# (**kwargs are passed straight through to timedelta(), and the result
# is subtracted from self.now)
def ago(**kwargs):
return self.now - timedelta(**kwargs)
# Build a step object easily
# also make it respond to .args()
def step(jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar',
args=self._DEFAULT_STEP_ARGS,
state='COMPLETED',
created=None,
started=None,
ended=None,
name='Streaming Step',
def put(self, Body):
    """Store *Body* as the contents of this mock key.

    The data is recorded, together with the current time, under
    ``self.key`` in the mapping returned by
    ``self._mock_bucket_keys('PutObject')``.

    :param Body: the object's data; must be ``bytes``.

    :raises NotImplementedError: if *Body* is not ``bytes`` (file-like
                                 bodies are not supported by this mock).
    """
    # The mock only handles raw bytes. This check runs before any mock
    # state is looked up, so a rejected call has no side effects.
    if not isinstance(Body, bytes):
        raise NotImplementedError('mock put() only support bytes')

    mock_keys = self._mock_bucket_keys('PutObject')

    # Body is known to be bytes here, so it can be stored directly; the
    # former ``Body.read()`` branch and follow-up type check could never
    # run because of the guard above.
    mock_keys[self.key] = dict(
        body=Body, time_modified=_boto3_now())
def _print_report(stats, now=None):
"""Print final report.
:param stats: a dictionary returned by :py:func:`_clusters_to_stats`
:param now: the current UTC time, as a :py:class:`datetime.datetime`.
Defaults to the current time.
"""
if now is None:
now = _boto3_now()
s = stats
if not s['clusters']:
print('No clusters created in the past two months!')
return
print('Total # of Clusters: %d' % len(s['clusters']))
print()
print('* All times are in UTC.')
print()
print('Min create time: %s' % min(cs['created'] for cs in s['clusters']))
print('Max create time: %s' % max(cs['created'] for cs in s['clusters']))
print(' Current time: %s' % now.replace(microsecond=0))
# we can open the ssh tunnel if cluster is ready (see #1115)
if cluster['Status']['State'] in ('RUNNING', 'WAITING'):
self._set_up_ssh_tunnel_and_hdfs()
log.info(' PENDING (cluster is %s%s)' % (
cluster['Status']['State'], reason_desc))
continue
elif step['Status']['State'] == 'RUNNING':
time_running_desc = ''
start = step['Status']['Timeline'].get('StartDateTime')
if start:
time_running_desc = ' for %s' % strip_microseconds(
_boto3_now() - start)
# now is the time to tunnel, if we haven't already
self._set_up_ssh_tunnel_and_hdfs()
log.info(' RUNNING%s' % time_running_desc)
# don't log progress for master node setup step, because
# it doesn't appear in job tracker
if step_num >= 0:
self._log_step_progress()
continue
# we're done, will return at the end of this
elif step['Status']['State'] == 'COMPLETED':
log.info(' COMPLETED')
# will fetch counters, below, and then return
pending longer than this
:param now: the current UTC time, as a :py:class:`datetime.datetime`.
Defaults to the current time.
For each job that is running or pending longer than *min_time*, yields
a dictionary with the following keys:
* *cluster_id*: the cluster's unique ID (e.g. ``j-SOMECLUSTER``)
* *name*: name of the step, or the cluster when bootstrapping
* *state*: state of the step (``'RUNNING'`` or ``'PENDING'``) or, if there
is no step, the cluster (``'STARTING'`` or ``'BOOTSTRAPPING'``)
* *time*: amount of time step was running or pending, as a
:py:class:`datetime.timedelta`
"""
if now is None:
now = _boto3_now()
for cs in cluster_summaries:
# special case for jobs that are taking a long time to bootstrap
if cs['Status']['State'] in ('STARTING', 'BOOTSTRAPPING'):
# there isn't a way to tell when the cluster stopped being
# provisioned and started bootstrapping, so just measure
# from cluster creation time
created = cs['Status']['Timeline']['CreationDateTime']
time_running = now - created
if time_running >= min_time:
yield({'cluster_id': cs['Id'],
'name': cs['Name'],
'state': cs['Status']['State'],
def main(args=None):
# Entry point: parse *args*, set up logging, then request the clusters
# that are starting, bootstrapping, or running.
# the current time is captured once, before any other work is done
now = _boto3_now()
arg_parser = _make_arg_parser()
options = arg_parser.parse_args(args)
MRJob.set_up_logging(quiet=options.quiet, verbose=options.verbose)
log.info('getting information about running jobs')
# convert the min_hours option to a timedelta
min_time = timedelta(hours=options.min_hours)
# the EMR client is built from a runner configured by the parsed options
emr_client = EMRJobRunner(**_runner_kwargs(options)).make_emr_client()
# only clusters in these three states are requested
cluster_summaries = _boto3_paginate(
'Clusters', emr_client, 'list_clusters',
ClusterStates=['STARTING', 'BOOTSTRAPPING', 'RUNNING'])
if not options.exclude:
* *nih*: number of normalized instance hours cluster *would* use if it
ran to the end of the next full hour (
* *num_steps*: Number of steps in the cluster.
* *owner*: The owner for the cluster (usually the user that started it),
or ``None`` for non-:py:mod:`mrjob` clusters.
* *pool*: pool name (e.g. ``'default'``) if the cluster is pooled,
otherwise ``None``.
* *ran*: How long the cluster ran, or has been running, as a
:py:class:`datetime.timedelta`. This will be ``timedelta(0)`` if
the cluster hasn't started.
* *ready*: UTC `datetime.datetime` that the cluster finished
bootstrapping, or ``None``
* *state*: The cluster's state as a string (e.g. ``'RUNNING'``)
"""
if now is None:
now = _boto3_now()
bcs = {} # basic cluster summary to fill in
bcs['id'] = cluster['Id']
bcs['name'] = cluster['Name']
Status = cluster['Status']
Timeline = Status.get('Timeline', {})
bcs['created'] = Timeline.get('CreationDateTime')
bcs['ready'] = Timeline.get('ReadyDateTime')
bcs['end'] = Timeline.get('EndDateTime')
if bcs['created']:
bcs['ran'] = (bcs['end'] or now) - bcs['created']
else: