Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if triggered_item_id in added_actions:
# action has already been triggered
continue
try:
type_name, item_name, action_name = triggered_item_id.split(":")
except ValueError:
# not a canned action
continue
target_item_id = "{}:{}".format(type_name, item_name)
try:
target_item = items[target_item_id]
except KeyError:
raise BundleError(_(
"{item} in bundle '{bundle}' triggers unknown item '{target_item}'"
).format(
bundle=item.bundle.name,
item=item.id,
target_item=target_item_id,
))
try:
action_attrs = target_item.get_canned_actions()[action_name]
except KeyError:
raise BundleError(_(
"{item} in bundle '{bundle}' triggers unknown "
"canned action '{action}' on {target_item}"
).format(
action=action_name,
bundle=item.bundle.name,
def parse_pkg_name(pkgname, line):
matches = PKGSPEC_REGEX.match(line)
assert matches != None, _("Unexpected OpenBSD package name: {line}").format(line=line)
installed_package, installed_version_and_more = matches.groups()
assert not installed_version_and_more.endswith("-"), \
_("Unexpected OpenBSD package name (ends in dash): {line}").format(line=line)
if installed_package == pkgname:
if "-" in installed_version_and_more:
tokens = installed_version_and_more.split("-")
installed_version = tokens[0]
installed_flavor = "-".join(tokens[1:])
else:
installed_version = installed_version_and_more
installed_flavor = ""
return True, installed_version, installed_flavor
else:
default=None,
dest='command',
metavar=_("COMMAND"),
required=False,
type=str,
help=_("command to execute in lieu of REPL"),
)
parser_debug.add_argument(
"-n",
"--node",
default=None,
dest='node',
metavar=_("NODE"),
required=False,
type=str,
help=_("name of node to inspect"),
)
# bw diff
help_diff = _("Show differences between nodes")
parser_diff = subparsers.add_parser(
"diff",
description=help_diff,
help=help_diff,
formatter_class=RawTextHelpFormatter, # for HELP_get_target_nodes
)
parser_diff.set_defaults(func=bw_diff)
parser_diff.add_argument(
"-b",
"--branch",
default=None,
dest='branch',
if key_delim > -1:
key_from_text = cryptotext[:key_delim].decode('utf-8')
cryptotext = cryptotext[key_delim + 1:]
else:
key_from_text = None
if key is None:
if key_from_text is not None:
key = key_from_text
else:
key = 'encrypt'
try:
key = self.keys[key]
except KeyError:
raise FaultUnavailable(_(
"Key '{key}' not available for decryption of the following entity, "
"check your {file}: {entity_description}"
).format(
file=FILENAME_SECRETS,
key=key,
entity_description=entity_description,
))
return key, cryptotext
),
)
if invalid_attributes:
raise BundleError(_(
"invalid attribute(s) for '{item}' in bundle '{bundle}': {attrs}"
).format(
item=item_id,
bundle=bundle.name,
attrs=", ".join(invalid_attributes),
))
invalid_attributes = set(attributes.get('when_creating', {}).keys()).difference(
set(cls.WHEN_CREATING_ATTRIBUTES.keys())
)
if invalid_attributes:
raise BundleError(_(
"invalid when_creating attribute(s) for '{item}' in bundle '{bundle}': {attrs}"
).format(
item=item_id,
bundle=bundle.name,
attrs=", ".join(invalid_attributes),
))
v=repr(inner_value),
))
elif allowed_types == TUPLE_OF_INTS:
if not isinstance(value, tuple):
raise ValueError(_("key '{k}' is {i}, but should be a tuple").format(
k=key,
i=type(value),
))
for inner_value in value:
if not isinstance(inner_value, int):
raise ValueError(_("non-int member in '{k}': {v}").format(
k=key,
v=repr(inner_value),
))
elif not isinstance(value, allowed_types):
raise ValueError(_("key '{k}' is {i}, but should be one of: {t}").format(
k=key,
i=type(value),
t=allowed_types,
))
for key in required_keys or []:
if key not in candidate:
raise ValueError(_("missing required key: {}").format(key))
def validate_name(cls, bundle, name):
    """
    Check that `name` is acceptable as a username.

    The checks run in this order and the first one that fails raises
    BundleError:

    1. every character must be contained in _USERNAME_VALID_CHARACTERS
    2. the name must not end in "_" or "-"
    3. the name must not be longer than 30 characters

    `bundle` is only consulted (for its `name` attribute) while building
    an error message; `cls` is not used by the checks themselves.
    Returns None when all checks pass.
    """
    for character in name:
        if character in _USERNAME_VALID_CHARACTERS:
            continue
        # first offending character wins; later ones are never reported
        message = _(
            "Invalid character in username '{user}': {char} (bundle '{bundle}')"
        )
        raise BundleError(message.format(
            bundle=bundle.name,
            char=character,
            user=name,
        ))

    if name.endswith(("_", "-")):
        message = _(
            "Username '{user}' must not end in dash or underscore (bundle '{bundle}')"
        )
        raise BundleError(message.format(bundle=bundle.name, user=name))

    if len(name) > 30:
        message = _(
            "Username '{user}' is longer than 30 characters (bundle '{bundle}')"
        )
        raise BundleError(message.format(bundle=bundle.name, user=name))
dest='attrs',
metavar=_("ATTR"),
nargs='+',
type=str,
help=_("show table with the given attributes for each node "
"(e.g. 'all', 'groups', 'bundles', 'hostname', 'os', ...)"),
)
# bw plot
help_plot = _("Generates DOT output that can be piped into `dot -Tsvg -ooutput.svg`. "
"The resulting output.svg can be viewed using most browsers.")
parser_plot = subparsers.add_parser("plot", description=help_plot, help=help_plot)
parser_plot_subparsers = parser_plot.add_subparsers()
# bw plot group
help_plot_group = _("Plot subgroups and node members for the given group "
"or the entire repository")
parser_plot_subparsers_group = parser_plot_subparsers.add_parser(
"group",
description=help_plot_group,
help=help_plot_group,
)
parser_plot_subparsers_group.set_defaults(func=bw_plot_group)
parser_plot_subparsers_group.add_argument(
'group',
default=None,
metavar=_("GROUP"),
nargs='?',
type=str,
help=_("group to plot"),
)
parser_plot_subparsers_group.add_argument(
def apply(
self,
autoskip_selector="",
autoonly_selector="",
interactive=False,
force=False,
skip_list=tuple(),
workers=4,
):
if not list(self.items):
io.stdout(_("{x} {node} has no items").format(
node=bold(self.name),
x=yellow("»"),
))
return None
if self.covered_by_autoskip_selector(autoskip_selector):
io.stdout(_("{x} {node} skipped by --skip").format(
node=bold(self.name),
x=yellow("»"),
))
return None
if self.name in skip_list:
io.stdout(_("{x} {node} skipped by --resume-file").format(
node=bold(self.name),
x=yellow("»"),
@io.job_wrapper(_("{} determining groups").format(bold("{0.name}")))
def groups(self):
_groups = set()
for group_name in set(self._attributes.get('groups', set())):
with error_context(node=self.name):
_groups.add(self.repo.get_group(group_name))
for group in self.repo.groups:
if group in _groups:
# we're already in this group, no need to check it again
continue
if self in group._nodes_from_members:
_groups.add(group)
continue
for pattern in group._member_patterns:
if pattern.search(self.name) is not None: