Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# For 'setup' tasks that returned facts, store those facts on the host,
# masking every fact listed in the ARA_IGNORE_FACTS setting beforehand.
if self.task.action == 'setup' and 'ansible_facts' in results:
    # Potentially sanitize some Ansible facts to prevent them from
    # being saved both in the host facts and in the task results.
    for fact in app.config['ARA_IGNORE_FACTS']:
        if fact in results['ansible_facts']:
            msg = "Not saved by ARA as configured by ARA_IGNORE_FACTS"
            results['ansible_facts'][fact] = msg
    # Serialize the (possibly masked) facts and attach them to the host.
    values = jsonutils.dumps(results['ansible_facts'])
    facts = models.HostFacts(values=values)
    host.facts = facts
    db.session.add(facts)
    db.session.commit()
# Record the result of this task for this host. `results` was modified in
# place above, so the serialized result carries the same masked values.
self.taskresult = models.TaskResult(
    task=self.task,
    host=host,
    time_start=result.task_start,
    time_end=result.task_end,
    result=jsonutils.dumps(results),
    status=status,
    # Flags missing from the raw result default to False.
    changed=result._result.get('changed', False),
    failed=result._result.get('failed', False),
    skipped=result._result.get('skipped', False),
    unreachable=result._result.get('unreachable', False),
    ignore_errors=ignore_errors,
)
db.session.add(self.taskresult)
db.session.commit()
# Use Ansible's CallbackBase._dump_results in order to strip internal
# keys, respect no_log directive, etc.
if self.loop_items:
    # NOTE (dmsimard): There is a known issue in which Ansible can send
    # callback hooks out of order and "exit" the task before all items
    # have returned, this can cause one of the items to be missing
    # from the task result in ARA.
    # https://github.com/ansible/ansible/issues/24207
    # The task's own result comes first, followed by one entry per item.
    results = [self._dump_results(result._result)]
    for item in self.loop_items:
        results.append(self._dump_results(item._result))
    # Round trip the list through the JSON serializer.
    results = jsonutils.loads(jsonutils.dumps(results))
else:
    # No loop items: parse the dumped result of the task itself.
    results = jsonutils.loads(self._dump_results(result._result))
# Record the result of this task for this host.
self.taskresult = models.TaskResult(
    task=self.task,
    host=host,
    time_start=result.task_start,
    time_end=result.task_end,
    result=jsonutils.dumps(results),
    status=status,
    # Flags missing from the raw result default to False.
    changed=result._result.get('changed', False),
    failed=result._result.get('failed', False),
    skipped=result._result.get('skipped', False),
    unreachable=result._result.get('unreachable', False),
    # ignore_errors is taken from the keyword arguments, False if absent.
    ignore_errors=kwargs.get('ignore_errors', False),
)
db.session.add(self.taskresult)
db.session.commit()
def show_task(task):
    """
    Load a task and narrow down its results from the query string.

    Responds with a 404 when no task matches the given identifier.
    The ``host`` query argument is a comma separated list of host names
    and the ``status`` query argument is a comma separated list of
    derived statuses; each one, when present and non-empty, restricts
    the task results.
    """
    task = models.Task.query.get(task)
    if task is None:
        abort(404)

    # Every result of the task, oldest first
    task_results = (
        models.TaskResult.query
        .join(models.Task)
        .join(models.Host)
        .join(models.Playbook)
        .filter(models.Task.id == task.id)
        .order_by(models.TaskResult.time_start)
    )

    # Host names are filtered by the database
    host_arg = request.args.get('host')
    if host_arg:
        hosts = list(map(str, host_arg.split(',')))
        task_results = task_results.filter(models.Host.name.in_(hosts))

    # The derived status is filtered on the Python side
    status_arg = request.args.get('status')
    if status_arg:
        status = status_arg.split(',')
        task_results = (entry for entry in task_results
                        if entry.derived_status in status)
def index():
    """
    Render the index of task results.

    The web application does not serve this page: it only exists for the
    generation of static files, where flask-frozen needs url_for's in
    order to crawl content. The links to result.show_result are built
    dynamically in javascript for performance purposes rather than with
    url_for.
    """
    override = current_app.config['ARA_PLAYBOOK_OVERRIDE']
    if override is None:
        # No override: every task result is listed
        results = models.TaskResult.query.all()
    else:
        # Only keep the results of the overridden playbooks
        results = (
            models.TaskResult.query
            .join(models.Task)
            .filter(models.Task.playbook_id.in_(override))
        )

    return render_template('task_result_index.html', results=results)
def ajax_results(playbook):
    """
    Collect the task results of a single playbook.

    Aborts with a 404 when utils.fast_count() reports no task result
    for the given playbook id.
    """
    task_results = (models.TaskResult.query
                    .join(models.Task)
                    .filter(models.Task.playbook_id.in_([playbook])))
    if not utils.fast_count(task_results):
        abort(404)
    # Templates are fetched once from the application's jinja environment,
    # ahead of the loop over the results.
    jinja = current_app.jinja_env
    # Inline template which applies the 'timefmt' filter to a 'time' value
    time = jinja.from_string('{{ time | timefmt }}')
    action_link = jinja.get_template('ajax/action.html')
    name_cell = jinja.get_template('ajax/task_name.html')
    task_status_link = jinja.get_template('ajax/task_status.html')
    # Container for the response, entries are stored under the 'data' key
    results = dict()
    results['data'] = list()
    for result in task_results:
        # Rendered content of the task name template for this result
        name = name_cell.render(result=result)
