Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_duplicate_host(self):
host = m.Host(
name='localhost',
playbook=self.playbook,
)
m.db.session.add(host)
with self.assertRaises(Exception):
m.db.session.commit()
"""
LOG.debug('logging stats')
hosts = sorted(stats.processed.keys())
for hostname in hosts:
host = self.get_or_create_host(hostname)
host_stats = stats.summarize(hostname)
db.session.add(models.Stats(
playbook=self.playbook,
host=host,
changed=host_stats['changed'],
unreachable=host_stats['unreachable'],
failed=host_stats['failures'],
ok=host_stats['ok'],
skipped=host_stats['skipped']
))
db.session.commit()
def close_play(self):
"""
Marks the completion time of the currently active play.
"""
if self.play is not None:
LOG.debug('closing play %s (%s)', self.play.name, self.play.id)
self.play.stop()
db.session.add(self.play)
db.session.commit()
self.play = None
def close_playbook(self):
"""
Marks the completion time of the currently active playbook.
"""
if self.playbook is not None:
log.debug('Closing playbook %s', self.playbook.path)
self.playbook.stop()
self.playbook.complete = True
db.session.add(self.playbook)
db.session.commit()
def get_or_create_file(self, path):
try:
if self.playbook.id:
file_ = (models.File.query
.filter_by(path=path)
.filter_by(playbook_id=self.playbook.id)
.one())
return file_
except models.NoResultFound:
pass
file_ = models.File(path=path, playbook=self.playbook)
db.session.add(file_)
db.session.commit()
try:
with open(path, 'r') as fd:
data = fd.read()
sha1 = models.content_sha1(data)
content = models.FileContent.query.get(sha1)
if content is None:
content = models.FileContent(content=data)
file_.content = content
except IOError:
LOG.warn('failed to open %s for reading', path)
return file_
def get_or_create_file(self, path):
try:
if self.playbook.id:
file_ = (models.File.query
.filter_by(path=path)
.filter_by(playbook_id=self.playbook.id)
.one())
return file_
except models.NoResultFound:
pass
file_ = models.File(path=path, playbook=self.playbook)
db.session.add(file_)
db.session.commit()
try:
with open(path, 'r') as fd:
data = fd.read()
sha1 = models.content_sha1(data)
content = models.FileContent.query.get(sha1)
if content is None:
content = models.FileContent(content=data)
file_.content = content
except IOError:
log.warning('failed to open %s for reading', path)
return file_
self.close_task()
self.close_play()
log.debug('Starting play %s', play.name)
if self.play is not None:
self.play.stop()
self.play = models.Play(
name=play.name,
sortkey=next(self.play_counter),
playbook=self.playbook
)
self.play.start()
db.session.add(self.play)
db.session.commit()
def get_or_create_host(self, hostname):
try:
host = (models.Host.query
.filter_by(name=hostname)
.filter_by(playbook_id=self.playbook.id)
.one())
except models.NoResultFound:
host = models.Host(name=hostname, playbook=self.playbook)
db.session.add(host)
db.session.commit()
return host