Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
val *= base
assert barman.utils.pretty_size(val, base) == '10.0 MiB'
val *= base
assert barman.utils.pretty_size(val, base) == '10.0 GiB'
val *= base
assert barman.utils.pretty_size(val, base) == '10.0 TiB'
val *= base
assert barman.utils.pretty_size(val, base) == '10.0 PiB'
val *= base
assert barman.utils.pretty_size(val, base) == '10.0 EiB'
val *= base
assert barman.utils.pretty_size(val, base) == '10.0 ZiB'
val *= base
assert barman.utils.pretty_size(val, base) == '10.0 YiB'
val *= base
assert barman.utils.pretty_size(val, base) == '10240.0 YiB'
def test_1024(self):
    """Verify pretty_size() walks the full IEC suffix ladder for base 1024.

    Starting from 10 bytes, each multiplication by the base must bump
    the formatted output to the next binary suffix (B -> KiB -> ... -> YiB).
    """
    base = 1024
    value = 10
    expected_ladder = (
        '10 B', '10.0 KiB', '10.0 MiB', '10.0 GiB', '10.0 TiB',
        '10.0 PiB', '10.0 EiB', '10.0 ZiB', '10.0 YiB',
    )
    for expected in expected_ladder:
        assert barman.utils.pretty_size(value, base) == expected
        value *= base
