Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Put the arguments that were not consumed earlier back into sys.argv.
# NOTE(review): presumably so green's own argument parsing below only sees
# what is left -- confirm.
sys.argv[1:] = unparsed_args

# By default, pause before exiting
if not args.no_pause:
    # The `and` short-circuit skips the prompt in any process whose name
    # starts with "PoolWorker-", so only other processes wait for Enter.
    # NOTE(review): raw_input is the Python 2 spelling -- confirm the file
    # provides it when run under Python 3.
    atexit.register(
        lambda: not multiprocessing.current_process().name.startswith("PoolWorker-")
        and raw_input("Press Enter to exit ..."))

print("Testing", full_version() + "\n")

# Additional setup normally done by green.cmdline.main()
if has_green:
    green_args = green.config.parseArguments()
    green_args = green.config.mergeConfig(green_args)
    if green_args.shouldExit:
        sys.exit(green_args.exitCode)
    green.suite.GreenTestSuite.args = green_args
    if green_args.debug:
        green.output.debug_level = green_args.debug

# Running totals across all test runs; accumulate_results() adds to them.
total_tests = total_skipped = total_failures = total_errors = total_passing = 0
def accumulate_results(r):
    """Add the counts from one test run to the module-level running totals.

    ``r`` is a result object exposing ``testsRun`` plus the ``skipped``,
    ``failures`` and ``errors`` collections. When ``has_green`` is true its
    ``passing`` collection is counted as well.
    """
    global total_tests, total_skipped, total_failures, total_errors, total_passing
    total_tests += r.testsRun
    total_skipped += len(r.skipped)
    total_failures += len(r.failures)
    total_errors += len(r.errors)
    if has_green:
        # "passing" is only read when green is in use.
        total_passing += len(r.passing)
# The start time comes from time.time() when green is in use and from
# timer() otherwise.
timer = timeit.default_timer
start_time = time.time() if has_green else timer()

# Consume this script's own options; unknown arguments are kept in
# unparsed_args and written back to sys.argv.
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("--no-buffer", action="store_true")
parser.add_argument("--no-pause", action="store_true")
args, unparsed_args = parser.parse_known_args()
sys.argv[1:] = unparsed_args

# By default, pause before exiting
if not args.no_pause:
    # The `and` short-circuit skips the prompt in any process whose name
    # starts with "PoolWorker-", so only other processes wait for Enter.
    # NOTE(review): raw_input is the Python 2 spelling -- confirm the file
    # provides it when run under Python 3.
    atexit.register(
        lambda: not multiprocessing.current_process().name.startswith("PoolWorker-")
        and raw_input("Press Enter to exit ..."))

print("Testing", full_version() + "\n")

# Additional setup normally done by green.cmdline.main()
if has_green:
    green_args = green.config.parseArguments()
    green_args = green.config.mergeConfig(green_args)
    if green_args.shouldExit:
        sys.exit(green_args.exitCode)
    green.suite.GreenTestSuite.args = green_args
    if green_args.debug:
        green.output.debug_level = green_args.debug

# Running totals across all test runs; accumulate_results() adds to them.
total_tests = total_skipped = total_failures = total_errors = total_passing = 0
def accumulate_results(r):
    """Add the counts from one test run to the module-level running totals.

    ``r`` is a result object exposing ``testsRun`` plus the ``skipped``,
    ``failures`` and ``errors`` collections. When ``has_green`` is true its
    ``passing`` collection is counted as well.
    """
    global total_tests, total_skipped, total_failures, total_errors, total_passing
    total_tests += r.testsRun
    total_skipped += len(r.skipped)
    total_failures += len(r.failures)
    total_errors += len(r.errors)
    if has_green:
        # "passing" is only read when green is in use.
        total_passing += len(r.passing)
# Consume this script's own options; unknown arguments are kept in
# unparsed_args and written back to sys.argv.
parser.add_argument("--no-buffer", action="store_true")
parser.add_argument("--no-pause", action="store_true")
args, unparsed_args = parser.parse_known_args()
sys.argv[1:] = unparsed_args

# By default, pause before exiting
if not args.no_pause:
    # The `and` short-circuit skips the prompt in any process whose name
    # starts with "PoolWorker-", so only other processes wait for Enter.
    # NOTE(review): raw_input is the Python 2 spelling -- confirm the file
    # provides it when run under Python 3.
    atexit.register(
        lambda: not multiprocessing.current_process().name.startswith("PoolWorker-")
        and raw_input("Press Enter to exit ..."))

print("Testing", full_version() + "\n")

# Additional setup normally done by green.cmdline.main()
if has_green:
    green_args = green.config.parseArguments()
    green_args = green.config.mergeConfig(green_args)
    if green_args.shouldExit:
        sys.exit(green_args.exitCode)
    green.suite.GreenTestSuite.args = green_args
    if green_args.debug:
        green.output.debug_level = green_args.debug

