Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def container():
container = ServiceContainer(Service, config={})
for ext in container.extensions:
ext._reset_calls()
CallCollectorMixin.call_counter = 0
return container
def factory(config=None, bot_name=None):
class Service(object):
name = "service"
slack_api = Slack(bot_name)
@dummy
def dummy(self):
pass
container = ServiceContainer(Service, config or default_config)
containers.append(container)
return get_extension(container, Slack)
def test_end_to_end(redis_db):
config = {
REDIS_URIS_KEY: {
'server': redis_db
}
}
container = ServiceContainer(ExampleService, config)
container.start()
# write through the service
with entrypoint_hook(container, "write") as write:
write("foobar")
# verify changes written to redis
client = StrictRedis.from_url(redis_db, decode_responses=True)
assert client.get(TEST_KEY) == "foobar"
# read through the service
with entrypoint_hook(container, "read") as read:
assert read() == "foobar"
class Service(object):
name = "service"
@rpc(sensitive_arguments=sensitive_arguments)
def method(self, a, b):
pass # pragma: no cover
complex_arg = {
'foo': [1, 2, 3],
'bar': "BAR"
}
args = ("A", complex_arg)
kwargs = {}
container = ServiceContainer(Service, rabbit_config)
entrypoint = get_extension(container, Rpc)
redacted = get_redacted_args(entrypoint, *args, **kwargs)
assert redacted == expected
def test_provider():
container = Mock(spec=ServiceContainer)
container.service_name = "service"
container.config = Mock()
once = OnceProvider()
once.bind('foobar', container)
once.prepare()
once.start()
with wait_for_call(1, container.spawn_worker) as spawn_worker:
once.stop()
# the once should have stopped and should only have spawned
# a single worker
spawn_worker.assert_called_once_with(once, (), {})
def container(config):
return Mock(
spec=ServiceContainer, config=config, service_name="exampleservice"
)
def test_busy_check_on_teardown():
# max_workers needs to be provided, as it's used in a semaphore count
config = {'max_workers': 4}
kill_called = Mock()
class MockedContainer(ServiceContainer):
def kill(self):
kill_called()
super(MockedContainer, self).kill()
sr = ServiceRunner(config, container_cls=MockedContainer)
sr.add_service(ExampleService)
sr.start()
sr.kill()
with wait_for_call(5, kill_called) as kill_called_waited:
assert kill_called_waited.call_count == 1