if data['status'] in BackupInfo.STATUS_COPY_DONE:
output.update(dict(
postgresql_version=data['version'],
pgdata_directory=data['pgdata'],
tablespaces=[]
))
if data['tablespaces']:
for item in data['tablespaces']:
output['tablespaces'].append(dict(
name=item.name,
location=item.location,
oid=item.oid
))
output['base_backup_information'] = dict(
disk_usage=pretty_size(data['size']),
disk_usage_bytes=data['size'],
disk_usage_with_wals=pretty_size(
data['size'] + data['wal_size']),
disk_usage_with_wals_bytes=data['size'] + data['wal_size']
)
if data['deduplicated_size'] is not None and data['size'] > 0:
deduplication_ratio = (1 - (
float(data['deduplicated_size']) / data['size']))
output['base_backup_information'].update(dict(
incremental_size=pretty_size(data['deduplicated_size']),
incremental_size_bytes=data['deduplicated_size'],
incremental_size_ratio='-{percent:.2%}'.format(
percent=deduplication_ratio)
))
output['base_backup_information'].update(dict(
timeline=data['timeline'],
:param wal_size: size of WAL files belonging to this backup
(without the required WAL files)
:param retention_status: retention policy status
"""
# If minimal is set only output the backup id
if self.minimal:
self.info(backup_info.backup_id)
return
out_list = [
"%s %s - " % (backup_info.server_name, backup_info.backup_id)]
if backup_info.status in BackupInfo.STATUS_COPY_DONE:
end_time = backup_info.end_time.ctime()
out_list.append('%s - Size: %s - WAL Size: %s' %
(end_time,
pretty_size(backup_size),
pretty_size(wal_size)))
if backup_info.tablespaces:
tablespaces = [("%s:%s" % (tablespace.name,
tablespace.location))
for tablespace in backup_info.tablespaces]
out_list.append(' (tablespaces: %s)' %
', '.join(tablespaces))
if backup_info.status == BackupInfo.WAITING_FOR_WALS:
out_list.append(' - %s' % BackupInfo.WAITING_FOR_WALS)
if retention_status and retention_status != BackupInfo.NONE:
out_list.append(' - %s' % retention_status)
else:
out_list.append(backup_info.status)
self.info(''.join(out_list))
postgresql_version=data['version'],
pgdata_directory=data['pgdata'],
tablespaces=[]
))
if data['tablespaces']:
for item in data['tablespaces']:
output['tablespaces'].append(dict(
name=item.name,
location=item.location,
oid=item.oid
))
output['base_backup_information'] = dict(
disk_usage=pretty_size(data['size']),
disk_usage_bytes=data['size'],
disk_usage_with_wals=pretty_size(
data['size'] + data['wal_size']),
disk_usage_with_wals_bytes=data['size'] + data['wal_size']
)
if data['deduplicated_size'] is not None and data['size'] > 0:
deduplication_ratio = (1 - (
float(data['deduplicated_size']) / data['size']))
output['base_backup_information'].update(dict(
incremental_size=pretty_size(data['deduplicated_size']),
incremental_size_bytes=data['deduplicated_size'],
incremental_size_ratio='-{percent:.2%}'.format(
percent=deduplication_ratio)
))
output['base_backup_information'].update(dict(
timeline=data['timeline'],
begin_wal=data['begin_wal'],
end_wal=data['end_wal']
self.info(" Server Name : %s", data['server_name'])
if data['systemid']:
self.info(" System Id : %s", data['systemid'])
self.info(" Status : %s", data['status'])
if data['status'] in BackupInfo.STATUS_COPY_DONE:
self.info(" PostgreSQL Version : %s", data['version'])
self.info(" PGDATA directory : %s", data['pgdata'])
if data['tablespaces']:
self.info(" Tablespaces:")
for item in data['tablespaces']:
self.info(" %s: %s (oid: %s)",
item.name, item.location, item.oid)
self.info("")
self.info(" Base backup information:")
self.info(" Disk usage : %s (%s with WALs)",
pretty_size(data['size']),
pretty_size(data['size'] + data[
'wal_size']))
if data['deduplicated_size'] is not None and data['size'] > 0:
deduplication_ratio = (
1 - (float(data['deduplicated_size']) / data['size']))
self.info(" Incremental size : %s (-%s)",
pretty_size(data['deduplicated_size']),
'{percent:.2%}'.format(percent=deduplication_ratio)
)
self.info(" Timeline : %s", data['timeline'])
self.info(" Begin WAL : %s",
data['begin_wal'])
self.info(" End WAL : %s", data['end_wal'])
self.info(" WAL number : %s", data['wal_num'])
# Output WAL compression ratio for basebackup WAL files
if data['wal_compression_ratio'] > 0:
if getattr(standby, 'sent_lsn', None):
self.info(" Sent LSN : %s (diff: %s)",
standby.sent_lsn,
pretty_size(sent_diff))
if getattr(standby, 'write_lsn', None):
self.info(" Write LSN : %s (diff: %s)",
standby.write_lsn,
pretty_size(write_diff))
if getattr(standby, 'flush_lsn', None):
self.info(" Flush LSN : %s (diff: %s)",
standby.flush_lsn,
pretty_size(flush_diff))
if getattr(standby, 'replay_lsn', None):
self.info(" Replay LSN : %s (diff: %s)",
standby.replay_lsn,
pretty_size(replay_diff))
n += 1
if getattr(standby, 'slot_name', None):
json_output['replication_slot'] = standby.slot_name
json_output.update(dict(
wal_sender_pid=standby.pid,
started_at=standby.backend_start.isoformat(sep=' '),
))
if getattr(standby, 'backend_xmin', None):
json_output['standbys_xmin'] = standby.backend_xmin or None
for lsn in lsn_diff.keys():
standby_key = lsn + '_lsn'
if getattr(standby, standby_key, None):
json_output.update({
lsn + '_lsn': getattr(standby, standby_key),
lsn + '_lsn_diff': pretty_size(lsn_diff[lsn]),
lsn + '_lsn_diff_bytes': lsn_diff[lsn]
})
self.info(" Communication : Unix domain socket")
self.info(" User name : %s", standby.usename)
self.info(" Current state : %s (%s)",
standby.state,
standby.sync_state)
if getattr(standby, 'slot_name', None):
self.info(" Replication slot: %s", standby.slot_name)
self.info(" WAL sender PID : %s", standby.pid)
self.info(" Started at : %s", standby.backend_start)
if getattr(standby, 'backend_xmin', None):
self.info(" Standby's xmin : %s",
standby.backend_xmin or '-')
if getattr(standby, 'sent_lsn', None):
self.info(" Sent LSN : %s (diff: %s)",
standby.sent_lsn,
pretty_size(sent_diff))
if getattr(standby, 'write_lsn', None):
self.info(" Write LSN : %s (diff: %s)",
standby.write_lsn,
pretty_size(write_diff))
if getattr(standby, 'flush_lsn', None):
self.info(" Flush LSN : %s (diff: %s)",
standby.flush_lsn,
pretty_size(flush_diff))
if getattr(standby, 'replay_lsn', None):
self.info(" Replay LSN : %s (diff: %s)",
standby.replay_lsn,
pretty_size(replay_diff))
n += 1
location=item.location,
oid=item.oid
))
output['base_backup_information'] = dict(
disk_usage=pretty_size(data['size']),
disk_usage_bytes=data['size'],
disk_usage_with_wals=pretty_size(
data['size'] + data['wal_size']),
disk_usage_with_wals_bytes=data['size'] + data['wal_size']
)
if data['deduplicated_size'] is not None and data['size'] > 0:
deduplication_ratio = (1 - (
float(data['deduplicated_size']) / data['size']))
output['base_backup_information'].update(dict(
incremental_size=pretty_size(data['deduplicated_size']),
incremental_size_bytes=data['deduplicated_size'],
incremental_size_ratio='-{percent:.2%}'.format(
percent=deduplication_ratio)
))
output['base_backup_information'].update(dict(
timeline=data['timeline'],
begin_wal=data['begin_wal'],
end_wal=data['end_wal']
))
if data['wal_compression_ratio'] > 0:
output['base_backup_information'].update(dict(
wal_compression_ratio='{percent:.2%}'.format(
percent=data['wal_compression_ratio'])
))
output['base_backup_information'].update(dict(
begin_time_timestamp=data['begin_time'].strftime('%s'),