Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Exercises xlog.decode_segment_name on well-formed and malformed names.
# NOTE(review): this excerpt is cut short: the final `with pytest.raises(...)`
# header below has no body in the visible text.
def test_decode_segment_name(self):
# Plain segment names: three 8-character hexadecimal fields, each one
# returned as an integer in a three-element list
assert xlog.decode_segment_name(
'000000000000000000000000') == [0, 0, 0]
assert xlog.decode_segment_name(
'000000010000000100000001') == [1, 1, 1]
# 'A' decodes to 10 and '11' to 17, i.e. the fields are read as hexadecimal
assert xlog.decode_segment_name(
'0000000A0000000A0000000A') == [10, 10, 10]
assert xlog.decode_segment_name(
'000000110000001100000011') == [17, 17, 17]
# Distinct values per field, to pin the order of the returned elements
assert xlog.decode_segment_name(
'000000000000000200000001') == [0, 2, 1]
assert xlog.decode_segment_name(
'000000010000000000000002') == [1, 0, 2]
assert xlog.decode_segment_name(
'000000020000000100000000') == [2, 1, 0]
# A '.backup' name decodes like the segment name it starts with
assert xlog.decode_segment_name(
'00000001000000000000000A.00000020.backup') == [1, 0, 10]
# A '.history' name only provides the first field, the others are None
assert xlog.decode_segment_name(
'00000001.history') == [1, None, None]
# A malformed name must raise BadXlogSegmentName
with pytest.raises(barman.exceptions.BadXlogSegmentName):
xlog.decode_segment_name('00000000000000000000000')
with pytest.raises(barman.exceptions.BadXlogSegmentName):
def test_decode_segment_name(self):
    """
    Check decode_segment_name against valid and malformed names
    """
    # (name, expected result) pairs. The fields are hexadecimal, so
    # 'A' is 10 and '11' is 17. The '.backup' name decodes like the
    # segment it starts with, the '.history' name only has the first field.
    valid_cases = [
        ('000000000000000000000000', [0, 0, 0]),
        ('000000010000000100000001', [1, 1, 1]),
        ('0000000A0000000A0000000A', [10, 10, 10]),
        ('000000110000001100000011', [17, 17, 17]),
        ('000000000000000200000001', [0, 2, 1]),
        ('000000010000000000000002', [1, 0, 2]),
        ('000000020000000100000000', [2, 1, 0]),
        ('00000001000000000000000A.00000020.backup', [1, 0, 10]),
        ('00000001.history', [1, None, None]),
    ]
    for name, expected in valid_cases:
        assert xlog.decode_segment_name(name) == expected

    # Malformed names (wrong length or a non-hexadecimal character)
    # must be rejected with BadXlogSegmentName
    malformed_names = [
        '00000000000000000000000',
        '0000000000000000000000000',
        '000000000000X00000000000',
    ]
    for name in malformed_names:
        with pytest.raises(barman.exceptions.BadXlogSegmentName):
            xlog.decode_segment_name(name)
def test_decode_segment_name(self):
    """
    decode_segment_name must split valid names and refuse invalid ones
    """
    decode = xlog.decode_segment_name

    # Names that must decode, in the same order as their expected results
    good_names = (
        '000000000000000000000000',
        '000000010000000100000001',
        '0000000A0000000A0000000A',
        '000000110000001100000011',
        '000000000000000200000001',
        '000000010000000000000002',
        '000000020000000100000000',
        '00000001000000000000000A.00000020.backup',
        '00000001.history',
    )
    decoded_values = (
        [0, 0, 0],
        [1, 1, 1],
        [10, 10, 10],
        [17, 17, 17],
        [0, 2, 1],
        [1, 0, 2],
        [2, 1, 0],
        [1, 0, 10],
        [1, None, None],
    )
    for segment_name, decoded in zip(good_names, decoded_values):
        assert decode(segment_name) == decoded

    # Names that must raise BadXlogSegmentName
    bad_names = (
        '00000000000000000000000',
        '0000000000000000000000000',
        '000000000000X00000000000',
    )
    for segment_name in bad_names:
        with pytest.raises(barman.exceptions.BadXlogSegmentName):
            decode(segment_name)
