Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_encoding(self):
content_old = (
"lineö1\n".encode("utf-8")
)
content_new = (
"lineö1\n".encode("latin-1")
)
self.assertEqual(
files.diff(content_old, content_new, "/foo", encoding_hint="latin-1"),
(
red("--- /foo") + "\n" +
green("+++ ") + "\n" +
"@@ -1 +1 @@\n" +
red("-lineö1") + "\n" +
green("+line�1") + "\n"
),
def test_encoding_unknown(self):
content_old = (
"lineö1\n".encode("utf-8")
)
content_new = (
"lineö1\n".encode("latin-1")
)
self.assertEqual(
files.diff(content_old, content_new, "/foo", encoding_hint="ascii"),
(
red("--- /foo") + "\n" +
green("+++ ") + "\n" +
"@@ -1 +1 @@\n" +
red("-lineö1") + "\n" +
green("+line�1") + "\n"
),
def test_encoding(self):
content_old = (
"lineö1\n".encode("utf-8")
)
content_new = (
"lineö1\n".encode("latin-1")
)
self.assertEqual(
files.diff(content_old, content_new, "/foo", encoding_hint="latin-1"),
(
red("--- /foo") + "\n" +
green("+++ ") + "\n" +
"@@ -1 +1 @@\n" +
red("-lineö1") + "\n" +
green("+line�1") + "\n"
),
bold(_("node")),
bold(_("return code")),
bold(_("time")),
], ROW_SEPARATOR]
if include_stdout:
rows[0].append(bold(_("stdout")))
if include_stderr:
rows[0].append(bold(_("stderr")))
for node_name, result in sorted(results.items()):
row = [node_name]
if result is None:
# node has been skipped
continue
elif result.return_code == 0:
row.append(green(str(result.return_code)))
else:
row.append(red(str(result.return_code)))
row.append(format_duration(result.duration, msec=True))
rows.append(row)
if include_stdout or include_stderr:
stdout = result.stdout.decode('utf-8', errors='replace').strip().split("\n")
stderr = result.stderr.decode('utf-8', errors='replace').strip().split("\n")
if include_stdout:
row.append(stdout[0])
if include_stderr:
row.append(stderr[0])
for stdout_line, stderr_line in list(zip_longest(stdout, stderr, fillvalue=""))[1:]:
continuation_row = ["", "", ""]
if include_stdout:
continuation_row.append(stdout_line)
if include_stderr:
" " * (len(title) - 1),
green(value2),
)
output = bold(title) + "\n"
for line in tuple(unified_diff(
value1.splitlines(True),
value2.splitlines(True),
))[2:]:
suffix = ""
if len(line) > DIFF_MAX_LINE_LENGTH:
suffix += _(" (line truncated after {} characters)").format(DIFF_MAX_LINE_LENGTH)
if not line.endswith("\n"):
suffix += _(" (no newline at end of file)")
line = line[:DIFF_MAX_LINE_LENGTH].rstrip("\n")
if line.startswith("+"):
line = green(line)
elif line.startswith("-"):
line = red(line)
output += line + suffix + "\n"
return output
details_text = "({})".format(", ".join(sorted(details)))
if result == Item.STATUS_FAILED:
return "{x} {node} {bundle} {item} {status} {details}".format(
bundle=bold(bundle),
details=details_text,
item=item,
node=bold(node),
status=red(_("failed")),
x=bold(red("✘")),
)
elif result == Item.STATUS_ACTION_SUCCEEDED:
return "{x} {node} {bundle} {item} {status}".format(
bundle=bold(bundle),
item=item,
node=bold(node),
status=green(_("succeeded")),
x=bold(green("✓")),
)
elif result == Item.STATUS_SKIPPED:
return "{x} {node} {bundle} {item} {status} {details}".format(
bundle=bold(bundle),
details=details_text,
item=item,
node=bold(node),
x=bold(yellow("»")),
status=yellow(_("skipped")),
)
elif result == Item.STATUS_FIXED:
return "{x} {node} {bundle} {item} {status} {details}".format(
bundle=bold(bundle),
details=details_text,
item=item,
def handle_result(task_id, return_value, duration):
io.progress_advance()
if return_value is True:
io.stdout(_("{x} {node} lock {id} removed").format(
x=green("✓"),
node=bold(task_id.ljust(max_node_name_length)),
id=args['lock_id'].upper(),
))
else:
io.stderr(_(
"{x} {node} has no lock with ID {id}"
).format(
x=red("!"),
node=bold(task_id.ljust(max_node_name_length)),
id=args['lock_id'].upper(),
))
def diff_value_text(title, value1, value2):
max_length = max(len(value1), len(value2))
value1, value2 = force_text(value1), force_text(value2)
if (
"\n" not in value1 and
"\n" not in value2
):
if max_length < DIFF_MAX_INLINE_LENGTH:
return "{} {} → {}".format(
bold(title),
red(value1),
green(value2),
)
elif max_length < DIFF_MAX_LINE_LENGTH:
return "{} {}\n{}→ {}".format(
bold(title),
red(value1),
" " * (len(title) - 1),
green(value2),
)
output = bold(title) + "\n"
for line in tuple(unified_diff(
value1.splitlines(True),
value2.splitlines(True),
))[2:]:
suffix = ""
if len(line) > DIFF_MAX_LINE_LENGTH:
suffix += _(" (line truncated after {} characters)").format(DIFF_MAX_LINE_LENGTH)
if item.attributes['delete']:
io.stderr(_(
"{x} skipped {filename} ('delete' attribute set)"
).format(x=yellow("»"), filename=bold(item.name)))
continue
try:
write_preview(item, args['file_preview_path'])
except FaultUnavailable:
io.stderr(_(
"{x} skipped {path} (Fault unavailable)"
).format(x=yellow("»"), path=bold(item.name)))
else:
io.stdout(_(
"{x} wrote {path}"
).format(
x=green("✓"),
path=bold(join(
args['file_preview_path'],
item.name.lstrip("/"),
)),
))
elif args['item']:
item = get_item(node, args['item'])
if args['preview']:
try:
io.stdout(
item.preview(),
append_newline=False,
)
except NotImplementedError:
io.stderr(_(
"{x} cannot preview {item} on {node} (doesn't support previews)"