Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
backup_manager.get_next_backup = MagicMock()
backup_manager.get_next_backup.side_effect = \
UnknownBackupIdException()
# BackupInfo mock
backup_info = MagicMock(name='backup_info')
backup_info.get_basebackup_directory.return_value = 'backup_directory'
backup_info.backup_id = '123456789XYZ'
backup_info.error = None
backup_info.status = 'OK'
# Command mock executed by HookScriptRunner
command_mock.return_value.return_value = 0
# the actual test
script = HookScriptRunner(backup_manager, 'recovery_script', 'pre')
script.env_from_recover(
backup_info,
dest='fake_dest',
tablespaces={
'first': '/first/relocated',
'second': '/another/location',
},
remote_command='ssh user@host',
target_name='name',
exclusive=True,
)
expected_env = {
'BARMAN_PHASE': 'pre',
'BARMAN_VERSION': version,
'BARMAN_SERVER': 'test_server',
'BARMAN_CONFIGURATION': 'build_config_from_dicts',
# NOTE(review): fragment of a hook-script test -- the enclosing `def` line is
# not part of this excerpt and the original indentation has been lost.
# BackupManager mock: the 'post_test_hook' option names a placeholder script
# and both neighbouring-backup lookups are stubbed to return None
backup_manager = build_backup_manager(name='test_server')
backup_manager.config.post_test_hook = 'not_existent_script'
backup_manager.get_previous_backup = MagicMock()
backup_manager.get_previous_backup.return_value = None
backup_manager.get_next_backup = MagicMock()
backup_manager.get_next_backup.return_value = None
# BackupInfo mock describing a backup that failed with 'Test error'
backup_info = MagicMock(name='backup_info')
backup_info.get_basebackup_directory.return_value = 'backup_directory'
backup_info.backup_id = '123456789XYZ'
backup_info.error = 'Test error'
backup_info.status = 'FAILED'
# the actual test: build a post-phase runner and load the backup details
# into its environment
script = HookScriptRunner(backup_manager, 'test_hook', 'post')
script.env_from_backup_info(backup_info)
# Environment the hook is expected to receive; the previous/next backup ids
# are listed as empty strings, matching the None lookups stubbed above
expected_env = {
'BARMAN_PHASE': 'post',
'BARMAN_VERSION': version,
'BARMAN_SERVER': 'test_server',
'BARMAN_CONFIGURATION': 'build_config_from_dicts',
'BARMAN_HOOK': 'test_hook',
'BARMAN_BACKUP_DIR': 'backup_directory',
'BARMAN_BACKUP_ID': '123456789XYZ',
'BARMAN_ERROR': 'Test error',
'BARMAN_STATUS': 'FAILED',
'BARMAN_PREVIOUS_ID': '',
'BARMAN_RETRY': '0',
'BARMAN_NEXT_ID': '',
}
# NOTE(review): expected_env is not compared with anything in the visible
# lines; the assertion was presumably dropped from this excerpt -- confirm
script.run()
def test_general_no_phase(self, command_mock):
    """
    Run a hook script built without a phase and inspect the outcome.

    The test checks that the runner returns the exit status produced by
    the mocked command, that the command is created exactly once, and
    that the environment handed to it carries no BARMAN_PHASE entry.
    """
    # Backup manager whose 'test_hook' option names a placeholder script
    manager = build_backup_manager(name='test_server')
    manager.config.test_hook = 'not_existent_script'

    # The mocked command reports a successful exit status
    command_mock.return_value.return_value = 0

    # Build the runner without passing any phase, then execute it
    runner = HookScriptRunner(manager, 'test_hook')
    exit_status = runner.run()

    assert exit_status == 0
    assert command_mock.call_count == 1

    # Keyword arguments of the single Command(...) construction
    _, command_kwargs = command_mock.call_args
    assert command_kwargs['env_append'] == {
        'BARMAN_VERSION': version,
        'BARMAN_SERVER': 'test_server',
        'BARMAN_CONFIGURATION': 'build_config_from_dicts',
        'BARMAN_HOOK': 'test_hook',
        'BARMAN_RETRY': '0',
    }
def test_wal_info_corner_cases(self, command_mock):
    """
    Check the hook environment built from a WAL file with corner-case data.

    The WAL info has no compression and an explicit file path and error
    are supplied, so the environment is expected to carry an empty
    BARMAN_COMPRESSION, the overriding BARMAN_FILE and the error text.
    """
    # BackupManager mock
    backup_manager = build_backup_manager(name='test_server')
    backup_manager.config.pre_test_hook = 'not_existent_script'

    # Command mock executed by HookScriptRunner: give it a concrete exit
    # status, as the sibling tests do, so that run() yields a plain integer
    command_mock.return_value.return_value = 0

    # WalFileInfo mock
    timestamp = time.time()
    wal_info = MagicMock(name='wal_info')
    wal_info.name = 'XXYYZZAABBCC'
    wal_info.size = 1234567
    wal_info.time = timestamp
    wal_info.compression = None
    wal_info.fullpath.return_value = '/incoming/directory'

    # the actual test
    script = HookScriptRunner(backup_manager, 'test_hook', 'pre')
    script.env_from_wal_info(wal_info, '/somewhere', Exception('BOOM!'))
    expected_env = {
        'BARMAN_PHASE': 'pre',
        'BARMAN_VERSION': version,
        'BARMAN_SERVER': 'test_server',
        'BARMAN_CONFIGURATION': 'build_config_from_dicts',
        'BARMAN_HOOK': 'test_hook',
        'BARMAN_SEGMENT': 'XXYYZZAABBCC',
        'BARMAN_FILE': '/somewhere',
        'BARMAN_SIZE': '1234567',
        'BARMAN_TIMESTAMP': str(timestamp),
        'BARMAN_COMPRESSION': '',
        'BARMAN_RETRY': '0',
        'BARMAN_ERROR': 'BOOM!',
    }

    # Without these assertions expected_env was built but never verified
    assert script.run() == 0
    assert command_mock.call_count == 1
    assert command_mock.call_args[1]['env_append'] == expected_env
def test_no_exception(self, command_mock):
    """
    Verify that a failure while creating the command does not propagate.

    The mocked command raises as soon as it is called; the runner is
    expected to return None, keep the raised exception in its
    ``exception`` attribute, and to have called the command once.
    """
    # Backup manager whose 'pre_test_hook' option names a placeholder script
    manager = build_backup_manager(name='test_server')
    manager.config.pre_test_hook = 'not_existent_script'

    # Make every call of the mocked command raise the same exception
    failure = Exception('Test error')
    command_mock.side_effect = failure
    command_mock.return_value.return_value = 0

    # Execute the pre-phase hook and collect the result
    runner = HookScriptRunner(manager, 'test_hook', 'pre')
    outcome = runner.run()

    assert outcome is None  # exception
    assert runner.exception == failure
    assert command_mock.call_count == 1
# NOTE(review): fragment of a backup-deletion routine -- the enclosing `def`
# line is not part of this excerpt and the original indentation has been lost.
# Refuse to delete a DONE backup when the minimum redundancy is not lower
# than the number of available backups: warn and report failure.
if backup.status == BackupInfo.DONE and \
minimum_redundancy >= len(available_backups):
output.warning("Skipping delete of backup %s for server %s "
"due to minimum redundancy requirements "
"(minimum redundancy = %s, "
"current redundancy = %s)",
backup.backup_id,
self.config.name,
minimum_redundancy,
len(available_backups))
return False
# Keep track of when the delete operation started.
# NOTE(review): delete_start_time is not read in the visible lines.
delete_start_time = datetime.datetime.now()
# Run the pre_delete_script if present.
script = HookScriptRunner(self, 'delete_script', 'pre')
script.env_from_backup_info(backup)
script.run()
# Run the pre_delete_retry_script if present.
retry_script = RetryHookScriptRunner(
self, 'delete_retry_script', 'pre')
retry_script.env_from_backup_info(backup)
retry_script.run()
output.info("Deleting backup %s for server %s",
backup.backup_id, self.config.name)
# Look up the neighbouring backups before the data is removed.
# NOTE(review): previous_backup / next_backup are not used in the visible
# lines -- presumably consumed by code elided from this excerpt.
previous_backup = self.get_previous_backup(backup.backup_id)
next_backup = self.get_next_backup(backup.backup_id)
# Delete all the data contained in the backup
# NOTE(review): no except/finally clause for this `try` is visible in the
# excerpt.
try:
self.delete_backup_data(backup)
# Run the post_delete_retry_script if present.
try:
retry_script = RetryHookScriptRunner(
self, 'delete_retry_script', 'post')
retry_script.env_from_backup_info(backup)
retry_script.run()
except AbortedRetryHookScript as e:
# Ignore the ABORT_STOP as it is a post-hook operation
_logger.warning("Ignoring stop request after receiving "
"abort (exit code %d) from post-delete "
"retry hook script: %s",
e.hook.exit_status, e.hook.script)
# Run the post_delete_script if present.
script = HookScriptRunner(self, 'delete_script', 'post')
script.env_from_backup_info(backup)
script.run()
# Report success to the caller
return True
def delete_wal(self, wal_info):
    """
    Delete a WAL segment, with the given WalFileInfo

    The pre hook scripts run first, then the file is removed together with
    any parent directories left empty, and finally the post hook scripts
    run. A failure to remove the file is reported and passed to the post
    hook scripts instead of being raised.

    :param barman.infofile.WalFileInfo wal_info: the WAL to delete
    """
    # Run the pre_wal_delete_script if present.
    script = HookScriptRunner(self, 'wal_delete_script', 'pre')
    script.env_from_wal_info(wal_info)
    script.run()

    # Run the pre_wal_delete_retry_script if present.
    retry_script = RetryHookScriptRunner(
        self, 'wal_delete_retry_script', 'pre')
    retry_script.env_from_wal_info(wal_info)
    retry_script.run()

    error = None
    wal_path = wal_info.fullpath(self.server)
    try:
        os.unlink(wal_path)
        try:
            os.removedirs(os.path.dirname(wal_path))
        except OSError:
            # This is not an error condition: removedirs fails as soon as
            # it meets a directory that cannot be removed (e.g. one that
            # still contains other files).
            pass
    except OSError as e:
        # Record the failure so that the post hook scripts receive it
        error = ('Ignoring deletion of WAL file %s for server %s: %s' %
                 (wal_info.name, self.config.name, e))
        output.error(error)

    # Run the post_wal_delete_retry_script if present.
    try:
        retry_script = RetryHookScriptRunner(
            self, 'wal_delete_retry_script', 'post')
        retry_script.env_from_wal_info(wal_info, None, error)
        retry_script.run()
    except AbortedRetryHookScript as e:
        # Ignore the ABORT_STOP as it is a post-hook operation
        _logger.warning("Ignoring stop request after receiving "
                        "abort (exit code %d) from post-wal-delete "
                        "retry hook script: %s",
                        e.hook.exit_status, e.hook.script)

    # Run the post_wal_delete_script if present.
    script = HookScriptRunner(self, 'wal_delete_script', 'post')
    script.env_from_wal_info(wal_info, None, error)
    script.run()