Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, plugin):
    """
    Initialize the base class, then remember the given plugin.

    :param plugin:
        Object stored on this instance as ``_plugin`` for later use.
    """
    super(FlakyXdist, self).__init__()
    self._plugin = plugin
def pytest_testnodedown(self, node, error):
    """
    Pytest hook invoked when a test node shuts down.

    If the node's worker output carries a 'flaky_report' entry, append
    that text to the stream of the plugin held by this object.

    :param node:
        The node that went down; passed to `_get_worker_output`.
    :param error:
        Unused here.
    """
    # pylint: disable=unused-argument, no-self-use
    output = _get_worker_output(node)
    # Nothing to copy when the worker produced no output or no report.
    if output is None or 'flaky_report' not in output:
        return
    self._plugin.stream.write(output['flaky_report'])
class FlakyPlugin(_FlakyPlugin):
    """
    Pytest plugin that makes it possible to retry flaky tests.
    """
    # Class-level defaults that start out unset.
    runner = None
    config = None
    max_runs = None
    min_passes = None

    # Class-level boolean defaults.
    flaky_report = True
    force_flaky = False

    # NOTE: defined on the class, so this dict is shared by all instances
    # unless an instance rebinds the attribute.
    _call_infos = {}

    # String constants for pytest "when" phases and report outcomes.
    _PYTEST_WHEN_SETUP = 'setup'
    _PYTEST_WHEN_CALL = 'call'
    _PYTEST_WHENS = (_PYTEST_WHEN_SETUP, _PYTEST_WHEN_CALL)
    _PYTEST_OUTCOME_PASSED = 'passed'
    _PYTEST_OUTCOME_FAILED = 'failed'
def __init__(self):
    """
    Set up the report stream and the flaky-test bookkeeping flags.
    """
    # NOTE(review): super() is anchored at `_FlakyPlugin`, so this looks
    # like the initializer of `_FlakyPlugin` itself -- confirm which class
    # owns this method before changing the call.
    super(_FlakyPlugin, self).__init__()
    self._had_flaky_tests = False
    self._flaky_success_report = True
    # In-memory text buffer, initially empty.
    self._stream = StringIO()
# coding: utf-8
from __future__ import unicode_literals
import logging
from optparse import OptionGroup
import os
from nose.failure import Failure
from nose.plugins import Plugin
from nose.result import TextTestResult
from flaky._flaky_plugin import _FlakyPlugin
class FlakyPlugin(_FlakyPlugin, Plugin):
    """
    Nosetests plugin that makes it possible to retry flaky tests.
    """
    name = 'flaky'

    def __init__(self):
        """
        Initialize the base classes and reset all per-run state.
        """
        super(FlakyPlugin, self).__init__()
        self._logger = logging.getLogger('nose.plugins.flaky')

        # Result objects are not known yet; they start out unset.
        self._flaky_result = self._nose_result = None

        # Option defaults.
        self._flaky_report = True
        self._force_flaky = False
        self._max_runs = self._min_passes = None

        # Empty containers for tracking tests during the run.
        self._test_status = {}
        self._tests_that_reran = set()