How to use the testtools.CopyStreamResult function in testtools

To help you get started, we've selected a few testtools.CopyStreamResult examples, drawn from popular ways the function is used in public projects.

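Before the project snippets, here is a minimal, self-contained sketch of the pattern they all share (it is not taken from any of the repositories below, and the test id is invented for illustration): CopyStreamResult forwards every StreamResult event it receives to each of the results it wraps, so a single test run can feed several consumers, such as a summary and a raw sink, at once.

import testtools

summary = testtools.StreamSummary()   # aggregates outcomes for wasSuccessful()
sink = testtools.StreamResult()       # base class: accepts and discards events
result = testtools.CopyStreamResult([sink, summary])

result.startTestRun()
try:
    # each status() event is delivered to both sink and summary
    result.status(test_id='example.test_ok', test_status='inprogress')
    result.status(test_id='example.test_ok', test_status='success')
finally:
    result.stopTestRun()

print(summary.wasSuccessful())  # True: the summary saw the same events as sink

The project code below applies the same idea at larger scale, typically pairing a serialiser or repository writer with printers and summaries.
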
github mtreinish / stestr / stestr / commands / load.py
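Here stestr's load command copies every event from a run to both the repository `inserter` and the chosen output result. With pretty output enabled, that output result is itself a CopyStreamResult over an outcome handler and a StreamSummary, wrapped in a StreamResultRouter so that output without a test id is catted straight to stdout.
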
                              all_attachments=all_attachments,
                              show_binary_attachments=show_binary_attachments))
        summary_result = testtools.StreamSummary()
        output_result = testtools.CopyStreamResult([outcomes, summary_result])
        output_result = testtools.StreamResultRouter(output_result)
        cat = subunit.test_results.CatFiles(stdout)
        output_result.add_rule(cat, 'test_id', test_id=None)
    else:
        try:
            previous_run = repo.get_latest_run()
        except KeyError:
            previous_run = None
        output_result = results.CLITestResult(
            inserter.get_id, stdout, previous_run)
        summary_result = output_result.get_summary()
    result = testtools.CopyStreamResult([inserter, output_result])
    result.startTestRun()
    try:
        case.run(result)
    finally:
        result.stopTestRun()
    if pretty_out and not subunit_out:
        start_times = []
        stop_times = []
        for worker in subunit_trace.RESULTS:
            for test in subunit_trace.RESULTS[worker]:
                if not test['timestamps'][0] or not test['timestamps'][1]:
                    continue
                start_times.append(test['timestamps'][0])
                stop_times.append(test['timestamps'][1])
        if not start_times or not stop_times:
            sys.stderr.write("\nNo tests were successful during the run")
github mtreinish / stestr / stestr / repository / file.py
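The file repository's inserter duplicates each event: one copy is serialised to the on-disk subunit stream via StreamToExtendedDecorator and TestProtocolClient, the other goes through StreamToDict so _handle_test can record details such as per-test timing.
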
        self._metadata = metadata
        if not self._run_id:
            fd, name = tempfile.mkstemp(dir=self._repository.base)
            self.fname = name
            stream = os.fdopen(fd, 'wb')
        else:
            self.fname = os.path.join(self._repository.base, self._run_id)
            stream = open(self.fname, 'ab')
        self.partial = partial
        # The time taken by each test, flushed at the end.
        self._times = {}
        self._test_start = None
        self._time = None
        subunit_client = testtools.StreamToExtendedDecorator(
            TestProtocolClient(stream))
        self.hook = testtools.CopyStreamResult([
            subunit_client,
            testtools.StreamToDict(self._handle_test)])
        self._stream = stream
github mtreinish / stestr / stestr / repository / memory.py
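The in-memory repository uses the same hook without touching disk: each event is handed to _handle_test and, in parallel, serialised as subunit v2 into a BytesIO buffer.
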
    def startTestRun(self):
        self._subunit = BytesIO()
        serialiser = subunit.v2.StreamResultToBytes(self._subunit)
        self._hook = testtools.CopyStreamResult([
            testtools.StreamToDict(self._handle_test),
            serialiser])
        self._hook.startTestRun()
github mtreinish / stestr / stestr / repository / sql.py
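The SQL repository builds an identical hook (dict handler plus subunit v2 serialiser) and, before the stream starts flowing, creates or looks up the run record through db_api.
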
    def startTestRun(self):
        self._subunit = io.BytesIO()
        self.subunit_stream = subunit.v2.StreamResultToBytes(self._subunit)
        self.hook = testtools.CopyStreamResult([
            testtools.StreamToDict(self._handle_test),
            self.subunit_stream])
        self.hook.startTestRun()
        self.start_time = datetime.datetime.utcnow()
        session = self.session_factory()
        if not self._run_id:
            self.run = db_api.create_run(session=session)
            if self._metadata:
                db_api.add_run_metadata({'stestr_run_meta': self._metadata},
                                        self.run.id, session=session)
            self._run_id = self.run.uuid
        else:
            int_id = db_api.get_run_id_from_uuid(self._run_id, session=session)
            self.run = db_api.get_run_by_id(int_id, session=session)
        session.close()
        self.totals = {}
github openstack / tempest-lib / tempest_lib / cmd / subunit_trace.py
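tempest-lib's subunit-trace reads subunit v2 from stdin, copies each event to an outcome printer and a StreamSummary, and routes anything without a test id (plain stdout from the run) to CatFiles via a StreamResultRouter rule.
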
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([outcomes, summary])
    result = testtools.StreamResultRouter(result)
    cat = subunit.test_results.CatFiles(sys.stdout)
    result.add_rule(cat, 'test_id', test_id=None)
    start_time = datetime.datetime.utcnow()
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    stop_time = datetime.datetime.utcnow()
    elapsed_time = stop_time - start_time

    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        exit(1)
    if args.post_fails:
github openstack / tempest / tools / subunit-trace.py
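The subunit-trace copies carried by Tempest, Taskflow, and Dragonflow (this snippet and the two that follow) are nearly identical: each fans the stream out to a Starts tracker, an outcome printer, and a StreamSummary, then exits non-zero if the summary reports failures.
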
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
github openstack / taskflow / tools / subunit_trace.py
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly
                      ))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
github openstack / dragonflow / tools / subunit-trace.py
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
github openstack / openstack-health / stackviz / parser / tempest_subunit.py
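stackviz's tempest_subunit parser turns a stored subunit stream into a list of plain dicts: StreamToDict appends each parsed test to ret, while a StreamSummary and a bare StreamResult ride along on the same CopyStreamResult.
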
    :param stream_file: subunit stream to be converted
    :param strip_details: if True, remove test details (e.g. stdout/stderr)
    :return: a list of individual test results
    """

    ret = []

    result_stream = subunit.ByteStreamToStreamResult(stream_file)
    starts = StreamResult()
    summary = StreamSummary()
    outcomes = StreamToDict(partial(_read_test,
                                    out=ret,
                                    strip_details=strip_details))

    result = CopyStreamResult([starts, outcomes, summary])

    result.startTestRun()
    result_stream.run(result)
    result.stopTestRun()

    return ret