Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
all_attachments=all_attachments,
show_binary_attachments=show_binary_attachments))
summary_result = testtools.StreamSummary()
output_result = testtools.CopyStreamResult([outcomes, summary_result])
output_result = testtools.StreamResultRouter(output_result)
cat = subunit.test_results.CatFiles(stdout)
output_result.add_rule(cat, 'test_id', test_id=None)
else:
try:
previous_run = repo.get_latest_run()
except KeyError:
previous_run = None
output_result = results.CLITestResult(
inserter.get_id, stdout, previous_run)
summary_result = output_result.get_summary()
result = testtools.CopyStreamResult([inserter, output_result])
result.startTestRun()
try:
case.run(result)
finally:
result.stopTestRun()
if pretty_out and not subunit_out:
start_times = []
stop_times = []
for worker in subunit_trace.RESULTS:
for test in subunit_trace.RESULTS[worker]:
if not test['timestamps'][0] or not test['timestamps'][1]:
continue
start_times.append(test['timestamps'][0])
stop_times.append(test['timestamps'][1])
if not start_times or not stop_times:
sys.stderr.write("\nNo tests were successful during the run")
self._metadata = metadata
if not self._run_id:
fd, name = tempfile.mkstemp(dir=self._repository.base)
self.fname = name
stream = os.fdopen(fd, 'wb')
else:
self.fname = os.path.join(self._repository.base, self._run_id)
stream = open(self.fname, 'ab')
self.partial = partial
# The time taken by each test, flushed at the end.
self._times = {}
self._test_start = None
self._time = None
subunit_client = testtools.StreamToExtendedDecorator(
TestProtocolClient(stream))
self.hook = testtools.CopyStreamResult([
subunit_client,
testtools.StreamToDict(self._handle_test)])
self._stream = stream
def startTestRun(self):
    """Begin a test run that is captured in an in-memory subunit v2 buffer.

    A fresh ``BytesIO`` is stored on ``self._subunit``.  Incoming events
    are fanned out through ``self._hook`` to two consumers: a
    ``StreamToDict`` adapter that calls ``self._handle_test`` and a
    subunit v2 serialiser writing into the buffer.  The hook is started
    before returning.
    """
    buffer = BytesIO()
    self._subunit = buffer
    # Build the serialiser first, then the dict adapter, and list them in
    # the order the events should be delivered (dict adapter first).
    to_bytes = subunit.v2.StreamResultToBytes(buffer)
    to_dict = testtools.StreamToDict(self._handle_test)
    self._hook = testtools.CopyStreamResult([to_dict, to_bytes])
    self._hook.startTestRun()
def startTestRun(self):
    """Begin a test run: set up the subunit buffer and the run record.

    Events are copied through ``self.hook`` to a ``StreamToDict`` adapter
    (which calls ``self._handle_test``) and to a subunit v2 serialiser
    writing into the in-memory ``self._subunit`` buffer.

    When ``self._run_id`` is not set, a new run is created through
    ``db_api``, the optional ``self._metadata`` is attached under the
    ``'stestr_run_meta'`` key, and ``self._run_id`` is set to the new
    run's uuid.  Otherwise the existing run is looked up by its uuid.

    Any exception raised by ``db_api`` propagates to the caller; the
    session obtained from ``self.session_factory()`` is closed either way.
    """
    self._subunit = io.BytesIO()
    self.subunit_stream = subunit.v2.StreamResultToBytes(self._subunit)
    self.hook = testtools.CopyStreamResult([
        testtools.StreamToDict(self._handle_test),
        self.subunit_stream])
    self.hook.startTestRun()
    self.start_time = datetime.datetime.utcnow()
    session = self.session_factory()
    # Close the session even when a db_api call fails, so an error while
    # creating or looking up the run does not leak the session.
    try:
        if not self._run_id:
            self.run = db_api.create_run(session=session)
            if self._metadata:
                db_api.add_run_metadata(
                    {'stestr_run_meta': self._metadata},
                    self.run.id, session=session)
            self._run_id = self.run.uuid
        else:
            int_id = db_api.get_run_id_from_uuid(self._run_id, session=session)
            self.run = db_api.get_run_by_id(int_id, session=session)
    finally:
        session.close()
    self.totals = {}
