def test_one_day_three_hour_two_mins(self):
"""
        Test output for a 1 day, 3 hours, 2 minutes timedelta.
"""
td = timedelta(days=1, seconds=10920)
        assert barman.utils.human_readable_timedelta(td) == '1 day, ' \
                                                            '3 hours, ' \
                                                            '2 minutes'
def test_seven_days(self):
"""
Test output for a 1 week timedelta.
"""
td = timedelta(weeks=1)
assert barman.utils.human_readable_timedelta(td) == '7 days'
        # case 2: backup end date outside the one day limit
        instance.end_time = now - timedelta(days=8)
# build the expected message
msg = barman.utils.human_readable_timedelta(now - instance.end_time)
r = backup_manager.validate_last_backup_maximum_age(
backup_manager.config.last_backup_maximum_age)
assert (r[0], r[1]) == (False, msg)
# case 3: backup inside the one day limit
# mocking the backup id to a custom value
backup_id_mock.return_value = "Mock_backup"
# simulate an existing backup using a mock obj
instance = infofile_mock.return_value
# set the backup end date inside the limit
instance.end_time = now - timedelta(days=2)
# build the expected msg
msg = barman.utils.human_readable_timedelta(now - instance.end_time)
r = backup_manager.validate_last_backup_maximum_age(
backup_manager.config.last_backup_maximum_age)
assert (r[0], r[1]) == (True, msg)
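# Hedged sketch of the contract the assertions above rely on:
# validate_last_backup_maximum_age returns a (bool, str) tuple, where the
# boolean says whether the latest backup is younger than the configured
# maximum age and the string is the backup's human-readable age. The
# function below is illustrative only, not barman's actual implementation.
from datetime import datetime, timedelta


def validate_backup_age_sketch(backup_end_time, maximum_age):
    """Return (within_limit, age message) for the latest backup."""
    age = datetime.now() - backup_end_time
    # barman formats the age with human_readable_timedelta; plain str()
    # stands in for it in this sketch
    return (age <= maximum_age, str(age))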
def test_one_hour_two_mins(self):
"""
Test output for a 1 hour, 2 minutes timedelta.
"""
td = timedelta(seconds=3720)
assert barman.utils.human_readable_timedelta(td) == '1 hour, 2 minutes'
def test_one_minute(self):
"""
Test output for a 1 minute timedelta.
"""
td = timedelta(seconds=60)
assert barman.utils.human_readable_timedelta(td) == '1 minute'
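# The assertions above pin down the formatting contract of
# barman.utils.human_readable_timedelta. Below is a minimal sketch of that
# behaviour, covering only the cases the tests exercise (days, hours,
# minutes, singular/plural); the real barman implementation may differ.
from datetime import timedelta


def human_readable_timedelta_sketch(td):
    """Render a timedelta as text, e.g. '1 day, 3 hours, 2 minutes'."""
    total_minutes = int(td.total_seconds()) // 60
    days, rem = divmod(total_minutes, 24 * 60)
    hours, minutes = divmod(rem, 60)
    parts = []
    for value, unit in ((days, 'day'), (hours, 'hour'), (minutes, 'minute')):
        if value:
            parts.append('%d %s%s' % (value, unit, '' if value == 1 else 's'))
    return ', '.join(parts)


assert human_readable_timedelta_sketch(timedelta(weeks=1)) == '7 days'
assert human_readable_timedelta_sketch(timedelta(seconds=3720)) == \
    '1 hour, 2 minutes'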
output['base_backup_information'].update(dict(
begin_time_timestamp=data['begin_time'].strftime('%s'),
begin_time=data['begin_time'].isoformat(sep=' '),
end_time_timestamp=data['end_time'].strftime('%s'),
end_time=data['end_time'].isoformat(sep=' ')
))
copy_stats = data.get('copy_stats')
if copy_stats:
copy_time = copy_stats.get('copy_time')
analysis_time = copy_stats.get('analysis_time', 0)
if copy_time:
output['base_backup_information'].update(dict(
copy_time=human_readable_timedelta(
datetime.timedelta(seconds=copy_time)),
copy_time_seconds=copy_time,
analysis_time=human_readable_timedelta(
datetime.timedelta(seconds=analysis_time)),
analysis_time_seconds=analysis_time
))
size = data['deduplicated_size'] or data['size']
output['base_backup_information'].update(dict(
throughput="%s/s" % pretty_size(size / copy_time),
throughput_bytes=size / copy_time,
number_of_workers=copy_stats.get(
'number_of_workers', 1)
))
        output['base_backup_information'].update(dict(
            begin_offset=data['begin_offset'],
            end_offset=data['end_offset'],
            begin_lsn=data['begin_xlog'],
            end_lsn=data['end_xlog']
        ))
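# Worked example of the throughput fields computed above, using
# hypothetical numbers (a 2 GiB deduplicated backup copied in 100 seconds);
# pretty_size comes from barman.utils, as in the surrounding module.
from barman.utils import pretty_size

size = 2 * 1024 ** 3                  # deduplicated backup size in bytes
copy_time = 100.0                     # seconds spent copying
throughput_bytes = size / copy_time   # ~21.5 million bytes per second
throughput = "%s/s" % pretty_size(size / copy_time)  # roughly '20.5 MiB/s'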
self.info(" End WAL : %s", data['end_wal'])
self.info(" WAL number : %s", data['wal_num'])
# Output WAL compression ratio for basebackup WAL files
if data['wal_compression_ratio'] > 0:
self.info(" WAL compression ratio: %s",
'{percent:.2%}'.format(
percent=data['wal_compression_ratio']))
self.info(" Begin time : %s",
data['begin_time'])
self.info(" End time : %s", data['end_time'])
# If copy statistics are available print a summary
copy_stats = data.get('copy_stats')
if copy_stats:
copy_time = copy_stats.get('copy_time')
if copy_time:
value = human_readable_timedelta(
datetime.timedelta(seconds=copy_time))
# Show analysis time if it is more than a second
analysis_time = copy_stats.get('analysis_time')
if analysis_time is not None and analysis_time >= 1:
value += " + %s startup" % (human_readable_timedelta(
datetime.timedelta(seconds=analysis_time)))
self.info(" Copy time : %s", value)
size = data['deduplicated_size'] or data['size']
value = "%s/s" % pretty_size(size / copy_time)
number_of_workers = copy_stats.get('number_of_workers', 1)
if number_of_workers > 1:
value += " (%s jobs)" % number_of_workers
self.info(" Estimated throughput : %s", value)
self.info(" Begin Offset : %s",
data['begin_offset'])
        self.info("    End Offset           : %s",
                  data['end_offset'])
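# For reference, the info calls above render lines roughly like the
# following (all values are illustrative, not taken from a real backup):
#
#     Begin time           : 2015-08-19 09:32:11.510504+02:00
#     End time             : 2015-08-19 09:32:13.931631+02:00
#     Copy time            : 2 seconds + 1 second startup
#     Estimated throughput : 10.3 MiB/s (4 jobs)
#     Begin Offset         : 40
#     End Offset           : 312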
:param CheckStrategy check_strategy: the strategy for the management
of the results of the various checks
"""
check_strategy.init_check('backup maximum age')
# first check: check backup maximum age
if self.config.last_backup_maximum_age is not None:
# get maximum age information
backup_age = self.backup_manager.validate_last_backup_maximum_age(
self.config.last_backup_maximum_age)
# format the output
check_strategy.result(
self.config.name, backup_age[0],
hint="interval provided: %s, latest backup age: %s" % (
human_readable_timedelta(
self.config.last_backup_maximum_age), backup_age[1]))
else:
            # no last_backup_maximum_age provided by the user
check_strategy.result(
self.config.name,
True,
hint="no last_backup_maximum_age provided")
assert job.checksum is None, \
'A file item must have a None `checksum` attribute'
rsync(item.src, item.dst, allowed_retval=(0, 23, 24))
if rsync.ret == 23:
if item.optional:
_logger.warning(
"Ignoring error reading %s", item)
else:
raise CommandFailedException(dict(
ret=rsync.ret, out=rsync.out, err=rsync.err))
# Store the stop time
job.copy_end_time = datetime.datetime.now()
# Write in the log that the job is finished
with _logger_lock:
_logger.info(job.description, bucket,
'finished (duration: %s)' % human_readable_timedelta(
job.copy_end_time - job.copy_start_time))
# Return the job to the caller, for statistics purpose
return job
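# The same duration-reporting pattern in isolation: time a unit of work
# and render the elapsed time with human_readable_timedelta (the job and
# the print target here are hypothetical).
import datetime

from barman.utils import human_readable_timedelta

copy_start_time = datetime.datetime.now()
# ... perform the copy work here ...
copy_end_time = datetime.datetime.now()
print('finished (duration: %s)' % human_readable_timedelta(
    copy_end_time - copy_start_time))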