Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_encoding(self):
content_old = (
"lineö1\n".encode("utf-8")
)
content_new = (
"lineö1\n".encode("latin-1")
)
self.assertEqual(
files.diff(content_old, content_new, "/foo", encoding_hint="latin-1"),
(
red("--- /foo") + "\n" +
green("+++ ") + "\n" +
"@@ -1 +1 @@\n" +
red("-lineö1") + "\n" +
green("+line�1") + "\n"
),
def format_item_command_results(results):
output = ""
for i in range(len(results)):
stdout = results[i]['result'].stdout_text.strip()
stderr = results[i]['result'].stderr_text.strip()
# show command
output += "\n{b}".format(b=red('│'))
output += "\n{b} {command} (return code: {code}{no_output})".format(
b=red('├─'),
command=bold(results[i]['command']),
code=bold(results[i]['result'].return_code),
no_output='' if stdout or stderr else '; no output'
)
# show output
lines = []
if stdout or stderr:
output += "\n{b}".format(b=red("│ "))
if stdout:
lines += stdout.strip().split('\n')
if stderr:
lines += stderr.strip().split('\n')
for k in range(len(lines)):
output += "\n{b} {line}".format(b=red("│ "), line=lines[k])
def softlock_list(node):
with io.job(_(" {} checking soft locks...").format(node.name)):
cat = node.run("cat {}".format(SOFT_LOCK_FILE.format(id="*")), may_fail=True)
if cat.return_code != 0:
return []
result = []
for line in cat.stdout.decode('utf-8').strip().split("\n"):
try:
result.append(json.loads(line.strip()))
except json.decoder.JSONDecodeError:
io.stderr(_(
"{x} {node} unable to parse soft lock file contents, ignoring: {line}"
).format(
x=red("!"),
node=bold(node.name),
line=line.strip(),
))
for lock in result[:]:
if lock['expiry'] < time():
io.debug(_("removing expired soft lock {id} from node {node}").format(
id=lock['id'],
node=node.name,
))
softlock_remove(node, lock['id'])
result.remove(lock)
return result
def format_item_command_results(results):
output = ""
for i in range(len(results)):
stdout = results[i]['result'].stdout_text.strip()
stderr = results[i]['result'].stderr_text.strip()
# show command
output += "\n{b}".format(b=red('│'))
output += "\n{b} {command} (return code: {code}{no_output})".format(
b=red('├─'),
command=bold(results[i]['command']),
code=bold(results[i]['result'].return_code),
no_output='' if stdout or stderr else '; no output'
)
# show output
lines = []
if stdout or stderr:
output += "\n{b}".format(b=red("│ "))
if stdout:
lines += stdout.strip().split('\n')
if stderr:
lines += stderr.strip().split('\n')
inline,
):
rows = [[entity_label], ROW_SEPARATOR]
selected_attrs = {attr.strip() for attr in selected_attrs}
if selected_attrs == {'all'}:
selected_attrs = available_attrs
elif 'all' in selected_attrs:
io.stderr(_(
"{x} invalid attribute list requested ('all' and extraneous): {attr}"
).format(x=red("!!!"), attr=", ".join(sorted(selected_attrs))))
exit(1)
for attr in selected_attrs:
if attr not in available_attrs:
io.stderr(_("{x} unknown attribute: {attr}").format(x=red("!!!"), attr=attr))
exit(1)
rows[0].append(bold(attr))
has_list_attrs = False
for entity in sorted(entities):
attr_values = [[entity.name]]
for attr in selected_attrs:
if attr in available_attrs_lists:
if inline:
attr_values.append([",".join(names(getattr(entity, attr)))])
else:
has_list_attrs = True
attr_values.append(sorted(names(getattr(entity, attr))))
else:
attr_values.append([str(getattr(entity, attr))])
number_of_lines = max([len(value) for value in attr_values])
"You probably have to install it with `pip install {pkg}`."
).format(
filename=FILENAME_REQUIREMENTS,
pkg=exc.req,
x=red("!"),
))
except VersionConflict as exc:
raise MissingRepoDependency(_(
"{x} Python package '{required}' is listed in {filename}, "
"but only '{existing}' was found. "
"You probably have to upgrade it with `pip install {required}`."
).format(
existing=exc.dist,
filename=FILENAME_REQUIREMENTS,
required=exc.req,
x=red("!"),
))
self.vault = SecretProxy(self)
# populate bundles
self.bundle_names = []
for dir_entry in listdir(self.bundles_dir):
if validate_name(dir_entry):
self.bundle_names.append(dir_entry)
# populate groups
toml_groups = dict(self.nodes_or_groups_from_dir("groups"))
self.group_dict = {}
for group in self.nodes_or_groups_from_file(self.groups_file, 'groups', toml_groups):
self.add_group(Group(*group))
details_text = "({})".format(_("create"))
elif details is False:
details_text = "({})".format(_("remove"))
elif details is None:
details_text = ""
elif result == Item.STATUS_SKIPPED:
details_text = "({})".format(Item.SKIP_REASON_DESC[details])
else:
details_text = "({})".format(", ".join(sorted(details)))
if result == Item.STATUS_FAILED:
return "{x} {node} {bundle} {item} {status} {details}".format(
bundle=bold(bundle),
details=details_text,
item=item,
node=bold(node),
status=red(_("failed")),
x=bold(red("✘")),
)
elif result == Item.STATUS_ACTION_SUCCEEDED:
return "{x} {node} {bundle} {item} {status}".format(
bundle=bold(bundle),
item=item,
node=bold(node),
status=green(_("succeeded")),
x=bold(green("✓")),
)
elif result == Item.STATUS_SKIPPED:
return "{x} {node} {bundle} {item} {status} {details}".format(
bundle=bold(bundle),
details=details_text,
item=item,
node=bold(node),
time=start.strftime("%Y-%m-%d %H:%M:%S"),
x=blue("i"),
))
error = False
try:
# Running "true" is meant to catch connection errors early,
# but this only works on UNIX-y systems (i.e., not k8s).
if self.os in self.OS_FAMILY_UNIX:
self.run("true")
except RemoteException as exc:
io.stdout(_("{x} {node} Connection error: {msg}").format(
msg=exc,
node=bold(self.name),
x=red("!"),
))
error = _("Connection error (details above)")
item_results = []
else:
try:
with NodeLock(self, interactive=interactive, ignore=force) as lock:
item_results = apply_items(
self,
autoskip_selector=autoskip_selector,
autoonly_selector=autoonly_selector,
my_soft_locks=lock.my_soft_locks,
other_peoples_soft_locks=lock.other_peoples_soft_locks,
workers=workers,
interactive=interactive,
)
except NodeLockedException as e:
def explain_item_dependency_loop(exc, node_name):
"""
Given an ItemDependencyLoop exception and a node name, generates
output lines to help users debug the issue.
"""
items = remove_items_not_contributing_to_loop(exc.items)
yield _(
"{x} There was a dependency problem on node '{node}'. Look at the debug.svg generated "
"by the following command and try to find a loop:\n\n\n"
"printf '{cmd}' | dot -Tsvg -odebug.svg\n\n\n"
).format(
x=red("!"),
node=node_name,
cmd="\\n".join(graph_for_items(node_name, items)),
)
yield _(
"{x} Additionally, here is a list of all items involved "
"and their remaining dependencies:\n"
).format(x=red("!"))
for item in items:
yield "{}\t{}".format(item.id, ",".join(item._deps))
yield "\n\n\n"