def get_required_xlog_files(self, backup, target_tli=None,
                            target_time=None, target_xid=None):
    """
    Get the xlog files required for a recovery

    This is a generator yielding one WalFileInfo for every selected
    line of the xlogdb, in the order the lines are read.

    :param backup: a backup object; its begin_wal and end_wal attributes
        delimit the WAL range of the backup
    :param target_tli: the target timeline; when not set, the timeline
        decoded from backup.end_wal is used
    :param target_time: when set, the scan stops after the first WAL
        file that is past the end of the backup and whose time is
        later than this value
    :param target_xid: not used by this method
    """
    begin = backup.begin_wal
    end = backup.end_wal
    # If timeline isn't specified, assume it is the same timeline
    # of the backup
    if not target_tli:
        target_tli, _, _ = xlog.decode_segment_name(end)
    with self.xlogdb() as fxlogdb:
        for line in fxlogdb:
            wal_info = WalFileInfo.from_xlogdb_line(line)
            # Handle .history files: add all of them to the output,
            # regardless of their age
            if xlog.is_history_file(wal_info.name):
                yield wal_info
                continue
            # Skip everything that precedes the start of the backup
            if wal_info.name < begin:
                continue
            # Skip files belonging to a timeline newer than the target
            tli, _, _ = xlog.decode_segment_name(wal_info.name)
            if tli > target_tli:
                continue
            yield wal_info
            if wal_info.name > end:
                end = wal_info.name
                # Past the end of the backup: the file just yielded is
                # the last one needed once its time exceeds target_time
                if target_time and target_time < wal_info.time:
                    break
        # return all the remaining history files (the same file object
        # is reused, so this resumes from where the loop above stopped)
        for line in fxlogdb:
            wal_info = WalFileInfo.from_xlogdb_line(line)
            if xlog.is_history_file(wal_info.name):
                yield wal_info
def get_wal_until_next_backup(self, backup, include_history=False):
    """
    Get the xlog files between backup and the next

    This is a generator yielding one WalFileInfo for every selected
    line of the xlogdb, in the order the lines are read.

    :param BackupInfo backup: a backup object, the starting point
        to retrieve WALs
    :param bool include_history: option for the inclusion of
        .history files into the output
    """
    begin = backup.begin_wal
    # Look up the next backup only once; when there is none, next_end
    # stays None and the scan runs until the end of the xlogdb
    next_end = None
    next_backup = self.get_next_backup(backup.backup_id)
    if next_backup:
        next_end = next_backup.end_wal
    backup_tli, _, _ = xlog.decode_segment_name(begin)

    with self.xlogdb() as fxlogdb:
        for line in fxlogdb:
            wal_info = WalFileInfo.from_xlogdb_line(line)
            # Handle .history files: add all of them to the output,
            # regardless of their age, if requested (the 'include_history'
            # parameter is True)
            if xlog.is_history_file(wal_info.name):
                if include_history:
                    yield wal_info
                continue
            # Skip everything that precedes the start of the backup
            if wal_info.name < begin:
                continue
            # Skip files belonging to a timeline newer than the backup's
            tli, _, _ = xlog.decode_segment_name(wal_info.name)
            if tli > backup_tli:
                continue
            # Only regular WAL files are part of the output from here on
            if not xlog.is_wal_file(wal_info.name):
                continue
            # Stop at the first file past the end of the next backup
            if next_end and wal_info.name > next_end:
                break
            yield wal_info