# Command-line entry point: decodes a subunit byte stream read from stdin and
# feeds it to the result objects assembled below.
def main():
args = parse_args()
stream = subunit.ByteStreamToStreamResult(
sys.stdin, non_subunit_name='stdout')
# show_outcome is pre-bound to stdout and to the print_failures/failonly
# command-line options before being handed to StreamToDict.
outcomes = testtools.StreamToDict(
functools.partial(show_outcome, sys.stdout,
print_failures=args.print_failures,
failonly=args.failonly))
summary = testtools.StreamSummary()
result = testtools.CopyStreamResult([outcomes, summary])
# Wrap the copier in a router and add a 'test_id' rule (test_id=None) that
# targets a CatFiles writer on stdout.
result = testtools.StreamResultRouter(result)
cat = subunit.test_results.CatFiles(sys.stdout)
result.add_rule(cat, 'test_id', test_id=None)
# Timestamps taken immediately before and after the run give elapsed_time.
start_time = datetime.datetime.utcnow()
result.startTestRun()
# stopTestRun() is called even if stream.run() raises.
try:
stream.run(result)
finally:
result.stopTestRun()
stop_time = datetime.datetime.utcnow()
elapsed_time = stop_time - start_time
# Treat a run in which count_tests finds no test with any status as a failure.
if count_tests('status', '.*') == 0:
print("The test run didn't actually run any tests")
# NOTE(review): `exit` is the helper injected by the `site` module;
# `sys.exit` (or a return code) would be the dependable spelling - confirm.
exit(1)
# NOTE(review): the body of the following branch is not visible here.
if args.post_fails:
def main():
    """Trace a subunit stream from stdin, writing the report to stdout.

    Every event is copied to three consumers: a ``Starts`` result, a
    ``StreamToDict`` adapter that calls ``show_outcome``, and a
    ``StreamSummary``.  Failures are printed afterwards when
    ``args.post_fails`` is set, followed by the summary.

    :return: 0 if the summary reports success, otherwise 1
    """
    args = parse_args()
    source = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    on_outcome = functools.partial(
        show_outcome, sys.stdout, print_failures=args.print_failures)
    outcomes = testtools.StreamToDict(on_outcome)
    summary = testtools.StreamSummary()
    consumers = [starts, outcomes, summary]
    result = testtools.CopyStreamResult(consumers)

    result.startTestRun()
    try:
        source.run(result)
    finally:
        # Always close the run, even if decoding the stream blew up.
        result.stopTestRun()

    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)

    if summary.wasSuccessful():
        return 0
    return 1
def main():
    """Trace a subunit stream from stdin, writing the report to stdout.

    Events are copied to a ``Starts`` result, a ``StreamToDict`` adapter
    that calls ``show_outcome`` (honouring ``print_failures`` and
    ``failonly``), and a ``StreamSummary``.

    :return: 1 if no tests were counted or the summary reports failure,
        otherwise 0
    """
    args = parse_args()
    source = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    on_outcome = functools.partial(
        show_outcome,
        sys.stdout,
        print_failures=args.print_failures,
        failonly=args.failonly,
    )
    outcomes = testtools.StreamToDict(on_outcome)
    summary = testtools.StreamSummary()
    consumers = [starts, outcomes, summary]
    result = testtools.CopyStreamResult(consumers)

    result.startTestRun()
    try:
        source.run(result)
    finally:
        # Always close the run, even if decoding the stream blew up.
        result.stopTestRun()

    # An empty run is reported as a failure before any summary is printed.
    tests_seen = count_tests('status', '.*')
    if tests_seen == 0:
        print("The test run didn't actually run any tests")
        return 1

    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)

    if summary.wasSuccessful():
        return 0
    return 1
def main():
    """Consume subunit from stdin and print per-test outcomes to stdout.

    The decoded stream is fanned out to a ``Starts`` result, to
    ``show_outcome`` (via ``StreamToDict``, with the ``print_failures``
    and ``failonly`` options bound), and to a ``StreamSummary``.

    :return: exit status - 1 when ``count_tests`` finds no tests or the
        summary is unsuccessful, 0 otherwise
    """
    options = parse_args()
    byte_stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcome_printer = functools.partial(
        show_outcome,
        sys.stdout,
        print_failures=options.print_failures,
        failonly=options.failonly,
    )
    outcomes = testtools.StreamToDict(outcome_printer)
    summary = testtools.StreamSummary()
    fan_out = testtools.CopyStreamResult([starts, outcomes, summary])

    fan_out.startTestRun()
    try:
        byte_stream.run(fan_out)
    finally:
        # The run must be stopped whether or not the stream parsed cleanly.
        fan_out.stopTestRun()

    # Nothing ran at all: report it and signal failure.
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1

    if options.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)

    exit_status = 0 if summary.wasSuccessful() else 1
    return exit_status
:param stream_file: subunit stream to be converted
:param strip_details: if True, remove test details (e.g. stdout/stderr)
:return: a list of individual test results
"""
ret = []
result_stream = subunit.ByteStreamToStreamResult(stream_file)
starts = StreamResult()
summary = StreamSummary()
outcomes = StreamToDict(partial(_read_test,
out=ret,
strip_details=strip_details))
result = CopyStreamResult([starts, outcomes, summary])
result.startTestRun()
result_stream.run(result)
result.stopTestRun()
return ret