Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _process_plan(self, plan, plan_backups):
total_dont_expire = 0
total_expired = 0
logger.info("==== Processing plan '%s' .... " % plan.id)
# Ensure we have the latest revision of the backup plan
plan = persistence.get_backup_plan(plan.id) or plan
try:
expirable_backups, non_expirable_backups = self.find_plan_expirable_backups(plan, plan_backups)
if non_expirable_backups:
mark_plan_backups_not_expirable(plan, non_expirable_backups)
total_dont_expire += len(non_expirable_backups)
total_expired += self.expire_plan_dues(plan, expirable_backups)
except Exception, e:
logger.exception("BackupExpirationManager Error while"
" processing plan '%s'" % plan.id)
subject = "BackupExpirationManager Error"
message = ("BackupExpirationManager Error while processing"
" plan '%s'\n\nStack Trace:\n%s" %
(plan.id, traceback.format_exc()))
get_mbs().notifications.send_error_notification(subject, message)
def is_plan_backups_expirable(self, plan):
# We only allow expiring backups that has a whose plans still exist
# and has a retention policy
return persistence.get_backup_plan(plan.id) is not None
def validate_recurring_backup_expiration(self, backup):
logger.info("Validating if recurring backup '%s' should be "
"expired now" % backup.id)
# Ensure we have the latest revision of the backup plan when possible
plan = persistence.get_backup_plan(backup.plan.id) or backup.plan
rp = plan.retention_policy
if not rp:
raise BackupExpirationError(
"Bad attempt to expire backup '%s'. "
"Backup plan does not have a retention policy" % backup.id)
occurrences_to_retain = \
rp.get_plan_occurrences_to_retain_as_of(plan, date_now())
if backup.plan_occurrence in occurrences_to_retain:
raise BackupExpirationError(
"Bad attempt to expire backup '%s'. Backup must not be"
" expired now." % backup.id)
else:
logger.info("Backup '%s' good be expired now" %