info_file = backup_dir.join('backup.info')
info_file.write(BASE_BACKUP_INFO)
b_info = LocalBackupInfo(server, backup_id="fake_backup_id")
# Check that the paths are built according to the backup version
assert b_info.backup_version == 2
assert b_info.get_data_directory() == data_dir.strpath
assert b_info.get_data_directory(16384) == (
backup_dir.strpath + '/16384')
# Build a fake v1 backup
backup_dir = tmpdir.mkdir('another_fake_backup_id')
pgdata_dir = backup_dir.mkdir('pgdata')
info_file = backup_dir.join('backup.info')
info_file.write(BASE_BACKUP_INFO)
b_info = LocalBackupInfo(server, backup_id="another_fake_backup_id")
# Check that the paths are built according to the backup version
assert b_info.backup_version == 1
assert b_info.get_data_directory(16384) == \
backup_dir.strpath + '/pgdata/pg_tblspc/16384'
assert b_info.get_data_directory() == pgdata_dir.strpath
# Check that an exception is raised if an invalid oid
# is provided to the method
with pytest.raises(ValueError):
b_info.get_data_directory(12345)
# Check that a ValueError exception is raised with an
# invalid oid when the tablespaces list is None
b_info.tablespaces = None
# and expect a value error
with pytest.raises(ValueError):
    b_info.get_data_directory(16384)
server = build_mocked_server(
    main_conf={
        'basebackups_directory': tmpdir.strpath
    },
)
# Build a fake backup info and try to load it, to ensure that we won't
# need a PostgreSQL connection to do that
backup_dir = tmpdir.mkdir('fake_backup_id')
info_file = backup_dir.join('backup.info')
info_file.write(BASE_BACKUP_INFO)
# Monkey patch the PostgreSQL connection function to raise a
# RuntimeError
connect_mock.side_effect = RuntimeError
# The following constructor will raise a RuntimeError if it
# needs a PostgreSQL connection
LocalBackupInfo(server, backup_id="fake_backup_id")
assert "Started copy of WAL files for server" in out
# Modify the response of the fake remote call
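# (Assumption: command_mock emulates the remote sync-info command, so the
# JSON assigned to its output below is what the following cron run parses.)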
primary_info = dict(EXPECTED_MINIMAL)
primary_info['backups'] = []
primary_info['wals'] = []
command_mock.return_value.out = json.dumps(primary_info)
server.cron()
(out, err) = capsys.readouterr()
# Assertion block 2: No backup or wal synchronisation required
assert "No backup synchronisation required" in out
assert "No WAL synchronisation required for server" in out
# Add a backup to the remote response
primary_info = dict(EXPECTED_MINIMAL)
backup_info_dict = LocalBackupInfo(server,
backup_id='1234567891').to_json()
primary_info['backups']['1234567891'] = backup_info_dict
command_mock.return_value.out = json.dumps(primary_info)
server.cron()
(out, err) = capsys.readouterr()
# Assertion block 3: the copy starts from the first backup
# in the list (1234567890),
# not from the second one (1234567891)
assert "Starting copy of backup 1234567890" in out
assert "Started copy of WAL files for server main" in out
assert "1234567891" not in out
# Patch the lock objects on the fly, testing the locking
# management of the method.
with mock.patch.multiple('barman.server',
ServerBackupSyncLock=mock.DEFAULT,
def test_from_json(self, tmpdir):
server = build_mocked_server(
main_conf={
'basebackups_directory': tmpdir.strpath
},
)
# Build a fake backup
backup_dir = tmpdir.mkdir('fake_backup_id')
info_file = backup_dir.join('backup.info')
info_file.write(BASE_BACKUP_INFO)
b_info = LocalBackupInfo(server, backup_id="fake_backup_id")
# Build another BackupInfo from the json dump
new_binfo = LocalBackupInfo.from_json(server, b_info.to_json())
assert b_info.to_dict() == new_binfo.to_dict()
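# Additional sketch (not in the original test): the to_json() payload is
# also expected to survive a real JSON round trip, since that is how it
# is shipped between servers (assumes the json module is imported).
json_payload = json.dumps(b_info.to_json())
roundtrip_binfo = LocalBackupInfo.from_json(server, json.loads(json_payload))
assert roundtrip_binfo.to_dict() == b_info.to_dict()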
def test_backup(self, rwbb_mock, gpb_mock, backup_copy_mock,
                capsys, tmpdir):
"""
Test the execution of a backup
:param rwbb_mock: mock for the remove_wal_before_backup method
:param gpb_mock: mock for the get_previous_backup method
:param backup_copy_mock: mock for the executor's backup_copy method
:param capsys: stdout capture module
:param tmpdir: pytest temp directory
"""
backup_manager = build_backup_manager(global_conf={
'barman_home': tmpdir.mkdir('home').strpath,
# Silence the warning for default backup strategy
'backup_options': 'exclusive_backup',
})
backup_info = LocalBackupInfo(backup_manager.server,
backup_id='fake_backup_id')
backup_info.begin_xlog = "0/2000028"
backup_info.begin_wal = "000000010000000000000002"
backup_info.begin_offset = 40
backup_info.status = BackupInfo.EMPTY
backup_info.copy_stats = dict(copy_time=100)
gpb_mock.return_value = None
rwbb_mock.return_value = ['000000010000000000000001']
# Test 1: exclusive backup
backup_manager.executor.strategy = Mock()
backup_manager.executor.backup(backup_info)
out, err = capsys.readouterr()
assert err == ''
end_time = datetime.now(tz.tzlocal())
# Generate a list of tablespace objects (don't use a list comprehension,
# or in Python 2.x the 'item' variable will leak into the enclosing scope)
if tablespaces is not None:
tablespaces = list(Tablespace._make(item) for item in tablespaces)
# Manage the server for the Backup info: if no server is provided
# by the caller use a Mock with a basic configuration
if server is None:
server = mock.Mock(name=server_name)
server.config = build_config_from_dicts().get_server('main')
server.passive_node = False
server.backup_manager.name = 'default'
backup_info = LocalBackupInfo(**locals())
return backup_info
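# Illustrative call of this helper (hypothetical values, shown only as a
# sketch of how the parameters above are meant to be used; the helper name
# is assumed, since the fragment does not show its signature):
#
#     b_info = build_test_backup_info(
#         server_name='main',
#         tablespaces=[('tbs1', 16387, '/tmp/tbs1')],
#     )
#     assert len(b_info.tablespaces) == 1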
ident_file="/pg/pg_ident.conf",
)
# Mock the postgres get_tablespaces() call
tablespaces = [Tablespace._make(('test_tbs', 1234, '/tbs/test'))]
postgres_mock.get_tablespaces.return_value = tablespaces
# This is a PostgreSQL 9.5 server
postgres_mock.server_version = 90500
# Mock the call to the new API method
start_time = datetime.datetime.now()
postgres_mock.current_xlog_info = {
'location': "A257/44B4C0D8",
'timestamp': start_time,
}
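# current_xlog_info mimics the dictionary exposed by the mocked PostgreSQL
# connection for the current WAL position: 'location' ends up in
# begin_xlog and 'timestamp' in begin_time (see the assertions below).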
# Build an empty test backup info
backup_info = LocalBackupInfo(server=backup_manager.server,
backup_id='fake_id2')
backup_manager.executor.strategy.start_backup(backup_info)
# Check that all the values are correctly saved inside the BackupInfo
assert backup_info.pgdata == '/test/fake_data_dir'
assert backup_info.config_file == "/etc/postgresql.conf"
assert backup_info.hba_file == "/pg/pg_hba.conf"
assert backup_info.ident_file == "/pg/pg_ident.conf"
assert backup_info.tablespaces == tablespaces
assert backup_info.status == 'STARTED'
assert backup_info.timeline is None
assert backup_info.begin_xlog == 'A257/44B4C0D8'
assert backup_info.begin_wal is None
assert backup_info.begin_offset is None
assert backup_info.begin_time == start_time
def get_previous_backup(self, backup_id,
status_filter=DEFAULT_STATUS_FILTER):
"""
Get the previous backup (if any) in the catalog

:param backup_id: the id of the backup to start from
:param status_filter: default DEFAULT_STATUS_FILTER. The status of
    the backup returned
"""
if not isinstance(status_filter, tuple):
status_filter = tuple(status_filter)
backup = LocalBackupInfo(self.server, backup_id=backup_id)
available_backups = self.get_available_backups(
status_filter + (backup.status,))
ids = sorted(available_backups.keys())
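# Scan from the position of backup_id towards older entries and return
# the first backup whose status matches the filter.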
try:
current = ids.index(backup_id)
while current > 0:
res = available_backups[ids[current - 1]]
if res.status in status_filter:
return res
current -= 1
return None
except ValueError:
raise UnknownBackupIdException('Could not find backup_id %s' %
backup_id)
def save(self, filename=None, file_object=None):
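"""
Save the backup metadata to disk.

Make sure the directory that will contain the file exists, then
delegate the actual write to the parent class implementation.
"""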
if not file_object:
# Make sure the containing directory exists
filename = filename or self.filename
dir_name = os.path.dirname(filename)
if not os.path.exists(dir_name):
os.makedirs(dir_name)
super(LocalBackupInfo, self).save(filename=filename,
file_object=file_object)