Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _load_case(inserter, repo, case, subunit_out, pretty_out,
color, stdout, abbreviate, suppress_attachments,
all_attachments, show_binary_attachments):
if subunit_out:
output_result, summary_result = output.make_result(inserter.get_id,
output=stdout)
elif pretty_out:
outcomes = testtools.StreamToDict(
functools.partial(subunit_trace.show_outcome, stdout,
enable_color=color, abbreviate=abbreviate,
suppress_attachments=suppress_attachments,
all_attachments=all_attachments,
show_binary_attachments=show_binary_attachments))
summary_result = testtools.StreamSummary()
output_result = testtools.CopyStreamResult([outcomes, summary_result])
output_result = testtools.StreamResultRouter(output_result)
cat = subunit.test_results.CatFiles(stdout)
output_result.add_rule(cat, 'test_id', test_id=None)
else:
try:
previous_run = repo.get_latest_run()
except KeyError:
previous_run = None
output_result = results.CLITestResult(
inserter.get_id, stdout, previous_run)
summary_result = output_result.get_summary()
result = testtools.CopyStreamResult([inserter, output_result])
result.startTestRun()
try:
case.run(result)
def trace(stdin, stdout, print_failures=False, failonly=False,
          enable_diff=False, abbreviate=False, color=False, post_fails=False,
          no_summary=False):
    """Feed a subunit byte stream from ``stdin`` through the outcome reporter.

    ``stdin`` is wrapped in ``subunit.ByteStreamToStreamResult`` (with
    ``non_subunit_name='stdout'``) and run against a
    ``testtools.CopyStreamResult`` that fans out to a ``StreamToDict`` whose
    callback is ``show_outcome`` bound to ``stdout`` and the display flags,
    and to a ``StreamSummary``.  The fan-out is wrapped in a
    ``StreamResultRouter`` carrying one extra rule
    (``'test_id', test_id=None``) that targets a ``CatFiles`` writer on
    ``stdout``.

    :param stdin: byte stream handed to ``ByteStreamToStreamResult``
    :param stdout: stream passed to ``show_outcome`` and ``CatFiles``
    :param print_failures: forwarded to ``show_outcome``
    :param failonly: forwarded to ``show_outcome``
    :param enable_diff: forwarded to ``show_outcome``
    :param abbreviate: forwarded to ``show_outcome``
    :param color: forwarded to ``show_outcome`` as ``enable_color``
    :param post_fails: accepted but not read by the code in this block
    :param no_summary: accepted but not read by the code in this block
    :return: 1 when ``count_tests('status', '.*')`` is 0, otherwise ``None``
    """
    source = subunit.ByteStreamToStreamResult(
        stdin, non_subunit_name='stdout')
    report = functools.partial(
        show_outcome, stdout,
        print_failures=print_failures,
        failonly=failonly,
        enable_diff=enable_diff,
        abbreviate=abbreviate,
        enable_color=color)
    per_test = testtools.StreamToDict(report)
    totals = testtools.StreamSummary()
    router = testtools.StreamResultRouter(
        testtools.CopyStreamResult([per_test, totals]))
    file_writer = subunit.test_results.CatFiles(stdout)
    router.add_rule(file_writer, 'test_id', test_id=None)

    began = datetime.datetime.utcnow()
    router.startTestRun()
    try:
        source.run(router)
    finally:
        # The run is closed even when decoding the stream raises.
        router.stopTestRun()
    ended = datetime.datetime.utcnow()
    # Computed but not consumed anywhere in this block.
    elapsed_time = ended - began

    nothing_ran = count_tests('status', '.*') == 0
    if not nothing_ran:
        return None
    print("The test run didn't actually run any tests")
    return 1
pretty_out=pretty_out, color=color, stdout=stdout,
abbreviate=abbreviate,
suppress_attachments=suppress_attachments,
all_attachments=all_attachments)
if not until_failure:
return run_tests()
else:
while True:
result = run_tests()
# If we're using subunit output we want to make sure to check
# the result from the repository because load() returns 0
# always on subunit output
if subunit_out:
repo = util.get_repo_open(repo_type, repo_url)
summary = testtools.StreamSummary()
last_run = repo.get_latest_run().get_subunit_stream()
stream = subunit.ByteStreamToStreamResult(last_run)
summary.startTestRun()
try:
stream.run(summary)
finally:
summary.stopTestRun()
if not results.wasSuccessful(summary):
result = 1
if result:
return result
finally:
cmd.cleanUp()
def _make_result(repo, list_tests=False, stdout=sys.stdout):
    """Return the ``(output_result, summary_result)`` pair for a run.

    :param repo: object providing ``get_latest_run()``; only consulted when
        the returned result asks for the run id
    :param list_tests: when true, a single ``testtools.StreamSummary`` is
        returned in both positions and ``stdout`` is not used
    :param stdout: stream handed to ``results.CLITestResult``
    :return: a two-tuple of the output result and its summary result
    """
    if list_tests:
        listing = testtools.StreamSummary()
        return listing, listing

    # The id is resolved lazily: the latest run is looked up each time the
    # callable is invoked, not when the result object is built.
    cli_result = results.CLITestResult(
        lambda: repo.get_latest_run().get_id(), stdout, None)
    return cli_result, cli_result.get_summary()
