How to use the barman.utils.human_readable_timedelta function in barman

To help you get started, we’ve selected a few barman examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github 2ndquadrant-it / barman / tests / test_utils.py View on Github external
def test_one_day_three_hour_two_mins(self):
        """
        Test output for a 1 day, 3 hour, 2 minutes timedelta.
        """
        td = timedelta(days=1, seconds=10920)
        assert barman.utils.human_readable_timedelta(td) == '1 day, ' \
                                                            '3 hours, ' \
github 2ndquadrant-it / barman / tests / test_utils.py View on Github external
def test_seven_days(self):
        """
        A timedelta of exactly one week must be rendered as '7 days'.
        """
        one_week = timedelta(weeks=1)
        rendered = barman.utils.human_readable_timedelta(one_week)
        # The expected string is expressed in days; no "week" unit appears
        assert rendered == '7 days'
github 2ndquadrant-it / barman / tests / test_backup.py View on Github external
instance.end_time = now - timedelta(days=8)
        # build the expected message
        msg = barman.utils.human_readable_timedelta(now - instance.end_time)
        r = backup_manager.validate_last_backup_maximum_age(
            backup_manager.config.last_backup_maximum_age)
        assert (r[0], r[1]) == (False, msg)

        # case 3: backup inside the one day limit
        # mocking the backup id to a custom value
        backup_id_mock.return_value = "Mock_backup"
        # simulate an existing backup using a mock obj
        instance = infofile_mock.return_value
        # set the backup end date inside the limit
        instance.end_time = now - timedelta(days=2)
        # build the expected msg
        msg = barman.utils.human_readable_timedelta(now - instance.end_time)
        r = backup_manager.validate_last_backup_maximum_age(
            backup_manager.config.last_backup_maximum_age)
        assert (r[0], r[1]) == (True, msg)
github 2ndquadrant-it / barman / tests / test_utils.py View on Github external
def test_one_hour_two_mins(self):
        """
        A timedelta of 1 hour and 2 minutes must be rendered with both
        units, the hour in singular form and the minutes in plural form.
        """
        # 3720 seconds = 3600 (1 hour) + 120 (2 minutes)
        interval = timedelta(seconds=3720)
        rendered = barman.utils.human_readable_timedelta(interval)
        assert rendered == '1 hour, 2 minutes'
github 2ndquadrant-it / barman / tests / test_utils.py View on Github external
def test_one_minute(self):
        """
        A timedelta of exactly 60 seconds must be rendered as '1 minute'
        (singular unit).
        """
        sixty_seconds = timedelta(seconds=60)
        rendered = barman.utils.human_readable_timedelta(sixty_seconds)
        assert rendered == '1 minute'
github 2ndquadrant-it / barman / barman / output.py View on Github external
output['base_backup_information'].update(dict(
                begin_time_timestamp=data['begin_time'].strftime('%s'),
                begin_time=data['begin_time'].isoformat(sep=' '),
                end_time_timestamp=data['end_time'].strftime('%s'),
                end_time=data['end_time'].isoformat(sep=' ')
            ))
            copy_stats = data.get('copy_stats')
            if copy_stats:
                copy_time = copy_stats.get('copy_time')
                analysis_time = copy_stats.get('analysis_time', 0)
                if copy_time:
                    output['base_backup_information'].update(dict(
                        copy_time=human_readable_timedelta(
                            datetime.timedelta(seconds=copy_time)),
                        copy_time_seconds=copy_time,
                        analysis_time=human_readable_timedelta(
                            datetime.timedelta(seconds=analysis_time)),
                        analysis_time_seconds=analysis_time
                    ))
                    size = data['deduplicated_size'] or data['size']
                    output['base_backup_information'].update(dict(
                        throughput="%s/s" % pretty_size(size / copy_time),
                        throughput_bytes=size / copy_time,
                        number_of_workers=copy_stats.get(
                            'number_of_workers', 1)
                    ))

            output['base_backup_information'].update(dict(
                begin_offset=data['begin_offset'],
                end_offset=data['end_offset'],
                begin_lsn=data['begin_xlog'],
                end_lsn=data['end_xlog']
github 2ndquadrant-it / barman / barman / output.py View on Github external
self.info("    End WAL              : %s", data['end_wal'])
            self.info("    WAL number           : %s", data['wal_num'])
            # Output WAL compression ratio for basebackup WAL files
            if data['wal_compression_ratio'] > 0:
                self.info("    WAL compression ratio: %s",
                          '{percent:.2%}'.format(
                              percent=data['wal_compression_ratio']))
            self.info("    Begin time           : %s",
                      data['begin_time'])
            self.info("    End time             : %s", data['end_time'])
            # If copy statistics are available print a summary
            copy_stats = data.get('copy_stats')
            if copy_stats:
                copy_time = copy_stats.get('copy_time')
                if copy_time:
                    value = human_readable_timedelta(
                        datetime.timedelta(seconds=copy_time))
                    # Show analysis time if it is more than a second
                    analysis_time = copy_stats.get('analysis_time')
                    if analysis_time is not None and analysis_time >= 1:
                        value += " + %s startup" % (human_readable_timedelta(
                            datetime.timedelta(seconds=analysis_time)))
                    self.info("    Copy time            : %s", value)
                    size = data['deduplicated_size'] or data['size']
                    value = "%s/s" % pretty_size(size / copy_time)
                    number_of_workers = copy_stats.get('number_of_workers', 1)
                    if number_of_workers > 1:
                        value += " (%s jobs)" % number_of_workers
                    self.info("    Estimated throughput : %s", value)
            self.info("    Begin Offset         : %s",
                      data['begin_offset'])
            self.info("    End Offset           : %s",
github 2ndquadrant-it / barman / barman / server.py View on Github external
:param CheckStrategy check_strategy: the strategy for the management
             of the results of the various checks
        """
        check_strategy.init_check('backup maximum age')
        # first check: check backup maximum age
        if self.config.last_backup_maximum_age is not None:
            # get maximum age information
            backup_age = self.backup_manager.validate_last_backup_maximum_age(
                self.config.last_backup_maximum_age)

            # format the output
            check_strategy.result(
                self.config.name, backup_age[0],
                hint="interval provided: %s, latest backup age: %s" % (
                    human_readable_timedelta(
                        self.config.last_backup_maximum_age), backup_age[1]))
        else:
            # last_backup_maximum_age provided by the user
            check_strategy.result(
                self.config.name,
                True,
                hint="no last_backup_maximum_age provided")
github 2ndquadrant-it / barman / barman / copy_controller.py View on Github external
assert job.checksum is None, \
                'A file item must have a None `checksum` attribute'
            rsync(item.src, item.dst, allowed_retval=(0, 23, 24))
            if rsync.ret == 23:
                if item.optional:
                    _logger.warning(
                        "Ignoring error reading %s", item)
                else:
                    raise CommandFailedException(dict(
                        ret=rsync.ret, out=rsync.out, err=rsync.err))
        # Store the stop time
        job.copy_end_time = datetime.datetime.now()
        # Write in the log that the job is finished
        with _logger_lock:
            _logger.info(job.description, bucket,
                         'finished (duration: %s)' % human_readable_timedelta(
                             job.copy_end_time - job.copy_start_time))
        # Return the job to the caller, for statistics purpose
        return job