Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_result(self):
    """Verify output.result dispatches to writer.result_<command> methods."""
    # preparation
    mock_writer = self._mock_writer()
    positional = ('1', 'two')
    named = {'three': 3, 'four': 5}
    output.result('command', *positional, **named)
    output.result('another_command')
    # writer test: each command must reach its own result_* hook exactly once,
    # with the arguments forwarded unchanged
    mock_writer.result_command.assert_called_once_with(*positional, **named)
    mock_writer.result_another_command.assert_called_once_with()
def result(self, server_name, status, hint=None, check=None):
    """
    Record and emit the outcome of a single check.

    Delegates to the parent strategy (which logs the result at error
    or debug level) and then forwards the result to the user-facing
    output layer.

    :param str server_name: the server being checked
    :param bool status: True if the check succeeded
    :param str,None hint: extra text shown to the user, if not None
    :param str,None check: the check name
    """
    # Normalize the check name before logging and displaying it
    check_name = self._check_name(check)
    super(CheckOutputStrategy, self).result(
        server_name, status, hint, check_name)
    # Send result to output
    output.result('check', server_name, check_name, status, hint)
'pgespresso extension', "Available")
else:
output.result('status', self.config.name, 'pgespresso',
'pgespresso extension', "Not available")
if remote_status.get('current_size') is not None:
output.result('status', self.config.name,
'current_size',
'Current data size',
pretty_size(remote_status['current_size']))
if remote_status['data_directory']:
output.result('status', self.config.name,
"data_directory",
"PostgreSQL Data directory",
remote_status['data_directory'])
if remote_status['current_xlog']:
output.result('status', self.config.name,
"current_xlog",
"Current WAL segment",
remote_status['current_xlog'])
output.result('status', self.config.name,
"pg_version",
"PostgreSQL version",
remote_status['server_txt_version'])
else:
output.result('status', self.config.name,
"pg_version",
"PostgreSQL version",
"FAILED trying to get PostgreSQL version")
return
# Define the cluster state as pg_controldata do.
if remote_status['is_in_recovery']:
output.result('status', self.config.name, 'is_in_recovery',
'Cluster state', "in archive recovery")
else:
output.result('status', self.config.name, 'is_in_recovery',
'Cluster state', "in production")
if remote_status['pgespresso_installed']:
output.result('status', self.config.name, 'pgespresso',
'pgespresso extension', "Available")
else:
output.result('status', self.config.name, 'pgespresso',
'pgespresso extension', "Not available")
if remote_status.get('current_size') is not None:
output.result('status', self.config.name,
'current_size',
'Current data size',
pretty_size(remote_status['current_size']))
if remote_status['data_directory']:
output.result('status', self.config.name,
"data_directory",
"PostgreSQL Data directory",
# If PostgreSQL is >= 9.4 we have the last_archived_time
if last_wal and remote_status.get('last_archived_time'):
last_wal += ", at %s" % (
remote_status['last_archived_time'].ctime())
output.result('status', self.config.name,
"last_archived_wal",
"Last archived WAL",
last_wal or "No WAL segment shipped yet")
# Set output for WAL archive failures (PostgreSQL >= 9.4)
if remote_status.get('failed_count') is not None:
remote_fail = str(remote_status['failed_count'])
if int(remote_status['failed_count']) > 0:
remote_fail += " (%s at %s)" % (
remote_status['last_failed_wal'],
remote_status['last_failed_time'].ctime())
output.result('status', self.config.name, 'failed_count',
'Failures of WAL archiver', remote_fail)
# Add hourly archive rate if available (PostgreSQL >= 9.4) and > 0
if remote_status.get('current_archived_wals_per_second'):
output.result(
'status', self.config.name,
'server_archived_wals_per_hour',
'Server WAL archiving rate', '%0.2f/hour' % (
3600 * remote_status['current_archived_wals_per_second']))
self, 'backup_retry_script', 'post')
retry_script.env_from_backup_info(backup_info)
retry_script.run()
except AbortedRetryHookScript as e:
# Ignore the ABORT_STOP as it is a post-hook operation
_logger.warning("Ignoring stop request after receiving "
"abort (exit code %d) from post-backup "
"retry hook script: %s",
e.hook.exit_status, e.hook.script)
# Run the post-backup-script if present.
script = HookScriptRunner(self, 'backup_script', 'post')
script.env_from_backup_info(backup_info)
script.run()
output.result('backup', backup_info)
return backup_info
def status_postgres(self):
"""
Status of PostgreSQL server
"""
remote_status = self.get_remote_status()
if remote_status['server_txt_version']:
output.result('status', self.config.name,
"pg_version",
"PostgreSQL version",
remote_status['server_txt_version'])
else:
output.result('status', self.config.name,
"pg_version",
"PostgreSQL version",
"FAILED trying to get PostgreSQL version")
return
# Define the cluster state as pg_controldata do.
if remote_status['is_in_recovery']:
output.result('status', self.config.name, 'is_in_recovery',
'Cluster state', "in archive recovery")
else:
output.result('status', self.config.name, 'is_in_recovery',
'Cluster state', "in production")
if remote_status['pgespresso_installed']:
output.result('status', self.config.name, 'pgespresso',
'pgespresso extension', "Available")
else:
output.result('status', self.config.name, 'pgespresso',
remote_status['server_txt_version'])
else:
output.result('status', self.config.name,
"pg_version",
"PostgreSQL version",
"FAILED trying to get PostgreSQL version")
return
# Define the cluster state as pg_controldata do.
if remote_status['is_in_recovery']:
output.result('status', self.config.name, 'is_in_recovery',
'Cluster state', "in archive recovery")
else:
output.result('status', self.config.name, 'is_in_recovery',
'Cluster state', "in production")
if remote_status['pgespresso_installed']:
output.result('status', self.config.name, 'pgespresso',
'pgespresso extension', "Available")
else:
output.result('status', self.config.name, 'pgespresso',
'pgespresso extension', "Not available")
if remote_status.get('current_size') is not None:
output.result('status', self.config.name,
'current_size',
'Current data size',
pretty_size(remote_status['current_size']))
if remote_status['data_directory']:
output.result('status', self.config.name,
"data_directory",
"PostgreSQL Data directory",
remote_status['data_directory'])
if remote_status['current_xlog']:
output.result('status', self.config.name,
"""
remote_status = self.get_remote_status()
if remote_status['server_txt_version']:
output.result('status', self.config.name,
"pg_version",
"PostgreSQL version",
remote_status['server_txt_version'])
else:
output.result('status', self.config.name,
"pg_version",
"PostgreSQL version",
"FAILED trying to get PostgreSQL version")
return
# Define the cluster state as pg_controldata do.
if remote_status['is_in_recovery']:
output.result('status', self.config.name, 'is_in_recovery',
'Cluster state', "in archive recovery")
else:
output.result('status', self.config.name, 'is_in_recovery',
'Cluster state', "in production")
if remote_status['pgespresso_installed']:
output.result('status', self.config.name, 'pgespresso',
'pgespresso extension', "Available")
else:
output.result('status', self.config.name, 'pgespresso',
'pgespresso extension', "Not available")
if remote_status.get('current_size') is not None:
output.result('status', self.config.name,
'current_size',
'Current data size',
pretty_size(remote_status['current_size']))
if remote_status['data_directory']: