How to use the stestr.repository.util function in stestr

To help you get started, we’ve selected a few stestr examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github mtreinish / stestr / stestr / commands / failing.py View on Github external
are 'file' and 'sql'.
    :param str repo_url: The url of the repository to use.
    :param bool list_test: Show only a list of failing tests.
    :param bool subunit: Show output as a subunit stream.
    :param file stdout: The output file to write all output to. By default
        this is sys.stdout

    :return return_code: The exit code for the command. 0 for success and > 0
        for failures.
    :rtype: int
    """
    if repo_type not in ['file', 'sql']:
        stdout.write('Repository type %s is not a type' % repo_type)
        return 1

    repo = util.get_repo_open(repo_type, repo_url)
    run = repo.get_failing()
    if subunit:
        return _show_subunit(run)
    case = run.get_test()
    failed = False
    result, summary = _make_result(repo, list_tests=list_tests)
    result.startTestRun()
    try:
        case.run(result)
    finally:
        result.stopTestRun()
    failed = not results.wasSuccessful(summary)
    if failed:
        result = 1
    else:
        result = 0
github mtreinish / stestr / stestr / commands / run.py View on Github external
:param bool all_attachments: When set true subunit_trace will print all
        text attachments on successful test execution.
    :param str pdb: Takes in a single test_id to bypass test
        discovery and just execute the test specified without launching any
        additional processes. A file name may be used in place of a test name.
    :param bool dynamic: Enable dynamic scheduling

    :return return_code: The exit code for the command. 0 for success and > 0
        for failures.
    :rtype: int
    """
    if partial:
        warnings.warn('The partial flag is deprecated and has no effect '
                      'anymore')
    try:
        repo = util.get_repo_open(repo_type, repo_url)
    # If a repo is not found and a testr config exists, just create it
    except repository.RepositoryNotFound:
        if not os.path.isfile(config) and not test_path:
            msg = ("No config file found and --test-path not specified. "
                   "Either create or specify a .stestr.conf or use "
                   "--test-path ")
            stdout.write(msg)
            exit(1)
        try:
            repo = util.get_repo_initialise(repo_type, repo_url)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise
            repo_path = repo_url or './stestr'
            stdout.write('The specified repository directory %s already '
                         'exists. Please check if the repository already '
github mtreinish / stestr / stestr / commands / last.py View on Github external
:param bool subunit: Show output as a subunit stream.
    :param file stdout: The output file to write all output to. By default
         this is sys.stdout
    :param bool suppress_attachments: When set true attachments subunit_trace
        will not print attachments on successful test execution.
    :param bool all_attachments: When set true subunit_trace will print all
        text attachments on successful test execution.
    :param bool show_binary_attachments: When set to true, subunit_trace will
        print binary attachments in addition to text attachments.

    :return return_code: The exit code for the command. 0 for success and > 0
        for failures.
    :rtype: int
    """
    try:
        repo = util.get_repo_open(repo_type, repo_url)
    except abstract.RepositoryNotFound as e:
        stdout.write(str(e) + '\n')
        return 1

    try:
        latest_run = repo.get_latest_run()
    except KeyError as e:
        stdout.write(str(e) + '\n')
        return 1

    if subunit_out:
        stream = latest_run.get_subunit_stream()
        output.output_stream(stream, output=stdout)
        # Exits 0 if we successfully wrote the stream.
        return 0
    case = latest_run.get_test()
github mtreinish / stestr / stestr / commands / stats.py View on Github external
def stats(repo_type='file', repo_url=None):
    """Report how many runs the repository holds.

    Opens the repository via ``util.get_repo_open`` and writes a single
    ``runs=<count>`` line to ``sys.stdout``, where ``<count>`` is whatever
    the repository's ``count()`` method returns.

    :param str repo_type: This is the type of repository to use. Valid choices
        are 'file' and 'sql'.
    :param str repo_url: The url of the repository to use.

    :return return_code: The exit code for the command. This function always
        returns 0 once the line has been written; errors raised while opening
        or querying the repository are not caught here.
    :rtype: int
    """
    run_count = util.get_repo_open(repo_type, repo_url).count()
    # Keep the key/value formatting identical to the historical output.
    sys.stdout.write('%s=%s\n' % ('runs', run_count))
    return 0
github mtreinish / stestr / stestr / commands / slowest.py View on Github external
Optionally, using the ``show_all`` argument, it will print all the tests,
    instead of just 10, sorted by time.

    :param str repo_type: This is the type of repository to use. Valid choices
        are 'file' and 'sql'.
    :param str repo_url: The url of the repository to use.
    :param bool show_all: Show timing for all tests.
    :param file stdout: The output file to write all output to. By default
        this is sys.stdout

    :return return_code: The exit code for the command. 0 for success and > 0
        for failures.
    :rtype: int
    """

    repo = util.get_repo_open(repo_type, repo_url)
    try:
        latest_id = repo.latest_id()
    except KeyError:
        return 3
    # what happens when there is no timing info?
    test_times = repo.get_test_times(repo.get_test_ids(latest_id))
    known_times = list(test_times['known'].items())
    known_times.sort(key=itemgetter(1), reverse=True)
    if len(known_times) > 0:
        # By default show 10 rows
        if not show_all:
            known_times = known_times[:10]
        known_times = format_times(known_times)
        header = ('Test id', 'Runtime (s)')
        rows = [header] + known_times
        output.output_table(rows, output=stdout)
github mtreinish / stestr / stestr / config_file.py View on Github external
if not group_regex \
                and self.parser.has_option('DEFAULT', 'parallel_class') \
                and self.parser.getboolean('DEFAULT', 'parallel_class'):
            group_regex = r'([^\.]*\.)*'
        if not group_regex and self.parser.has_option('DEFAULT',
                                                      'group_regex'):
            group_regex = self.parser.get('DEFAULT', 'group_regex')
        if group_regex:
            def group_callback(test_id, regex=re.compile(group_regex)):
                match = regex.match(test_id)
                if match:
                    return match.group(0)
        else:
            group_callback = None
        # Handle the results repository
        repository = util.get_repo_open(repo_type, repo_url)
        return test_processor.TestProcessorFixture(
            test_ids, command, listopt, idoption, repository,
            test_filters=regexes, group_callback=group_callback, serial=serial,
            worker_path=worker_path, concurrency=concurrency,
            blacklist_file=blacklist_file, black_regex=black_regex,
            whitelist_file=whitelist_file, randomize=randomize,
            dynamic=dynamic)
github mtreinish / stestr / stestr / commands / run.py View on Github external
repo_url=repo_url, run_id=combine_id,
                             pretty_out=pretty_out, color=color, stdout=stdout,
                             abbreviate=abbreviate,
                             suppress_attachments=suppress_attachments,
                             all_attachments=all_attachments)

        if not until_failure:
            return run_tests()
        else:
            while True:
                result = run_tests()
                # If we're using subunit output we want to make sure to check
                # the result from the repository because load() returns 0
                # always on subunit output
                if subunit_out:
                    repo = util.get_repo_open(repo_type, repo_url)
                    summary = testtools.StreamSummary()
                    last_run = repo.get_latest_run().get_subunit_stream()
                    stream = subunit.ByteStreamToStreamResult(last_run)
                    summary.startTestRun()
                    try:
                        stream.run(summary)
                    finally:
                        summary.stopTestRun()
                    if not results.wasSuccessful(summary):
                        result = 1
                if result:
                    return result
    finally:
        cmd.cleanUp()