# NOTE(review): headerless excerpt from the body of a loop (see the
# `continue` below). The enclosing definition is not visible, and `line`,
# `output`, `timelines_to_protect`, `backup_info`, `fxlogdb_new` and `removed`
# are all defined outside this excerpt.
wal_info = WalFileInfo.from_xlogdb_line(line)
# Report and skip any entry whose name is not recognised by
# xlog.is_any_xlog_file
if not xlog.is_any_xlog_file(wal_info.name):
output.error(
"invalid WAL segment name %r\n"
"HINT: Please run \"barman rebuild-xlogdb %s\" "
"to solve this issue",
wal_info.name, self.config.name)
continue
# Keeps the WAL segment if it is a history file
keep = xlog.is_history_file(wal_info.name)
# Keeps the WAL segment if its timeline is in
# `timelines_to_protect`
if timelines_to_protect:
tli, _, _ = xlog.decode_segment_name(wal_info.name)
keep |= tli in timelines_to_protect
# Keeps the WAL segment if it is not older than the begin WAL
# of the given backup (the first available)
if backup_info and backup_info.begin_wal is not None:
keep |= wal_info.name >= backup_info.begin_wal
# If the file has to be kept, write it in the new xlogdb;
# otherwise delete it and record it in the removed list
if keep:
fxlogdb_new.write(wal_info.to_xlogdb_line())
else:
self.delete_wal(wal_info)
removed.append(wal_info.name)
# Flush Python's buffer, then ask the OS to write the new xlogdb to disk
fxlogdb_new.flush()
os.fsync(fxlogdb_new.fileno())
# NOTE(review): headerless excerpt starting in the middle of a try/except
# inside a loop. The matching `try`, the loop header and the names
# `wal_peek_name`, `wal_peek_list`, `count` and `is_power_of_two` are
# defined outside this excerpt.
except StopIteration:
# No more items in wal_peek_list
break
wal_peek_file = self.get_wal_full_path(wal_peek_name)
# If the next WAL file is found, output the name
# and continue to the next one
if os.path.exists(wal_peek_file):
count += 1
output.info(wal_peek_name, log=False)
continue
# If ``wal_peek_file`` doesn't exist, check if we need to
# look in the following segment
tli, log, seg = xlog.decode_segment_name(wal_peek_name)
# If `seg` is not a power of two, it is not possible that we
# are at the end of a WAL group, so we are done
if not is_power_of_two(seg):
break
# This is a possible WAL group boundary, let's try the
# following group: same timeline, next `log` value, segment zero
seg = 0
log += 1
# Install a new generator starting from the first segment of the
# next group. If that file doesn't exist we will terminate because
# zero is not a power of two
wal_peek_name = xlog.encode_segment_name(tli, log, seg)
wal_peek_list = xlog.generate_segment_names(wal_peek_name)
# NOTE(review): headerless excerpt of a generator body (it uses `yield`).
# The enclosing definition and the names `backup`, `begin` and
# `include_history` are defined outside this excerpt.
next_end = self.get_next_backup(backup.backup_id).end_wal
backup_tli, _, _ = xlog.decode_segment_name(begin)
with self.xlogdb() as fxlogdb:
for line in fxlogdb:
wal_info = WalFileInfo.from_xlogdb_line(line)
# Handle .history files: add all of them to the output,
# regardless of their age, if requested (the 'include_history'
# parameter is True)
if xlog.is_history_file(wal_info.name):
if include_history:
yield wal_info
continue
# Skip everything that precedes `begin`
if wal_info.name < begin:
continue
# Skip files whose timeline is newer than the one decoded from `begin`
tli, _, _ = xlog.decode_segment_name(wal_info.name)
if tli > backup_tli:
continue
# Only names accepted by xlog.is_wal_file are yielded from here on
if not xlog.is_wal_file(wal_info.name):
continue
# Stop at the first file whose name is past `next_end`
if next_end and wal_info.name > next_end:
break
yield wal_info
# NOTE(review): headerless excerpt of a generator body (it uses `yield`).
# The enclosing definition and the names `backup`, `begin`, `target_tli`
# and `target_time` are defined outside this excerpt. With the indentation
# lost, the nesting of the `target_time` check cannot be read from this text.
end = backup.end_wal
# If timeline isn't specified, assume it is the same timeline
# of the backup
if not target_tli:
target_tli, _, _ = xlog.decode_segment_name(end)
with self.xlogdb() as fxlogdb:
for line in fxlogdb:
wal_info = WalFileInfo.from_xlogdb_line(line)
# Handle .history files: add all of them to the output,
# regardless of their age
if xlog.is_history_file(wal_info.name):
yield wal_info
continue
# Skip everything that precedes `begin`
if wal_info.name < begin:
continue
# Skip files whose timeline is newer than the target timeline
tli, _, _ = xlog.decode_segment_name(wal_info.name)
if tli > target_tli:
continue
yield wal_info
# Track the highest name seen past the original end of the backup
if wal_info.name > end:
end = wal_info.name
# Stop scanning once a file's time is later than target_time
if target_time and target_time < wal_info.time:
break
# return all the remaining history files
for line in fxlogdb:
wal_info = WalFileInfo.from_xlogdb_line(line)
if xlog.is_history_file(wal_info.name):
yield wal_info