Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
stdout.write('Repository type %s is not a type' % repo_type)
return 1
repo = util.get_repo_open(repo_type, repo_url)
run = repo.get_failing()
if subunit:
return _show_subunit(run)
case = run.get_test()
failed = False
result, summary = _make_result(repo, list_tests=list_tests)
result.startTestRun()
try:
case.run(result)
finally:
result.stopTestRun()
failed = not results.wasSuccessful(summary)
if failed:
result = 1
else:
result = 0
if list_tests:
failing_tests = [
test for test, _ in summary.errors + summary.failures]
output.output_tests(failing_tests, output=stdout)
return result
def list_tests(self):
"""List the tests returned by list_cmd.
:return: A list of test ids.
"""
run_proc = self._start_process(self.list_cmd)
out, err = run_proc.communicate()
if run_proc.returncode != 0:
sys.stdout.write("\n=========================\n"
"Failures during discovery"
"\n=========================\n")
new_out = six.BytesIO()
v2.ByteStreamToStreamResult(
six.BytesIO(out), 'stdout').run(
results.CatFiles(new_out))
out = new_out.getvalue()
if out:
if six.PY3:
sys.stdout.write(out.decode('utf8'))
else:
sys.stdout.write(out)
if err:
if six.PY3:
sys.stdout.write(out.decode('utf8'))
else:
sys.stderr.write(err)
sys.stdout.write("\n" + "=" * 80 + "\n"
"The above traceback was encountered during "
"test discovery which imports all the found test"
" modules in the specified test_path.\n")
exit(100)
while True:
result = run_tests()
# If we're using subunit output we want to make sure to check
# the result from the repository because load() returns 0
# always on subunit output
if subunit_out:
repo = util.get_repo_open(repo_type, repo_url)
summary = testtools.StreamSummary()
last_run = repo.get_latest_run().get_subunit_stream()
stream = subunit.ByteStreamToStreamResult(last_run)
summary.startTestRun()
try:
stream.run(summary)
finally:
summary.stopTestRun()
if not results.wasSuccessful(summary):
result = 1
if result:
return result
finally:
cmd.cleanUp()
stop_times = []
for worker in subunit_trace.RESULTS:
for test in subunit_trace.RESULTS[worker]:
if not test['timestamps'][0] or not test['timestamps'][1]:
continue
start_times.append(test['timestamps'][0])
stop_times.append(test['timestamps'][1])
if not start_times or not stop_times:
sys.stderr.write("\nNo tests were successful during the run")
return 1
start_time = min(start_times)
stop_time = max(stop_times)
elapsed_time = stop_time - start_time
subunit_trace.print_fails(stdout)
subunit_trace.print_summary(stdout, elapsed_time)
if not results.wasSuccessful(summary_result):
return 1
else:
return 0
# unify this logic
else:
previous_run = None
except KeyError:
previous_run = None
failed = False
if not pretty_out:
output_result = results.CLITestResult(latest_run.get_id, stdout,
previous_run)
summary = output_result.get_summary()
output_result.startTestRun()
try:
case.run(output_result)
finally:
output_result.stopTestRun()
failed = not results.wasSuccessful(summary)
else:
stream = latest_run.get_subunit_stream()
failed = subunit_trace.trace(
stream, stdout, post_fails=True, color=color,
suppress_attachments=suppress_attachments,
all_attachments=all_attachments,
show_binary_attachments=show_binary_attachments)
if failed:
return 1
else:
return 0