def main():
    """Read a subunit stream from standard input and print test outcomes.

    ``sys.stdin`` is wrapped in ``subunit.ByteStreamToStreamResult`` (with
    ``non_subunit_name='stdout'``) and run against a
    ``testtools.StreamResultRouter`` that fans out to a ``StreamToDict``
    calling ``show_outcome`` on ``sys.stdout`` and to a ``StreamSummary``.
    One extra routing rule (``'test_id', test_id=None``) targets a
    ``CatFiles`` writer on ``sys.stdout``.

    :raises SystemExit: with code 1 when ``count_tests('status', '.*')``
        reports that no tests were run
    """
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([outcomes, summary])
    result = testtools.StreamResultRouter(result)
    cat = subunit.test_results.CatFiles(sys.stdout)
    result.add_rule(cat, 'test_id', test_id=None)
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        # Close the run even if decoding the stream raises.
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        # sys.exit() rather than the builtin exit(): the latter is injected
        # by the ``site`` module for interactive use and is missing when the
        # interpreter is started with -S.
        sys.exit(1)
def main():
    """Trace a subunit stream from standard input onto standard output.

    The stream is decoded with ``subunit.ByteStreamToStreamResult`` and
    copied to three consumers: a ``Starts`` instance writing to
    ``sys.stdout``, a ``StreamToDict`` that calls ``show_outcome`` with the
    parsed ``print_failures``/``failonly`` options, and a
    ``StreamSummary``.

    :return: 1 when ``count_tests('status', '.*')`` is 0 or the summary is
        not successful, 0 otherwise
    """
    options = parse_args()
    source = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    report = functools.partial(
        show_outcome, sys.stdout,
        print_failures=options.print_failures,
        failonly=options.failonly)
    outcomes = testtools.StreamToDict(report)
    totals = testtools.StreamSummary()
    fanout = testtools.CopyStreamResult([starts, outcomes, totals])

    fanout.startTestRun()
    try:
        source.run(fanout)
    finally:
        # The run is closed even when decoding the stream raises.
        fanout.stopTestRun()

    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1

    if options.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)

    if totals.wasSuccessful():
        return 0
    return 1
def convert_stream(stream_file, strip_details=False):
    """Converts a subunit stream into a raw list of test dicts.

    The stream is decoded with ``subunit.ByteStreamToStreamResult`` and each
    test dict produced by ``StreamToDict`` is handed to ``_read_test``, which
    receives the output list via ``out``.

    :param stream_file: subunit stream to be converted
    :param strip_details: if True, remove test details (e.g. stdout/stderr);
        forwarded unchanged to ``_read_test``
    :return: a list of individual test results
    """
    ret = []
    result_stream = subunit.ByteStreamToStreamResult(stream_file)
    starts = StreamResult()
    summary = StreamSummary()
    outcomes = StreamToDict(partial(_read_test,
                                    out=ret,
                                    strip_details=strip_details))
    result = CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        result_stream.run(result)
    finally:
        # Always pair startTestRun() with stopTestRun(), even when decoding
        # the stream raises, so the result objects are closed out.
        result.stopTestRun()
    return ret
def main():
    """Decode subunit from standard input and print a trace of the run.

    A ``subunit.ByteStreamToStreamResult`` over ``sys.stdin`` is run against
    a ``testtools.CopyStreamResult`` feeding a ``Starts`` instance, a
    ``StreamToDict`` whose callback is ``show_outcome`` (bound to
    ``sys.stdout`` and the parsed ``print_failures``/``failonly`` options),
    and a ``StreamSummary``.

    :return: exit status; 1 when ``count_tests('status', '.*')`` is 0 or the
        summary is not successful, 0 otherwise
    """
    cli = parse_args()
    decoder = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    on_test = functools.partial(
        show_outcome, sys.stdout,
        print_failures=cli.print_failures,
        failonly=cli.failonly)
    outcomes = testtools.StreamToDict(on_test)
    summary = testtools.StreamSummary()
    sink = testtools.CopyStreamResult([starts, outcomes, summary])

    sink.startTestRun()
    try:
        decoder.run(sink)
    finally:
        # The run is closed even when decoding the stream raises.
        sink.stopTestRun()

    ran_nothing = count_tests('status', '.*') == 0
    if ran_nothing:
        print("The test run didn't execute any tests")
        return 1

    if cli.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)

    exit_status = 1
    if summary.wasSuccessful():
        exit_status = 0
    return exit_status