Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
region (string): The name of the zone to execute in.
days (int): The number of days we want history for.
task_id (string): The Id of the task.
request_id (string): The Id of the request we want tasks for.
user (string): The user of the request we want tasks for.
Returns:
task_stats(dict): Mapping of statistic names to values
"""
task_results = self.get_task_data(
instance, project, region, days, task_id, request_id, user)
if not task_results:
return {}
task_stats = {
'all_tasks': TurbiniaStats('All Tasks'),
'successful_tasks': TurbiniaStats('Successful Tasks'),
'failed_tasks': TurbiniaStats('Failed Tasks'),
'requests': TurbiniaStats('Total Request Time'),
# The following are dicts mapping the user/worker/type names to their
# respective TurbiniaStats() objects.
# Total wall-time for all tasks of a given type
'tasks_per_type': {},
# Total wall-time for all tasks per Worker
'tasks_per_worker': {},
# Total wall-time for all tasks per User
'tasks_per_user': {},
}
# map of request ids to [min time, max time]
requests = {}
task_stats['tasks_per_type'][task_type] = task_type_stats
task_type_stats.add_task(task)
# Stats per worker.
if worker in task_stats['tasks_per_worker']:
worker_stats = task_stats['tasks_per_worker'].get(worker)
else:
worker_stats = TurbiniaStats('Worker {0:s}'.format(worker))
task_stats['tasks_per_worker'][worker] = worker_stats
worker_stats.add_task(task)
# Stats per submitting User.
if user in task_stats['tasks_per_user']:
user_stats = task_stats['tasks_per_user'].get(user)
else:
user_stats = TurbiniaStats('User {0:s}'.format(user))
task_stats['tasks_per_user'][user] = user_stats
user_stats.add_task(task)
# Stats for the total request. This will, for each request, calculate the
# start time of the earliest task and the stop time of the latest task.
# This will give the overall run time covering all tasks in the request.
task_start_time = task['last_update'] - task['run_time']
task_stop_time = task['last_update']
if request_id in requests:
start_time, stop_time = requests[request_id]
if task_start_time < start_time:
requests[request_id][0] = task_start_time
if task_stop_time > stop_time:
requests[request_id][1] = task_stop_time
else:
requests[request_id] = [task_start_time, task_stop_time]
task_id (string): The Id of the task.
request_id (string): The Id of the request we want tasks for.
user (string): The user of the request we want tasks for.
Returns:
task_stats(dict): Mapping of statistic names to values
"""
task_results = self.get_task_data(
instance, project, region, days, task_id, request_id, user)
if not task_results:
return {}
task_stats = {
'all_tasks': TurbiniaStats('All Tasks'),
'successful_tasks': TurbiniaStats('Successful Tasks'),
'failed_tasks': TurbiniaStats('Failed Tasks'),
'requests': TurbiniaStats('Total Request Time'),
# The following are dicts mapping the user/worker/type names to their
# respective TurbiniaStats() objects.
# Total wall-time for all tasks of a given type
'tasks_per_type': {},
# Total wall-time for all tasks per Worker
'tasks_per_worker': {},
# Total wall-time for all tasks per User
'tasks_per_user': {},
}
# map of request ids to [min time, max time]
requests = {}
for task in task_results:
request_id = task.get('request_id')
request_id (string): The Id of the request we want tasks for.
user (string): The user of the request we want tasks for.
Returns:
task_stats(dict): Mapping of statistic names to values
"""
task_results = self.get_task_data(
instance, project, region, days, task_id, request_id, user)
if not task_results:
return {}
task_stats = {
'all_tasks': TurbiniaStats('All Tasks'),
'successful_tasks': TurbiniaStats('Successful Tasks'),
'failed_tasks': TurbiniaStats('Failed Tasks'),
'requests': TurbiniaStats('Total Request Time'),
# The following are dicts mapping the user/worker/type names to their
# respective TurbiniaStats() objects.
# Total wall-time for all tasks of a given type
'tasks_per_type': {},
# Total wall-time for all tasks per Worker
'tasks_per_worker': {},
# Total wall-time for all tasks per User
'tasks_per_user': {},
}
# map of request ids to [min time, max time]
requests = {}
for task in task_results:
request_id = task.get('request_id')
task_type = task.get('name')
'set, and it is required to calculate stats'.format(
task.get('name')))
continue
# Stats for all/successful/failed tasks
task_stats['all_tasks'].add_task(task)
if task.get('successful') is True:
task_stats['successful_tasks'].add_task(task)
elif task.get('successful') is False:
task_stats['failed_tasks'].add_task(task)
# Stats for Tasks per Task type.
if task_type in task_stats['tasks_per_type']:
task_type_stats = task_stats['tasks_per_type'].get(task_type)
else:
task_type_stats = TurbiniaStats('Task type {0:s}'.format(task_type))
task_stats['tasks_per_type'][task_type] = task_type_stats
task_type_stats.add_task(task)
# Stats per worker.
if worker in task_stats['tasks_per_worker']:
worker_stats = task_stats['tasks_per_worker'].get(worker)
else:
worker_stats = TurbiniaStats('Worker {0:s}'.format(worker))
task_stats['tasks_per_worker'][worker] = worker_stats
worker_stats.add_task(task)
# Stats per submitting User.
if user in task_stats['tasks_per_user']:
user_stats = task_stats['tasks_per_user'].get(user)
else:
user_stats = TurbiniaStats('User {0:s}'.format(user))
elif task.get('successful') is False:
task_stats['failed_tasks'].add_task(task)
# Stats for Tasks per Task type.
if task_type in task_stats['tasks_per_type']:
task_type_stats = task_stats['tasks_per_type'].get(task_type)
else:
task_type_stats = TurbiniaStats('Task type {0:s}'.format(task_type))
task_stats['tasks_per_type'][task_type] = task_type_stats
task_type_stats.add_task(task)
# Stats per worker.
if worker in task_stats['tasks_per_worker']:
worker_stats = task_stats['tasks_per_worker'].get(worker)
else:
worker_stats = TurbiniaStats('Worker {0:s}'.format(worker))
task_stats['tasks_per_worker'][worker] = worker_stats
worker_stats.add_task(task)
# Stats per submitting User.
if user in task_stats['tasks_per_user']:
user_stats = task_stats['tasks_per_user'].get(user)
else:
user_stats = TurbiniaStats('User {0:s}'.format(user))
task_stats['tasks_per_user'][user] = user_stats
user_stats.add_task(task)
# Stats for the total request. This will, for each request, calculate the
# start time of the earliest task and the stop time of the latest task.
# This will give the overall run time covering all tasks in the request.
task_start_time = task['last_update'] - task['run_time']
task_stop_time = task['last_update']
days (int): The number of days we want history for.
task_id (string): The Id of the task.
request_id (string): The Id of the request we want tasks for.
user (string): The user of the request we want tasks for.
Returns:
task_stats(dict): Mapping of statistic names to values
"""
task_results = self.get_task_data(
instance, project, region, days, task_id, request_id, user)
if not task_results:
return {}
task_stats = {
'all_tasks': TurbiniaStats('All Tasks'),
'successful_tasks': TurbiniaStats('Successful Tasks'),
'failed_tasks': TurbiniaStats('Failed Tasks'),
'requests': TurbiniaStats('Total Request Time'),
# The following are dicts mapping the user/worker/type names to their
# respective TurbiniaStats() objects.
# Total wall-time for all tasks of a given type
'tasks_per_type': {},
# Total wall-time for all tasks per Worker
'tasks_per_worker': {},
# Total wall-time for all tasks per User
'tasks_per_user': {},
}
# map of request ids to [min time, max time]
requests = {}
for task in task_results: