Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
"""
Creates a Transporter object
Mocks Redis, channel, schedule_next_message and basic_publish
"""
unittest.TestCase.setUp(self)
self.transporter = transport.Transporter("1", threading.Lock(),
config.ReadConfig(),
"queue1", threading.Event())
self.transporter.stopping = False
self.test_message = "{'key1': 'value1', 'key2': 'value2'}"
self.transporter.redis = Mock()
self.transporter.redis.chunk_pop.return_value = ([self.test_message]*8,
False)
self.transporter.channel = Mock()
self.transporter.channel.basic_publish = mock_publish
self.transporter.schedule_next_message = Mock(return_value=None)
def setUp(self):
TestCase.setUp(self)
self.client = Client("testuser", "testapikey")
def setUp(self):
if not os.path.isdir(TestSearchCommandsApp.app_root):
build_command = os.path.join(project_root, 'examples', 'searchcommands_app', 'setup.py build')
self.skipTest("You must build the searchcommands_app by running " + build_command)
TestCase.setUp(self)
def setUp(self):
TestCase.setUp(self)
self.client_config = self.init_client_config()
def setUp(self):
unittest.TestCase.setUp(self)
self.service = client.connect(**self.opts.kwargs)
# If Splunk is in a state requiring restart, go ahead
# and restart. That way we'll be sane for the rest of
# the test.
if self.service.restart_required:
self.restartSplunk()
logging.debug("Connected to splunkd version %s", '.'.join(str(x) for x in self.service.splunk_version))
def setUp(self):
TestCase.setUp(self)
self.client = Client("testuser", "testapikey")
def setUp(self):
TestCase.setUp(self)
self.client = Client("testuser", "testapikey")