def test_hash_dir(self):
    assert xlog.hash_dir(
        '000000000000000200000001') == '0000000000000002'
    assert xlog.hash_dir(
        '000000010000000000000002') == '0000000100000000'
    assert xlog.hash_dir(
        'test/000000020000000100000000') == '0000000200000001'
    assert xlog.hash_dir(
        '00000001.history') == ''
    assert xlog.hash_dir(
        '00000002.history') == ''
    assert xlog.hash_dir(
        '00000001000000000000000A.00000020.backup') == '0000000100000000'
    assert xlog.hash_dir(
        '00000002000000050000000A.00000020.backup') == '0000000200000005'
    with pytest.raises(barman.exceptions.BadXlogSegmentName):
        xlog.hash_dir('00000000000000000000000')
    with pytest.raises(barman.exceptions.BadXlogSegmentName):
        xlog.hash_dir('0000000000000000000000000')
    with pytest.raises(barman.exceptions.BadXlogSegmentName):
        xlog.hash_dir('000000000000X00000000000')
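These assertions imply that hash_dir strips any leading path, returns the first 16 hexadecimal characters (timeline plus log id) of a WAL segment or .backup label name, maps .history files to the archive root (empty string), and raises BadXlogSegmentName for malformed names. Below is a minimal sketch of that behaviour, assuming exactly the cases exercised by the test; it is not Barman's actual implementation.

import os
import re

# Hypothetical stand-in for barman.exceptions.BadXlogSegmentName
class BadXlogSegmentName(Exception):
    pass

# 24 hex digits, optionally followed by '.XXXXXXXX.backup', or an
# 8-hex-digit '.history' file
_SEGMENT_RE = re.compile(
    r'^[0-9A-Fa-f]{24}(\.[0-9A-Fa-f]{8}\.backup)?$'
    r'|^[0-9A-Fa-f]{8}\.history$')

def hash_dir_sketch(file_name):
    # Work on the base name so 'test/000000020000000100000000' also works
    name = os.path.basename(file_name)
    if _SEGMENT_RE.match(name) is None:
        raise BadXlogSegmentName(name)
    if name.endswith('.history'):
        return ''        # history files live in the archive root
    return name[:16]     # timeline (8 hex digits) + log id (8 hex digits)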
wal_file = incoming_dir.join(wal_name)
wal_file.ensure()
archive_dir.ensure(dir=True)
xlog_db.ensure()
backup_manager.server.xlogdb.return_value.__enter__.return_value = \
    xlog_db.open(mode='a')
backup_manager.server.archivers = [FileWalArchiver(backup_manager)]
backup_manager.archive_wal()
# Check that the WAL file has been registered in the xlog catalog
with xlog_db.open() as f:
    line = str(f.readline())
assert wal_name in line
wal_path = os.path.join(archive_dir.strpath,
                        barman.xlog.hash_dir(wal_name),
                        wal_name)
# Check that the WAL file has been archived
assert os.path.exists(wal_path)
# Check the output for the archival of the WAL file
out, err = capsys.readouterr()
assert ("\t%s\n" % wal_name) in out
source = '/wal_dir/000000080000ABFF000000C1'
# Simulate the file object returned by the retrieve_file_obj method
rfo_mock.return_value.name = source
uploader.upload_wal(source)

session_mock = boto_mock.Session.return_value
s3_client_mock = session_mock.resource.return_value.meta.client
# Check the call for the creation of the destination key
s3_client_mock.upload_fileobj.assert_called_once_with(
    Fileobj=rfo_mock.return_value,
    Bucket=cloud_interface.bucket_name,
    Key=os.path.join(
        cloud_interface.path,
        uploader.server_name,
        'wals',
        hash_dir(source),
        os.path.basename(source)),
    ExtraArgs={})
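For the source used above, the destination key nests the object under its hash directory; a quick illustration follows, with hypothetical values standing in for cloud_interface.path and uploader.server_name (the test's real fixtures are not shown here).

import os
from barman.xlog import hash_dir

source = '/wal_dir/000000080000ABFF000000C1'
path = 'some/prefix'      # hypothetical cloud_interface.path
server_name = 'main'      # hypothetical uploader.server_name
key = os.path.join(path, server_name, 'wals',
                   hash_dir(source),              # '000000080000ABFF'
                   os.path.basename(source))
# key == 'some/prefix/main/wals/000000080000ABFF/000000080000ABFF000000C1'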
:param required_xlog_files: list of all required WAL files
:param wal_dest: the destination directory for xlog recovery
:param remote_command: default None. The remote command to recover
    the xlog, in case of remote backup.
"""
# List of required WAL files partitioned by containing directory
xlogs = collections.defaultdict(list)
# Add a '/' suffix to ensure it is a directory
wal_dest = '%s/' % wal_dest
# Map of every compressor used with any WAL file in the archive,
# to be used during this recovery
compressors = {}
compression_manager = self.backup_manager.compression_manager
# Fill xlogs and compressors maps from required_xlog_files
for wal_info in required_xlog_files:
    hashdir = xlog.hash_dir(wal_info.name)
    xlogs[hashdir].append(wal_info)
    # If a compressor is required, make sure it exists in the cache
    if wal_info.compression is not None and \
            wal_info.compression not in compressors:
        compressors[wal_info.compression] = \
            compression_manager.get_compressor(
                compression=wal_info.compression)

rsync = RsyncPgData(
    path=self.server.path,
    ssh=remote_command,
    bwlimit=self.config.bandwidth_limit,
    network_compression=self.config.network_compression)
# If compression is used and this is a remote recovery, we need a
# temporary directory where to spool uncompressed files,
# otherwise we either decompress every WAL file in the local