Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Commit the merge.
self.commit(message="Merged %s" % feature_branch.expression)
# We skip merging up through release branches when the target branch is
# the default branch (in other words, there's nothing to merge up).
if target_branch != self.default_revision:
# Find the release branches in the repository.
release_branches = [release.revision.branch for release in self.ordered_releases]
logger.debug("Found %s: %s",
pluralize(len(release_branches), "release branch", "release branches"),
concatenate(release_branches))
# Find the release branches after the target branch.
later_branches = release_branches[release_branches.index(target_branch) + 1:]
logger.info("Found %s after target branch (%s): %s",
pluralize(len(later_branches), "release branch", "release branches"),
target_branch,
concatenate(later_branches))
# Determine the branches that need to be merged.
branches_to_upmerge = later_branches + [self.default_revision]
logger.info("Merging up from '%s' to %s: %s",
target_branch,
pluralize(len(branches_to_upmerge), "branch", "branches"),
concatenate(branches_to_upmerge))
# Merge the feature branch up through the selected branches.
merge_queue = [target_branch] + branches_to_upmerge
while len(merge_queue) >= 2:
from_branch = merge_queue[0]
to_branch = merge_queue[1]
logger.info("Merging '%s' into '%s' ..", from_branch, to_branch)
self.checkout(revision=to_branch)
self.merge(revision=from_branch)
self.commit(message="Merged %s" % from_branch)
merge_queue.pop(0)
def release_scheme(self, value):
"""Validate the release scheme."""
if value not in KNOWN_RELEASE_SCHEMES:
msg = "Release scheme %r is not supported! (valid options are %s)"
raise ValueError(msg % (value, concatenate(map(repr, KNOWN_RELEASE_SCHEMES))))
set_property(self, 'release_scheme', value)
count = int(count)
else:
# Floating point rounding for the smallest unit.
count = round_number(count)
# Only include relevant units in the result.
if count not in (0, '0'):
result.append(pluralize(count, unit['singular'], unit['plural']))
if len(result) == 1:
# A single count/unit combination.
return result[0]
else:
if not detailed:
# Remove `insignificant' data from the formatted timespan.
result = result[:max_units]
# Format the timespan in a readable way.
return concatenate(result)
def format_properties(self, names):
"""Format a list of property names as reStructuredText."""
return concatenate(format(":attr:`%s`", n) for n in sorted(names))
def simple_search(self, *keywords):
"""
Perform a simple search for case insensitive substring matches.
:param keywords: The string(s) to search for.
:returns: The matched password names (a generator of strings).
Only passwords whose names matches *all* of the given keywords are
returned.
"""
matches = []
keywords = [kw.lower() for kw in keywords]
logger.verbose(
"Performing simple search on %s (%s) ..",
pluralize(len(keywords), "keyword"),
concatenate(map(repr, keywords)),
)
for entry in self.filtered_entries:
normalized = entry.name.lower()
if all(kw in normalized for kw in keywords):
matches.append(entry)
logger.log(
logging.INFO if matches else logging.VERBOSE,
"Matched %s using simple search.",
pluralize(len(matches), "password"),
)
return matches
:returns: The validated value (one of the strings '1', '2', '3', 'idle',
'best-effort', or 'realtime').
:raises: :exc:`~exceptions.ValueError` when the given value isn't one of
the strings mentioned above.
The strings 'idle', 'best-effort' and 'realtime' are preferred for
readability but not supported in minimalistic environments like busybox
which only support the values '1', '2' and '3' (refer to `#16`_). It's up
to the caller to choose the correct value, no translation is done.
.. _#16: https://github.com/xolox/python-executor/pull/16
"""
expected = ('idle', 'best-effort', 'realtime', '1', '2', '3')
if value not in expected:
msg = "Invalid I/O scheduling class! (got %r while valid options are %s)"
raise ValueError(msg % (value, concatenate(expected)))
return value
Three VCS types are currently supported: ``hg`` (``mercurial`` is also
accepted), ``git`` and ``bzr`` (``bazaar`` is also accepted).
"""
parser = configparser.RawConfigParser()
for config_file in [SYSTEM_CONFIG_FILE, USER_CONFIG_FILE]:
config_file = parse_path(config_file)
if os.path.isfile(config_file):
logger.debug("Loading configuration file (%s) ..", format_path(config_file))
parser.read(config_file)
matching_repos = [r for r in parser.sections() if normalize_name(name) == normalize_name(r)]
if not matching_repos:
msg = "No repositories found matching the name '%s'!"
raise NoSuchRepositoryError(msg % name)
elif len(matching_repos) != 1:
msg = "Multiple repositories found matching the name '%s'! (matches: %s)"
raise AmbiguousRepositoryNameError(msg % (name, concatenate(map(repr, matching_repos))))
else:
kw = {}
# Get the repository specific options.
options = dict(parser.items(matching_repos[0]))
vcs_type = options.get('type', '').lower()
# Process the `local' directory pathname.
local_path = options.get('local')
if local_path:
# Expand a leading tilde and/or environment variables.
kw['local'] = parse_path(local_path)
# Process the `bare' option.
bare = options.get('bare', None)
if bare is not None:
# Default to bare=None but enable configuration
# file(s) to enforce bare=True or bare=False.
kw['bare'] = coerce_boolean(bare)
# Group the backups by the rotation frequencies.
backups_by_frequency = self.group_backups(sorted_backups)
# Apply the user defined rotation scheme.
self.apply_rotation_scheme(backups_by_frequency, most_recent_backup.timestamp)
# Find which backups to preserve and why.
backups_to_preserve = self.find_preservation_criteria(backups_by_frequency)
# Apply the calculated rotation scheme.
for backup in sorted_backups:
friendly_name = backup.pathname
if not location.is_remote:
# Use human friendly pathname formatting for local backups.
friendly_name = format_path(backup.pathname)
if backup in backups_to_preserve:
matching_periods = backups_to_preserve[backup]
logger.info("Preserving %s (matches %s retention %s) ..",
friendly_name, concatenate(map(repr, matching_periods)),
"period" if len(matching_periods) == 1 else "periods")
else:
logger.info("Deleting %s ..", friendly_name)
if not self.dry_run:
# Copy the list with the (possibly user defined) removal command.
removal_command = list(self.removal_command)
# Add the pathname of the backup as the final argument.
removal_command.append(backup.pathname)
# Construct the command object.
command = location.context.prepare(
command=removal_command,
group_by=(location.ssh_alias, location.mount_point),
ionice=self.io_scheduling_class,
)
rotation_commands.append(command)
if not prepare:
def __init__(self, **kw):
"""
Initialize a :class:`PropertyManager` object.
:param kw: Any keyword arguments are passed on to :func:`set_properties()`.
"""
self.set_properties(**kw)
missing_properties = self.missing_properties
if missing_properties:
msg = "missing %s" % pluralize(len(missing_properties), "required argument")
raise TypeError("%s (%s)" % (msg, concatenate(missing_properties)))