# Prepare one query per type of record. When ARA_PLAYBOOK_OVERRIDE is set,
# each query is restricted to the playbook ids it contains, otherwise the
# queries are left unfiltered.
if current_app.config['ARA_PLAYBOOK_OVERRIDE'] is not None:
    override = current_app.config['ARA_PLAYBOOK_OVERRIDE']
    files = (models.File.query
             .filter(models.File.playbook_id.in_(override)))
    # Host facts have no playbook id filter of their own here: they are
    # restricted through the host they are joined to.
    host_facts = (models.HostFacts.query
                  .join(models.Host)
                  .filter(models.Host.playbook_id.in_(override)))
    hosts = (models.Host.query
             .filter(models.Host.playbook_id.in_(override)))
    playbooks = (models.Playbook.query
                 .filter(models.Playbook.id.in_(override)))
    records = (models.Data.query
               .filter(models.Data.playbook_id.in_(override)))
    tasks = (models.Task.query
             .filter(models.Task.playbook_id.in_(override)))
    # Task results are restricted through the task they are joined to.
    task_results = (models.TaskResult.query
                    .join(models.Task)
                    .filter(models.Task.playbook_id.in_(override)))
else:
    files = models.File.query
    host_facts = models.HostFacts.query
    hosts = models.Host.query
    playbooks = models.Playbook.query
    records = models.Data.query
    tasks = models.Task.query
    task_results = models.TaskResult.query
# The template receives the result of fast_count() for the queries rather
# than the queries themselves.
return render_template('about.html',
                       active='about',
                       files=fast_count(files),
                       host_facts=fast_count(host_facts),
                       hosts=fast_count(hosts),
def take_action(self, args):
    """
    List task results, optionally narrowed down to a playbook, play or task.

    Only the first selector set among ``args.playbook``, ``args.play`` and
    ``args.task`` is applied. Returns a two item list: the names of the
    LIST_FIELDS columns, then one row of field values per task result.
    """
    query = (
        models.TaskResult.query
        .join(models.Task)
        .join(models.Host)
        .filter(models.TaskResult.task_id == models.Task.id)
        .filter(models.TaskResult.host_id == models.Host.id)
        .order_by(models.Task.time_start, models.Task.sortkey)
    )

    # The selectors are mutually exclusive, in this order of precedence
    if args.playbook:
        query = query.filter(models.Task.playbook_id == args.playbook)
    elif args.play:
        query = query.filter(models.Task.play_id == args.play)
    elif args.task:
        query = query.filter(models.TaskResult.task_id == args.task)

    header = [column.name for column in LIST_FIELDS]
    rows = [[column(entry) for column in LIST_FIELDS] for entry in query]
    return [header, rows]
def take_action(self, args):
    """
    Write the recorded task results through a StreamResultToBytes stream.

    ``args.output_file`` selects the destination: ``-`` means
    ``sys.stdout``, any other value is opened as a file in binary mode.
    ``args.playbook``, when it is not None, is a collection of playbook
    ids used to restrict the task results; otherwise every task result
    is used.
    """
    # Setup where the output stream must go
    if args.output_file == '-':
        output_stream = sys.stdout
    else:
        # NOTE(review): nothing in the visible part of this method closes
        # this file -- confirm that the remainder of the method does.
        output_stream = open(args.output_file, 'wb')

    # Create the output stream
    output = StreamResultToBytes(output_stream)

    # Create the test run
    output.startTestRun()

    # The query is reached through the model class: there is no need to
    # instantiate a throwaway TaskResult to get to it.
    if args.playbook is not None:
        playbooks = args.playbook
        results = (models.TaskResult.query
                   .join(models.Task)
                   .filter(models.TaskResult.task_id == models.Task.id)
                   .filter(models.Task.playbook_id.in_(playbooks)))
    else:
        results = models.TaskResult.query.all()

    for result in results:
        # Generate a fixed length identifier for the task
        test_id = utils.generate_identifier(result)

        # Assign the test_status value
        if result.status in ('failed', 'unreachable'):
            # A failure whose errors are ignored is an expected failure
            # ('xfail'), any other failure is a genuine one ('fail').
            if result.ignore_errors:
                test_status = 'xfail'
            else:
                test_status = 'fail'
def take_action(self, args):
    """
    Write the recorded task results through a StreamResultToBytes stream.

    ``args.output_file`` selects the destination: ``-`` means
    ``sys.stdout``, any other value is opened as a file in binary mode.
    ``args.playbook``, when it is not None, is a collection of playbook
    ids used to restrict the task results; otherwise every task result
    is used.
    """
    # Setup where the output stream must go
    if args.output_file == '-':
        output_stream = sys.stdout
    else:
        # NOTE(review): nothing in the visible part of this method closes
        # this file -- confirm that the remainder of the method does.
        output_stream = open(args.output_file, 'wb')

    # Create the output stream
    output = StreamResultToBytes(output_stream)

    # Create the test run
    output.startTestRun()

    # The query is reached through the model class: there is no need to
    # instantiate a throwaway TaskResult to get to it.
    if args.playbook is not None:
        playbooks = args.playbook
        results = (models.TaskResult.query
                   .join(models.Task)
                   .filter(models.TaskResult.task_id == models.Task.id)
                   .filter(models.Task.playbook_id.in_(playbooks)))
    else:
        results = models.TaskResult.query.all()

    for result in results:
        # Generate a fixed length identifier for the task
        test_id = utils.generate_identifier(result)

        # Assign the test_status value
        if result.status in ('failed', 'unreachable'):
            # A failure whose errors are ignored is an expected failure
            # ('xfail'), any other failure is a genuine one ('fail').
            if result.ignore_errors:
                test_status = 'xfail'
            else:
                test_status = 'fail'