Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
m.db.create_all()
self.playbook = m.Playbook(path='testing.yml')
self.play = m.Play(
name='test play',
playbook=self.playbook,
)
self.task = m.Task(
name='test task',
play=self.play,
playbook=self.playbook,
)
self.data = m.Data(
playbook=self.playbook,
key='test key',
value='test value'
def test_playbook(self):
playbooks = m.Playbook.query.all()
self.assertIn(self.playbook, playbooks)
def v2_playbook_on_start(self, playbook):
path = os.path.abspath(playbook._file_name)
if self._options is not None:
options = self._options.__dict__.copy()
else:
options = {}
# Potentially sanitize some user-specified keys
for parameter in app.config['ARA_IGNORE_PARAMETERS']:
if parameter in options:
msg = "Parameter not saved by ARA due to configuration"
options[parameter] = msg
LOG.debug('starting playbook %s', path)
self.playbook = models.Playbook(
ansible_version=ansible_version,
path=path,
options=options
)
self.playbook.start()
db.session.add(self.playbook)
db.session.commit()
file_ = self.get_or_create_file(path)
file_.is_playbook = True
# We need to persist the playbook id so it can be used by the modules
data = {
'playbook': {
'id': self.playbook.id
def v2_playbook_on_start(self, playbook):
path = os.path.abspath(playbook._file_name)
# Potentially sanitize some user-specified keys
for parameter in app.config['ARA_IGNORE_PARAMETERS']:
if parameter in cli_options:
msg = "Not saved by ARA as configured by ARA_IGNORE_PARAMETERS"
cli_options[parameter] = msg
log.debug('Starting playbook %s', path)
self.playbook = models.Playbook(
ansible_version=ansible_version,
path=path,
options=cli_options
)
self.playbook.start()
db.session.add(self.playbook)
db.session.commit()
file_ = self.get_or_create_file(path)
file_.is_playbook = True
# Cache the playbook data in memory for ara_record/ara_read
current_app._cache['playbook'] = self.playbook.id
def take_action(self, parsed_args):
stats = (models.Stats.query
.join(models.Playbook)
.join(models.Host)
.filter(models.Stats.playbook_id == models.Playbook.id)
.filter(models.Stats.host_id == models.Host.id)
.order_by(models.Playbook.time_start, models.Host.name))
return [[field.name for field in LIST_FIELDS],
[[field(stat) for field in LIST_FIELDS]
for stat in stats]]
def playbook_host(playbook, host, status=None):
host = models.Host.query.filter_by(name=host).one()
playbook = models.Playbook.query.get(playbook)
task_results = models.TaskResult.query
if status is not None:
status_query = utils.status_to_query(status)
task_results = task_results.filter_by(**status_query)
task_results = (task_results
.join(models.Task)
.join(models.Host)
.join(models.Playbook)
.filter(models.Playbook.id == playbook.id)
.filter(models.Host.name == host.name)
.order_by(models.TaskResult.time_start))
return render_template('playbook_host.html',
def take_action(self, args):
tasks = (models.Task.query
.join(models.Play)
.join(models.Playbook)
.filter(models.Task.play_id == models.Play.id)
.filter(models.Task.playbook_id == models.Playbook.id)
.order_by(models.Task.time_start, models.Task.sortkey))
if args.play:
tasks = tasks.filter(models.Task.play_id == args.play)
elif args.playbook:
tasks = tasks.filter(models.Task.playbook_id == args.playbook)
return [[field.name for field in LIST_FIELDS],
[[field(task) for field in LIST_FIELDS]
for task in tasks]]
def ctx_add_nav_data():
"""
Returns standard data that will be available in every template view.
"""
try:
models.Playbook.query.one()
empty_database = False
except models.MultipleResultsFound:
empty_database = False
except models.NoResultFound:
empty_database = True
# Get python version info
major, minor, micro, release, serial = sys.version_info
return dict(ara_version=ara_release,
ansible_version=ansible_version,
python_version="{0}.{1}".format(major, minor),
empty_database=empty_database)
def report_list(page=1):
if current_app.config['ARA_PLAYBOOK_OVERRIDE'] is not None:
override = current_app.config['ARA_PLAYBOOK_OVERRIDE']
playbooks = (models.Playbook.query
.filter(models.Playbook.id.in_(override))
.order_by(models.Playbook.time_start.desc()))
else:
playbooks = (models.Playbook.query
.order_by(models.Playbook.time_start.desc()))
if not utils.fast_count(playbooks):
return redirect(url_for('about.main'))
playbook_per_page = current_app.config['ARA_PLAYBOOK_PER_PAGE']
# Paginate unless playbook_per_page is set to 0
if playbook_per_page >= 1:
playbooks = playbooks.paginate(page, playbook_per_page, False)
else:
playbooks = playbooks.paginate(page, None, False)
stats = utils.get_summary_stats(playbooks.items, 'playbook_id')
result_per_page = current_app.config['ARA_RESULT_PER_PAGE']