# Running totals across all test runs; accumulate_results() adds to them.
total_tests = total_skipped = total_failures = total_errors = total_passing = 0
def accumulate_results(r):
    """Add the counts from one test run to the module-level running totals.

    ``r`` is a result object exposing ``testsRun`` plus the ``skipped``,
    ``failures`` and ``errors`` collections. When ``has_green`` is true its
    ``passing`` collection is counted as well.
    """
    global total_tests, total_skipped, total_failures, total_errors, total_passing
    total_tests += r.testsRun
    total_skipped += len(r.skipped)
    total_failures += len(r.failures)
    total_errors += len(r.errors)
    if has_green:
        # "passing" is only read when green is in use.
        total_passing += len(r.passing)
def main(test_module, exit=None, buffer=None):
    """Run the tests of ``test_module`` with green, shaped like ``unittest.main()``.

    Args:
        test_module: Module object whose tests are loaded and run.
        exit: Not used by this function.
        buffer: When truthy, ``quiet_stdout`` is switched on in the shared
            ``green_args`` before the run.

    Returns:
        A one-field namedtuple whose ``result`` attribute holds the value
        returned by ``green.runner.run()``.
    """
    import green.loader
    import green.runner

    if buffer:
        green_args.quiet_stdout = True
    try:
        suite = green.loader.GreenTestLoader().loadTestsFromModule(test_module)  # new API (v2.9+)
    except AttributeError:
        suite = green.loader.loadFromModule(test_module)  # legacy API
    results = green.runner.run(suite, sys.stdout, green_args)
    # Return the results in an object with a "result" attribute, same as unittest.main()
    return collections.namedtuple("Tuple", "result")(results)
def main(test_module, exit=None, buffer=None):
    """Run the tests of ``test_module`` with green, shaped like ``unittest.main()``.

    Args:
        test_module: Module object whose tests are loaded and run.
        exit: Not used by this function.
        buffer: When truthy, ``quiet_stdout`` is switched on in the shared
            ``green_args`` before the run.

    Returns:
        A one-field namedtuple whose ``result`` attribute holds the value
        returned by ``green.runner.run()``.
    """
    import green.loader
    import green.runner

    if buffer:
        green_args.quiet_stdout = True
    try:
        suite = green.loader.GreenTestLoader().loadTestsFromModule(test_module)  # new API (v2.9+)
    except AttributeError:
        suite = green.loader.loadFromModule(test_module)  # legacy API
    results = green.runner.run(suite, sys.stdout, green_args)
    # Return the results in an object with a "result" attribute, same as unittest.main()
    return collections.namedtuple("Tuple", "result")(results)
def main(test_module, exit=None, buffer=None):
    """Run the tests of ``test_module`` with green, shaped like ``unittest.main()``.

    Args:
        test_module: Module object whose tests are loaded and run.
        exit: Not used by this function.
        buffer: When truthy, ``quiet_stdout`` is switched on in the shared
            ``green_args`` before the run.

    Returns:
        A one-field namedtuple whose ``result`` attribute holds the value
        returned by ``green.runner.run()``.
    """
    import green.loader
    import green.runner

    if buffer:
        green_args.quiet_stdout = True
    try:
        suite = green.loader.GreenTestLoader().loadTestsFromModule(test_module)  # new API (v2.9+)
    except AttributeError:
        suite = green.loader.loadFromModule(test_module)  # legacy API
    results = green.runner.run(suite, sys.stdout, green_args)
    # Return the results in an object with a "result" attribute, same as unittest.main()
    return collections.namedtuple("Tuple", "result")(results)
def start_callback(test):
    """Put the proto form of ``test`` on ``queue`` the first time it is seen.

    ``already_sent`` records what has been queued, so a repeated call for the
    same test puts nothing further on the queue.
    """
    # Let the main process know what test we are starting
    test = proto_test(test)
    if test not in already_sent:
        queue.put(test)
        already_sent.add(test)
def finalize_callback(test_result):
    """Put the finished ``test_result`` on ``queue``."""
    # Let the main process know what happened with the test run
    queue.put(test_result)
