Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
output.output_stream(stream, output=stdout)
# Exits 0 if we successfully wrote the stream.
return 0
case = latest_run.get_test()
try:
if repo_type == 'file':
previous_run = repo.get_test_run(repo.latest_id() - 1)
# TODO(mtreinish): add a repository api to get the previous_run to
# unify this logic
else:
previous_run = None
except KeyError:
previous_run = None
failed = False
if not pretty_out:
output_result = results.CLITestResult(latest_run.get_id, stdout,
previous_run)
summary = output_result.get_summary()
output_result.startTestRun()
try:
case.run(output_result)
finally:
output_result.stopTestRun()
failed = not results.wasSuccessful(summary)
else:
stream = latest_run.get_subunit_stream()
failed = subunit_trace.trace(
stream, stdout, post_fails=True, color=color,
suppress_attachments=suppress_attachments,
all_attachments=all_attachments,
show_binary_attachments=show_binary_attachments)
if failed:
def _make_result(repo, list_tests=False, stdout=sys.stdout):
    """Build the ``(output_result, summary_result)`` pair for a repository.

    :param repo: Repository object. Only ``get_latest_run()`` is used, and
        only when the CLI result invokes the id callback it is given.
    :param list_tests: When true, one ``testtools.StreamSummary`` instance
        is returned in both positions of the pair.
    :param stdout: Stream passed through to ``results.CLITestResult``.
    :return: A two-tuple ``(output_result, summary_result)``.
    """
    if list_tests:
        # A single summary object plays both the output and summary roles.
        stream_summary = testtools.StreamSummary()
        return stream_summary, stream_summary

    def _latest_run_id():
        # Resolved each time it is called rather than once up front.
        return repo.get_latest_run().get_id()

    # The third positional argument is deliberately None here.
    cli_result = results.CLITestResult(_latest_run_id, stdout, None)
    return cli_result, cli_result.get_summary()
def stopTestRun(self):
    """Stop the run, then pass the run id on for summary output.

    The steps happen in a fixed order: the parent class's ``stopTestRun``
    runs first, the run id is fetched via ``self.get_id()``, the internal
    ``_summary`` result is stopped, and the id is finally handed to
    ``self._output_summary``.
    """
    super(CLITestResult, self).stopTestRun()
    # Fetch the id before stopping the summary; the ordering is preserved
    # exactly as written because either call may have side effects.
    finished_run_id = self.get_id()
    self._summary.stopTestRun()
    self._output_summary(finished_run_id)
functools.partial(subunit_trace.show_outcome, stdout,
enable_color=color, abbreviate=abbreviate,
suppress_attachments=suppress_attachments,
all_attachments=all_attachments,
show_binary_attachments=show_binary_attachments))
summary_result = testtools.StreamSummary()
output_result = testtools.CopyStreamResult([outcomes, summary_result])
output_result = testtools.StreamResultRouter(output_result)
cat = subunit.test_results.CatFiles(stdout)
output_result.add_rule(cat, 'test_id', test_id=None)
else:
try:
previous_run = repo.get_latest_run()
except KeyError:
previous_run = None
output_result = results.CLITestResult(
inserter.get_id, stdout, previous_run)
summary_result = output_result.get_summary()
result = testtools.CopyStreamResult([inserter, output_result])
result.startTestRun()
try:
case.run(result)
finally:
result.stopTestRun()
if pretty_out and not subunit_out:
start_times = []
stop_times = []
for worker in subunit_trace.RESULTS:
for test in subunit_trace.RESULTS[worker]:
if not test['timestamps'][0] or not test['timestamps'][1]:
continue
start_times.append(test['timestamps'][0])