Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
test_to_worker = {}
def map_test(test_dict):
tags = test_dict['tags']
id = test_dict['id']
workers = []
for tag in tags:
if tag.startswith('worker-'):
workers.append(tag)
if not workers:
workers = [None]
for worker in workers:
worker_to_test.setdefault(worker, []).append(id)
test_to_worker.setdefault(id, []).extend(workers)
mapper = testtools.StreamToDict(map_test)
mapper.startTestRun()
try:
case.run(mapper)
finally:
mapper.stopTestRun()
self._worker_to_test = worker_to_test
self._test_to_worker = test_to_worker
failing_workers = self._test_to_worker[failing_id]
prior_tests = []
for worker in failing_workers:
worker_tests = self._worker_to_test[worker]
prior_tests.extend(worker_tests[:worker_tests.index(failing_id)])
return prior_tests
def _find_failing(repo):
run = repo.get_failing()
case = run.get_test()
ids = []
def gather_errors(test_dict):
if test_dict['status'] == 'fail':
ids.append(test_dict['id'])
result = testtools.StreamToDict(gather_errors)
result.startTestRun()
try:
case.run(result)
finally:
result.stopTestRun()
return ids
def trace(stdin, stdout, print_failures=False, failonly=False,
enable_diff=False, abbreviate=False, color=False, post_fails=False,
no_summary=False, suppress_attachments=False, all_attachments=False,
show_binary_attachments=False):
stream = subunit.ByteStreamToStreamResult(
stdin, non_subunit_name='stdout')
outcomes = testtools.StreamToDict(
functools.partial(show_outcome, stdout,
print_failures=print_failures,
failonly=failonly,
enable_diff=enable_diff,
abbreviate=abbreviate,
enable_color=color,
suppress_attachments=suppress_attachments,
all_attachments=all_attachments,
show_binary_attachments=show_binary_attachments))
summary = testtools.StreamSummary()
result = testtools.CopyStreamResult([outcomes, summary])
result = testtools.StreamResultRouter(result)
cat = subunit.test_results.CatFiles(stdout)
result.add_rule(cat, 'test_id', test_id=None)
result.startTestRun()
try:
def trace(stdin, stdout, print_failures=False, failonly=False,
enable_diff=False, abbreviate=False, color=False, post_fails=False,
no_summary=False):
stream = subunit.ByteStreamToStreamResult(
stdin, non_subunit_name='stdout')
outcomes = testtools.StreamToDict(
functools.partial(show_outcome, stdout,
print_failures=print_failures,
failonly=failonly,
enable_diff=enable_diff,
abbreviate=abbreviate,
enable_color=color))
summary = testtools.StreamSummary()
result = testtools.CopyStreamResult([outcomes, summary])
result = testtools.StreamResultRouter(result)
cat = subunit.test_results.CatFiles(stdout)
result.add_rule(cat, 'test_id', test_id=None)
start_time = datetime.datetime.utcnow()
result.startTestRun()
try:
stream.run(result)
finally:
def _load_case(inserter, repo, case, subunit_out, pretty_out,
color, stdout, abbreviate, suppress_attachments,
all_attachments, show_binary_attachments):
if subunit_out:
output_result, summary_result = output.make_result(inserter.get_id,
output=stdout)
elif pretty_out:
outcomes = testtools.StreamToDict(
functools.partial(subunit_trace.show_outcome, stdout,
enable_color=color, abbreviate=abbreviate,
suppress_attachments=suppress_attachments,
all_attachments=all_attachments,
show_binary_attachments=show_binary_attachments))
summary_result = testtools.StreamSummary()
output_result = testtools.CopyStreamResult([outcomes, summary_result])
output_result = testtools.StreamResultRouter(output_result)
cat = subunit.test_results.CatFiles(stdout)
output_result.add_rule(cat, 'test_id', test_id=None)
else:
try:
previous_run = repo.get_latest_run()
except KeyError:
previous_run = None
output_result = results.CLITestResult(
def startTestRun(self):
self._subunit = io.BytesIO()
self.subunit_stream = subunit.v2.StreamResultToBytes(self._subunit)
self.hook = testtools.CopyStreamResult([
testtools.StreamToDict(self._handle_test),
self.subunit_stream])
self.hook.startTestRun()
self.start_time = datetime.datetime.utcnow()
session = self.session_factory()
if not self._run_id:
self.run = db_api.create_run(session=session)
if self._metadata:
db_api.add_run_metadata({'stestr_run_meta': self._metadata},
self.run.id, session=session)
self._run_id = self.run.uuid
else:
int_id = db_api.get_run_id_from_uuid(self._run_id, session=session)
self.run = db_api.get_run_by_id(int_id, session=session)
session.close()
self.totals = {}
line = line.encode('utf8')
stream.write("%s\n" % line)
stream.write('\n\n')
ADDPROP_FAIL.append(test)
break
else:
FAILS.append(test)
elif status == 'success' or status == 'xfail':
SUCCESS.append(test)
elif status == 'skip':
SKIPS.append(test)
stream = subunit.ByteStreamToStreamResult(
sys.stdin, non_subunit_name='stdout')
outcome = testtools.StreamToDict(
functools.partial(show_outcome,
sys.stdout))
summary = testtools.StreamSummary()
result = testtools.CopyStreamResult([outcome, summary])
result.startTestRun()
try:
stream.run(result)
finally:
result.stopTestRun()
print("\n\n------------------------------------------------------------------")
print("%s Tests Failed" % len(FAILS))
print("%s Tests Failed with AdditionalProperties" % len(ADDPROP_FAIL))
print("%s Tests Skipped" % len(SKIPS))
print("%s Tests Passed" % len(SUCCESS))
print("To see the full details run this subunit stream through subunit-trace")
def main():
args = parse_args()
stream = subunit.ByteStreamToStreamResult(
sys.stdin, non_subunit_name='stdout')
starts = Starts(sys.stdout)
outcomes = testtools.StreamToDict(
functools.partial(show_outcome, sys.stdout,
print_failures=args.print_failures,
failonly=args.failonly))
summary = testtools.StreamSummary()
result = testtools.CopyStreamResult([starts, outcomes, summary])
result.startTestRun()
try:
stream.run(result)
finally:
result.stopTestRun()
if count_tests('status', '.*') == 0:
print("The test run didn't actually run any tests")
return 1
if args.post_fails:
print_fails(sys.stdout)
print_summary(sys.stdout)
def main():
args = parse_args()
stream = subunit.ByteStreamToStreamResult(
sys.stdin, non_subunit_name='stdout')
starts = Starts(sys.stdout)
outcomes = testtools.StreamToDict(
functools.partial(show_outcome, sys.stdout,
print_failures=args.print_failures,
failonly=args.failonly
))
summary = testtools.StreamSummary()
result = testtools.CopyStreamResult([starts, outcomes, summary])
result.startTestRun()
try:
stream.run(result)
finally:
result.stopTestRun()
if count_tests('status', '.*') == 0:
print("The test run didn't execute any tests")
return 1
if args.post_fails:
print_fails(sys.stdout)