# Excerpt from inside a function whose header is not part of this excerpt
# (`target`, `queue` and `cleanup` come from that context); the original
# indentation is not preserved.
# The result object is handed the start and finalize callbacks.
result = ProtoTestResult(start_callback, finalize_callback)
test = None
# Try to load the requested target.
try:
loader = GreenTestLoader()
test = loader.loadTargets(target)
# NOTE(review): a bare except also traps SystemExit/KeyboardInterrupt --
# presumably deliberate so any load failure gets reported; confirm.
except:
# Record the load failure as an error on a placeholder ProtoTest.
err = sys.exc_info()
t = ProtoTest()
t.module = 'green.loader'
t.class_name = 'N/A'
t.description = 'Green encountered an error loading the unit test.'
t.method_name = 'poolRunner'
result.startTest(t)
result.addError(t, err)
result.stopTest(t)
# NOTE(review): presumably still inside the except branch -- the result is
# queued, cleanup() is called and the function returns early; confirm nesting.
queue.put(result)
cleanup()
return
# Excerpt from a suite's per-item run logic. The enclosing loop and method
# (source of `self`, `test`, `result` and the target of `continue`) are not
# part of this excerpt, and the original indentation is not preserved.
if getattr(test, 'run', False):
# Previous-class teardown, module fixture and class set-up handling come
# before the item itself is run.
self._tearDownPreviousClass(test, result)
self._handleModuleFixture(test, result)
self._handleClassSetUp(test, result)
result._previousTestClass = test.__class__
# Move on to the next item when class or module set-up is flagged as failed.
if (getattr(test.__class__, '_classSetupFailed', False) or
getattr(result, '_moduleSetUpFailed', False)):
continue
# Unless stdout is allowed through, replace sys.stdout/sys.stderr with
# GreenStream wrappers around in-memory buffers, remembering the originals.
if not self.allow_stdout:
captured_stdout = StringIO()
captured_stderr = StringIO()
saved_stdout = sys.stdout
saved_stderr = sys.stderr
sys.stdout = GreenStream(captured_stdout)
sys.stderr = GreenStream(captured_stderr)
# Run the item against the result object.
test(result)
if _isnotsuite(test):
# Put the real streams back and pass the captured text to the result.
if not self.allow_stdout:
sys.stdout = saved_stdout
sys.stderr = saved_stderr
result.recordStdout(test, captured_stdout.getvalue())
result.recordStderr(test, captured_stderr.getvalue())
# Since we're intercepting the stdout/stderr out here at the
# suite level, we need to poke the test result and let it know
# when we're ready to transmit results back up to the parent
# process. I would rather just do it automatically at test
# stop time, but we don't have the captured stuff at that
# point. Messy...but the only other alternative I can think of
# is monkey-patching loaded TestCases -- which could be from
# Excerpt from a suite's per-item run logic. The enclosing loop and method
# (source of `self`, `test`, `result` and the target of `continue`) are not
# part of this excerpt, and the original indentation is not preserved.
if _isnotsuite(test):
# Previous-class teardown, module fixture and class set-up handling come
# before the item itself is run.
self._tearDownPreviousClass(test, result)
self._handleModuleFixture(test, result)
self._handleClassSetUp(test, result)
result._previousTestClass = test.__class__
# Move on to the next item when class or module set-up is flagged as failed.
if (getattr(test.__class__, '_classSetupFailed', False) or
getattr(result, '_moduleSetUpFailed', False)):
continue
# Unless stdout is allowed through, replace sys.stdout/sys.stderr with
# GreenStream wrappers around in-memory buffers, remembering the originals.
if not self.allow_stdout:
captured_stdout = StringIO()
captured_stderr = StringIO()
saved_stdout = sys.stdout
saved_stderr = sys.stderr
sys.stdout = GreenStream(captured_stdout)
sys.stderr = GreenStream(captured_stderr)
# Run the item against the result object.
test(result)
if _isnotsuite(test):
# Put the real streams back and pass the captured text to the result.
if not self.allow_stdout:
sys.stdout = saved_stdout
sys.stderr = saved_stderr
result.recordStdout(test, captured_stdout.getvalue())
result.recordStderr(test, captured_stderr.getvalue())
# Since we're intercepting the stdout/stderr out here at the
# suite level, we need to poke the test result and let it know
# when we're ready to transmit results back up to the parent
# process. I would rather just do it automatically at test
# stop time, but we don't have the captured stuff at that
# point. Messy...but the only other alternative I can think of
"""
# Body excerpt: the function header (source of `suite`, `test_list` and
# `doing_completions`) is not part of this excerpt, and the original
# indentation is not preserved.
# Start a fresh accumulator list when none was passed in.
if test_list is None:
test_list = []
# Python's lousy handling of module import failures during loader
# discovery makes this crazy special case necessary. See
# _make_failed_import_test in the source code for unittest.loader
if suite.__class__.__name__ == 'ModuleImportFailure':
if doing_completions:
return test_list
# The first whitespace-separated word of str(suite) is used as the name of
# a method on the suite, which is then called.
exception_method = str(suite).split()[0]
getattr(suite, exception_method)()
# On to the real stuff
if issubclass(type(suite), unittest.TestCase):
# Skip actual blank TestCase objects that twisted inserts
# NOTE(review): str(type(...)) is never the empty string, so this condition
# is always true as written; the compared literal looks lost or truncated --
# confirm the intended class repr against the upstream source.
if str(type(suite)) != "":
test_list.append(proto_test(suite))
else:
# Not a TestCase: iterate it and pass each element to toProtoTestList with
# the same accumulator and flag.
for i in suite:
toProtoTestList(i, test_list, doing_completions)
return test_list