Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
are 'file' and 'sql'.
:param str repo_url: The url of the repository to use.
:param bool list_tests: Show only a list of failing tests.
:param bool subunit: Show output as a subunit stream.
:param file stdout: The output file to write all output to. By default
this is sys.stdout
:return return_code: The exit code for the command. 0 for success and > 0
for failures.
:rtype: int
"""
if repo_type not in ['file', 'sql']:
stdout.write('Repository type %s is not a type' % repo_type)
return 1
repo = util.get_repo_open(repo_type, repo_url)
run = repo.get_failing()
if subunit:
return _show_subunit(run)
case = run.get_test()
failed = False
result, summary = _make_result(repo, list_tests=list_tests)
result.startTestRun()
try:
case.run(result)
finally:
result.stopTestRun()
failed = not results.wasSuccessful(summary)
if failed:
result = 1
else:
result = 0
:param bool all_attachments: When set true subunit_trace will print all
text attachments on successful test execution.
:param str pdb: Takes in a single test_id to bypass test
discovery and just execute the test specified without launching any
additional processes. A file name may be used in place of a test name.
:param bool dynamic: Enable dynamic scheduling
:return return_code: The exit code for the command. 0 for success and > 0
for failures.
:rtype: int
"""
if partial:
warnings.warn('The partial flag is deprecated and has no effect '
'anymore')
try:
repo = util.get_repo_open(repo_type, repo_url)
# If a repo is not found and a testr config exists, just create it
except repository.RepositoryNotFound:
if not os.path.isfile(config) and not test_path:
msg = ("No config file found and --test-path not specified. "
"Either create or specify a .stestr.conf or use "
"--test-path ")
stdout.write(msg)
exit(1)
try:
repo = util.get_repo_initialise(repo_type, repo_url)
except OSError as e:
if e.errno != errno.EEXIST:
raise
repo_path = repo_url or './stestr'
stdout.write('The specified repository directory %s already '
'exists. Please check if the repository already '
:param bool subunit: Show output as a subunit stream.
:param file stdout: The output file to write all output to. By default
this is sys.stdout
:param bool suppress_attachments: When set true subunit_trace
will not print attachments on successful test execution.
:param bool all_attachments: When set true subunit_trace will print all
text attachments on successful test execution.
:param bool show_binary_attachments: When set to true, subunit_trace will
print binary attachments in addition to text attachments.
:return return_code: The exit code for the command. 0 for success and > 0
for failures.
:rtype: int
"""
try:
repo = util.get_repo_open(repo_type, repo_url)
except abstract.RepositoryNotFound as e:
stdout.write(str(e) + '\n')
return 1
try:
latest_run = repo.get_latest_run()
except KeyError as e:
stdout.write(str(e) + '\n')
return 1
if subunit_out:
stream = latest_run.get_subunit_stream()
output.output_stream(stream, output=stdout)
# Exits 0 if we successfully wrote the stream.
return 0
case = latest_run.get_test()
will not print attachments on successful test execution.
:param bool all_attachments: When set true subunit_trace will print all
text attachments on successful test execution.
:param bool show_binary_attachments: When set to true, subunit_trace will
print binary attachments in addition to text attachments.
:return return_code: The exit code for the command. 0 for success and > 0
for failures.
:rtype: int
"""
if partial:
warnings.warn('The partial flag is deprecated and has no effect '
'anymore')
try:
repo = util.get_repo_open(repo_type, repo_url)
except repository.RepositoryNotFound:
if force_init:
try:
repo = util.get_repo_initialise(repo_type, repo_url)
except OSError as e:
if e.errno != errno.EEXIST:
raise
repo_path = repo_url or './stestr'
stdout.write('The specified repository directory %s already '
'exists. Please check if the repository already '
'exists or select a different path\n'
% repo_path)
exit(1)
else:
raise
# Not a full implementation of TestCase, but we only need to iterate
stdout.write('Repository type %s is not a type' % repo_type)
return 1
repo = util.get_repo_open(repo_type, repo_url)
run = repo.get_failing()
if subunit:
return _show_subunit(run)
case = run.get_test()
failed = False
result, summary = _make_result(repo, list_tests=list_tests)
result.startTestRun()
try:
case.run(result)
finally:
result.stopTestRun()
failed = not results.wasSuccessful(summary)
if failed:
result = 1
else:
result = 0
if list_tests:
failing_tests = [
test for test, _ in summary.errors + summary.failures]
output.output_tests(failing_tests, output=stdout)
return result
def get_test(self):
    """Return this run's subunit stream wrapped as a runnable test.

    :return: A ``subunit.ByteStreamToStreamResult`` built from
        ``self.get_subunit_stream()``.
    """
    return subunit.ByteStreamToStreamResult(self.get_subunit_stream())
def get_metadata(self):
    """Look up the ``stestr_run_meta`` metadata value for this run.

    :return: The ``value`` of the first metadata row whose ``key`` is
        ``'stestr_run_meta'``; ``None`` when the run has no id or no such
        row exists.
    """
    # Without a run id there is nothing to query.
    if not self._run_id:
        return None
    db_session = self.session_factory()
    rows = db_api.get_run_metadata(self._run_id, session=db_session)
    # First matching row wins; fall back to None when nothing matches.
    return next(
        (row.value for row in rows if row.key == 'stestr_run_meta'), None)
class _SqlInserter(repository.AbstractTestRun):
"""Insert test results into a sql repository."""
def __init__(self, repository, partial=False, run_id=None, metadata=None):
    """Set up an inserter bound to ``repository``.

    :param repository: The repository object; its ``base`` attribute is
        handed to ``sqlalchemy.create_engine``.
    :param partial: Stored unchanged on ``self.partial``.
    :param run_id: Stored on ``self._run_id``.
    :param metadata: Stored on ``self._metadata``.
    """
    self._repository = repository
    self._run_id, self._metadata = run_id, metadata
    self.partial = partial
    # No subunit buffer exists until startTestRun() creates one.
    self._subunit = None
    # Each inserter gets its own engine and autocommit session factory.
    db_engine = sqlalchemy.create_engine(repository.base)
    self.engine = db_engine
    self.session_factory = orm.sessionmaker(autocommit=True, bind=db_engine)
def startTestRun(self):
self._subunit = io.BytesIO()
self.subunit_stream = subunit.v2.StreamResultToBytes(self._subunit)
:param bool all_attachments: When set true subunit_trace will print all
text attachments on successful test execution.
:param bool show_binary_attachments: When set to true, subunit_trace will
print binary attachments in addition to text attachments.
:return return_code: The exit code for the command. 0 for success and > 0
for failures.
:rtype: int
"""
if partial:
warnings.warn('The partial flag is deprecated and has no effect '
'anymore')
try:
repo = util.get_repo_open(repo_type, repo_url)
except repository.RepositoryNotFound:
if force_init:
try:
repo = util.get_repo_initialise(repo_type, repo_url)
except OSError as e:
if e.errno != errno.EEXIST:
raise
repo_path = repo_url or './stestr'
stdout.write('The specified repository directory %s already '
'exists. Please check if the repository already '
'exists or select a different path\n'
% repo_path)
exit(1)
else:
raise
# Not a full implementation of TestCase, but we only need to iterate
# back to it. Needs to be a callable - its a head fake for
output.output_stream(stream, output=stdout)
# Exits 0 if we successfully wrote the stream.
return 0
case = latest_run.get_test()
try:
if repo_type == 'file':
previous_run = repo.get_test_run(repo.latest_id() - 1)
# TODO(mtreinish): add a repository api to get the previous_run to
# unify this logic
else:
previous_run = None
except KeyError:
previous_run = None
failed = False
if not pretty_out:
output_result = results.CLITestResult(latest_run.get_id, stdout,
previous_run)
summary = output_result.get_summary()
output_result.startTestRun()
try:
case.run(output_result)
finally:
output_result.stopTestRun()
failed = not results.wasSuccessful(summary)
else:
stream = latest_run.get_subunit_stream()
failed = subunit_trace.trace(
stream, stdout, post_fails=True, color=color,
suppress_attachments=suppress_attachments,
all_attachments=all_attachments,
show_binary_attachments=show_binary_attachments)
if failed:
def list_tests(self):
    """List the tests returned by list_cmd.

    Runs ``self.list_cmd`` through ``self._start_process`` and collects its
    stdout/stderr. When the process exits non-zero, the captured stdout is
    decoded from a subunit v2 stream and echoed to ``sys.stdout``, the
    captured stderr is echoed to ``sys.stderr``, and the interpreter exits
    with status 100.

    NOTE(review): only the discovery-failure handling is visible in this
    excerpt; the code that turns ``out`` into test ids is not shown here.

    :return: A list of test ids.
    :raises SystemExit: with code 100 when the list command fails.
    """
    run_proc = self._start_process(self.list_cmd)
    out, err = run_proc.communicate()
    if run_proc.returncode != 0:
        sys.stdout.write("\n=========================\n"
                         "Failures during discovery"
                         "\n=========================\n")
        # The child's stdout is a subunit v2 stream; flatten the file
        # attachments in it into plain bytes before showing them.
        new_out = six.BytesIO()
        v2.ByteStreamToStreamResult(
            six.BytesIO(out), 'stdout').run(
                results.CatFiles(new_out))
        out = new_out.getvalue()
        if out:
            if six.PY3:
                sys.stdout.write(out.decode('utf8'))
            else:
                sys.stdout.write(out)
        if err:
            # Forward the child's stderr to our stderr on both Python
            # versions (previously the PY3 branch re-printed ``out`` to
            # stdout and dropped ``err`` entirely).
            if six.PY3:
                sys.stderr.write(err.decode('utf8'))
            else:
                sys.stderr.write(err)
        sys.stdout.write("\n" + "=" * 80 + "\n"
                         "The above traceback was encountered during "
                         "test discovery which imports all the found test"
                         " modules in the specified test_path.\n")
        # sys.exit rather than the site-provided exit(), which is intended
        # for interactive use and is absent under ``python -S``.
        sys.exit(100)
"""
try:
repo = util.get_repo_open(repo_type, repo_url)
except abstract.RepositoryNotFound as e:
stdout.write(str(e) + '\n')
return 1
try:
latest_run = repo.get_latest_run()
except KeyError as e:
stdout.write(str(e) + '\n')
return 1
if subunit_out:
stream = latest_run.get_subunit_stream()
output.output_stream(stream, output=stdout)
# Exits 0 if we successfully wrote the stream.
return 0
case = latest_run.get_test()
try:
if repo_type == 'file':
previous_run = repo.get_test_run(repo.latest_id() - 1)
# TODO(mtreinish): add a repository api to get the previous_run to
# unify this logic
else:
previous_run = None
except KeyError:
previous_run = None
failed = False
if not pretty_out:
output_result = results.CLITestResult(latest_run.get_id, stdout,
previous_run)