    :param str repo_type: This is the type of repository to use. Valid choices
        are 'file' and 'sql'.
:param str repo_url: The url of the repository to use.
    :param bool list_tests: Show only a list of failing tests.
:param bool subunit: Show output as a subunit stream.
:param file stdout: The output file to write all output to. By default
this is sys.stdout
:return return_code: The exit code for the command. 0 for success and > 0
for failures.
:rtype: int
"""
if repo_type not in ['file', 'sql']:
        stdout.write('Repository type %s is not a valid type\n' % repo_type)
return 1
repo = util.get_repo_open(repo_type, repo_url)
run = repo.get_failing()
if subunit:
return _show_subunit(run)
case = run.get_test()
failed = False
result, summary = _make_result(repo, list_tests=list_tests)
result.startTestRun()
try:
case.run(result)
finally:
result.stopTestRun()
failed = not results.wasSuccessful(summary)
if failed:
result = 1
else:
result = 0
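
# Usage sketch (not part of the stestr source): invoking the command function
# above directly from Python. Assumes stestr is installed and a repository was
# already populated by a prior `stestr run`; the import path mirrors stestr's
# commands package.
from stestr.commands.failing import failing as failing_command

ret = failing_command(repo_type='file', list_tests=True)
# ret == 0 -> no recorded failures; ret == 1 -> failures (or a bad repo_type)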
    :param bool all_attachments: When set to true, subunit_trace will print
        all text attachments on successful test execution.
    :param str pdb: Takes a single test_id and bypasses test
        discovery, executing just the specified test without launching any
        additional processes. A file name may be used in place of a test name.
:param bool dynamic: Enable dynamic scheduling
:return return_code: The exit code for the command. 0 for success and > 0
for failures.
:rtype: int
"""
if partial:
warnings.warn('The partial flag is deprecated and has no effect '
'anymore')
try:
repo = util.get_repo_open(repo_type, repo_url)
    # If a repo is not found and a testr config file exists, just create it
except repository.RepositoryNotFound:
if not os.path.isfile(config) and not test_path:
msg = ("No config file found and --test-path not specified. "
"Either create or specify a .stestr.conf or use "
"--test-path ")
stdout.write(msg)
exit(1)
try:
repo = util.get_repo_initialise(repo_type, repo_url)
except OSError as e:
if e.errno != errno.EEXIST:
raise
repo_path = repo_url or './stestr'
            stdout.write('The specified repository directory %s already '
                         'exists. Please check if the repository already '
                         'exists or select a different path\n' % repo_path)
            return 1
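
# Sketch of the open-or-create pattern above in isolation, assuming stestr's
# repository helpers (module paths as imported by stestr itself; a hedged
# stand-alone example, not part of this file):
from stestr.repository import abstract as repository
from stestr.repository import util


def open_or_create_repo(repo_type='file', repo_url=None):
    try:
        return util.get_repo_open(repo_type, repo_url)
    except repository.RepositoryNotFound:
        # No repository yet at repo_url, so initialise one in place
        return util.get_repo_initialise(repo_type, repo_url)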
:param bool subunit: Show output as a subunit stream.
:param file stdout: The output file to write all output to. By default
this is sys.stdout
    :param bool suppress_attachments: When set to true, subunit_trace will
        not print attachments on successful test execution.
    :param bool all_attachments: When set to true, subunit_trace will print
        all text attachments on successful test execution.
:param bool show_binary_attachments: When set to true, subunit_trace will
print binary attachments in addition to text attachments.
:return return_code: The exit code for the command. 0 for success and > 0
for failures.
:rtype: int
"""
try:
repo = util.get_repo_open(repo_type, repo_url)
except abstract.RepositoryNotFound as e:
stdout.write(str(e) + '\n')
return 1
try:
latest_run = repo.get_latest_run()
except KeyError as e:
stdout.write(str(e) + '\n')
return 1
if subunit_out:
stream = latest_run.get_subunit_stream()
output.output_stream(stream, output=stdout)
# Exits 0 if we successfully wrote the stream.
return 0
case = latest_run.get_test()
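
# Sketch of what the subunit_out branch above achieves: copy the stored
# subunit stream for the latest run verbatim to stdout so other tooling can
# consume it. A minimal stand-in for stestr's output helper; the function
# name here is illustrative.
import shutil
import sys


def dump_subunit_stream(stream, out=sys.stdout.buffer):
    # subunit streams are bytes, so write to the underlying binary buffer
    shutil.copyfileobj(stream, out)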
def stats(repo_type='file', repo_url=None):
"""Print repo stats
:param str repo_type: This is the type of repository to use. Valid choices
are 'file' and 'sql'.
:param str repo_url: The url of the repository to use.
"""
repo = util.get_repo_open(repo_type, repo_url)
sys.stdout.write('%s=%s\n' % ('runs', repo.count()))
return 0
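
# Usage sketch (assumes a 'file'-type repository already exists, for example
# the default ./.stestr directory created by an earlier run):
stats()                        # prints something like "runs=42"
stats(repo_url='/tmp/myrepo')  # '/tmp/myrepo' is a hypothetical path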
    Optionally, using the ``show_all`` argument, it will print all the tests
    sorted by time instead of just the slowest 10.
:param str repo_type: This is the type of repository to use. Valid choices
are 'file' and 'sql'.
:param str repo_url: The url of the repository to use.
:param bool show_all: Show timing for all tests.
:param file stdout: The output file to write all output to. By default
this is sys.stdout
:return return_code: The exit code for the command. 0 for success and > 0
for failures.
:rtype: int
"""
repo = util.get_repo_open(repo_type, repo_url)
try:
latest_id = repo.latest_id()
except KeyError:
return 3
# what happens when there is no timing info?
test_times = repo.get_test_times(repo.get_test_ids(latest_id))
known_times = list(test_times['known'].items())
known_times.sort(key=itemgetter(1), reverse=True)
if len(known_times) > 0:
# By default show 10 rows
if not show_all:
known_times = known_times[:10]
known_times = format_times(known_times)
header = ('Test id', 'Runtime (s)')
rows = [header] + known_times
        output.output_table(rows, output=stdout)
    return 0
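
# Illustration with hypothetical data of how the known test times are ordered
# above: itemgetter(1) keys the sort on runtime and reverse=True puts the
# slowest tests first, so the [:10] slice keeps the ten slowest.
from operator import itemgetter

times = {'test_a': 0.12, 'test_b': 3.40, 'test_c': 1.05}
ordered = sorted(times.items(), key=itemgetter(1), reverse=True)
# ordered == [('test_b', 3.4), ('test_c', 1.05), ('test_a', 0.12)]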
if not group_regex \
and self.parser.has_option('DEFAULT', 'parallel_class') \
and self.parser.getboolean('DEFAULT', 'parallel_class'):
group_regex = r'([^\.]*\.)*'
if not group_regex and self.parser.has_option('DEFAULT',
'group_regex'):
group_regex = self.parser.get('DEFAULT', 'group_regex')
if group_regex:
def group_callback(test_id, regex=re.compile(group_regex)):
match = regex.match(test_id)
if match:
return match.group(0)
else:
group_callback = None
# Handle the results repository
repository = util.get_repo_open(repo_type, repo_url)
return test_processor.TestProcessorFixture(
test_ids, command, listopt, idoption, repository,
test_filters=regexes, group_callback=group_callback, serial=serial,
worker_path=worker_path, concurrency=concurrency,
blacklist_file=blacklist_file, black_regex=black_regex,
whitelist_file=whitelist_file, randomize=randomize,
dynamic=dynamic)
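
# Illustration of the parallel_class grouping above: the regex r'([^\.]*\.)*'
# matches everything up to and including the last dot in a test id, so all
# tests in one class share a group and are scheduled to the same worker.
import re

class_group = re.compile(r'([^\.]*\.)*')
print(class_group.match('pkg.module.TestClass.test_method').group(0))
# -> 'pkg.module.TestClass.'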
repo_url=repo_url, run_id=combine_id,
pretty_out=pretty_out, color=color, stdout=stdout,
abbreviate=abbreviate,
suppress_attachments=suppress_attachments,
all_attachments=all_attachments)
if not until_failure:
return run_tests()
else:
while True:
result = run_tests()
            # If we're using subunit output we want to make sure to check
            # the result from the repository, because load() always returns
            # 0 when emitting subunit output
if subunit_out:
repo = util.get_repo_open(repo_type, repo_url)
summary = testtools.StreamSummary()
last_run = repo.get_latest_run().get_subunit_stream()
stream = subunit.ByteStreamToStreamResult(last_run)
summary.startTestRun()
try:
stream.run(summary)
finally:
summary.stopTestRun()
if not results.wasSuccessful(summary):
result = 1
if result:
return result
finally:
cmd.cleanUp()
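
# Standalone sketch of the success check in the loop above: replay a stored
# subunit stream into a StreamSummary and ask whether it passed. A simplified
# stand-in for stestr's results.wasSuccessful() helper (which additionally
# treats an empty run as a failure); assumes python-subunit and testtools are
# installed.
import subunit
import testtools


def stream_was_successful(stream):
    summary = testtools.StreamSummary()
    summary.startTestRun()
    try:
        subunit.ByteStreamToStreamResult(stream).run(summary)
    finally:
        summary.stopTestRun()
    return summary.wasSuccessful()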