How to use the nameko.testing.services.replace_dependencies function in nameko

To help you get started, we’ve selected a few nameko examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github nameko / nameko / test / testing / test_services.py View on Github external
name = "service"
        foo_proxy = RpcProxy("foo_service")
        bar_proxy = RpcProxy("bar_service")
        baz_proxy = RpcProxy("baz_service")

        @rpc
        def method(self, arg):
            self.foo_proxy.remote_method(arg)

    container = container_factory(Service, rabbit_config)

    # replace a single dependency
    foo_proxy = replace_dependencies(container, "foo_proxy")

    # replace multiple dependencies
    replacements = replace_dependencies(container, "bar_proxy", "baz_proxy")
    assert len([x for x in replacements]) == 2

    # verify that container.extensions doesn't include an RpcProxy anymore
    assert all([not isinstance(dependency, RpcProxy)
                for dependency in container.extensions])

    container.start()

    # verify that the mock dependency collects calls
    msg = "msg"
    with ServiceRpcProxy("service", rabbit_config) as service_proxy:
        service_proxy.method(msg)

    foo_proxy.remote_method.assert_called_once_with(msg)
github etalab / croquemort / tests / services / test_http.py View on Github external
def test_checking_one(container_factory, web_session, web_container_config):
    """POST a single URL to ``/check/one`` and check the hash and dispatch.

    The ``dispatch`` dependency of ``HttpService`` is replaced before the
    container starts, so the test can count how often it was called.
    """
    service_container = container_factory(HttpService, web_container_config)
    # The replacement has to happen before the container is started.
    dispatch_stub = replace_dependencies(service_container, 'dispatch')
    service_container.start()

    payload = json.dumps({'url': 'http://example.org/test_checking_one'})
    response = web_session.post('/check/one', data=payload)

    body = response.json()
    assert body['url-hash'] == 'u:a55f9fb5'
    # Exactly one dispatch is expected for one checked URL.
    assert dispatch_stub.call_count == 1
github etalab / croquemort / tests / services / test_webhook.py View on Github external
def test_webhook_multiple_urls(
        web_container_config, container_factory, rmock=None):
    """Check that a crawled URL triggers one POST per registered webhook.

    The ``storage`` dependency of ``WebhookService`` is replaced so that it
    reports two callback URLs for any URL. After the ``url_crawled`` event
    is dispatched, two POST requests carrying the checked URL are expected.

    NOTE(review): ``rmock`` defaults to ``None`` yet is used unconditionally;
    presumably a decorator not visible here injects it -- confirm.
    """
    test_url = 'http://example.org'
    test_cb_url = 'http://example.org/cb'
    test_cb_url_2 = 'http://example.org/cb2'
    expected_cb_urls = (test_cb_url, test_cb_url_2)
    container = container_factory(WebhookService, web_container_config)
    # Replace the dependency before starting the container.
    storage = replace_dependencies(container, 'storage')
    storage.get_webhooks_for_url = lambda url: [test_cb_url, test_cb_url_2]
    container.start()
    dispatch = event_dispatcher(web_container_config)
    rmock.post(test_cb_url, text='xxx')
    rmock.post(test_cb_url_2, text='xxx')
    # Block until the 'send_response' entrypoint has run for the event.
    with entrypoint_waiter(container, 'send_response'):
        dispatch('url_crawler', 'url_crawled', {'checked-url': test_url})
    requests_l = filter_mock_requests(test_cb_url, rmock.request_history)
    assert len(requests_l) == 2
    for request in requests_l:
        assert request.method == 'POST'
        # The previous form `== test_cb_url or test_cb_url_2` was always
        # truthy (a non-empty string on the right of `or`), so the URL was
        # never actually checked. Use an explicit membership test instead.
        assert request.url in expected_cb_urls
        assert request.json() == {'data': {'checked-url': test_url}}
github etalab / croquemort / tests / services / test_webhook.py View on Github external
def test_webhook_valid_call(
        web_container_config, container_factory, rmock=None):
    """Check that a crawled URL results in one POST to its single webhook.

    NOTE(review): ``rmock`` defaults to ``None`` yet is used unconditionally;
    presumably a decorator not visible here injects it -- confirm.
    """
    test_url = 'http://example.org'
    test_cb_url = 'http://example.org/cb'
    expected_body = {'data': {'checked-url': test_url}}

    def single_webhook(url):
        # Every URL maps to the same lone callback in this test.
        return [test_cb_url]

    container = container_factory(WebhookService, web_container_config)
    # Replace the storage dependency before the container starts.
    storage = replace_dependencies(container, 'storage')
    storage.get_webhooks_for_url = single_webhook
    container.start()

    dispatch = event_dispatcher(web_container_config)
    rmock.post(test_cb_url, text='xxx')
    with entrypoint_waiter(container, 'send_response'):
        dispatch('url_crawler', 'url_crawled', {'checked-url': test_url})

    matching = filter_mock_requests(test_cb_url, rmock.request_history)
    assert len(matching) == 1
    sent = matching[0]
    assert sent.method == 'POST'
    assert sent.url == test_cb_url
    assert sent.json() == expected_body
github nameko / nameko / test / testing / test_services.py View on Github external
def test_replace_non_dependency(container_factory, rabbit_config):
    """Check that replacing an unknown or non-dependency name is rejected."""

    class Service(object):
        name = "service"
        proxy = RpcProxy("foo_service")

        @rpc
        def method(self):
            pass  # pragma: no cover

    container = container_factory(Service, rabbit_config)

    invalid_names = (
        "nonexist",  # no attribute with this name exists on the service
        "method",    # exists on the service, but is not a dependency
    )
    # Both cases must raise ExtensionNotFound.
    for invalid_name in invalid_names:
        with pytest.raises(ExtensionNotFound):
            replace_dependencies(container, invalid_name)
github etalab / croquemort / tests / test_integrations.py View on Github external
def test_retrieve_group(runner_factory, web_session):
    """Fetch ``/group`` with a stubbed storage and check the returned data."""
    runner = runner_factory(HttpService)
    service_container = get_container(runner, HttpService)
    # Replace the storage dependency before the runner starts.
    storage = replace_dependencies(service_container, 'storage')

    def fake_get_group(group_hash):
        # Echo the requested hash back as the group's URL.
        return {
            'url': group_hash,
            'name': 'datagouvfr',
            'url_hash': 'url'
        }

    def fake_get_url(url_hash):
        return {'url': url_hash}

    storage.get_group = fake_get_group
    storage.get_url = fake_get_url
    runner.start()

    query = json.dumps({'group': 'datagouvfr'})
    response = web_session.get('/group', data=query)
    result = response.json()

    assert result['name'] == 'datagouvfr'
    first_url_entry = result['urls'][0]
    assert 'url_hash' in first_url_entry.values()
github etalab / croquemort / tests / services / test_webhook.py View on Github external
def test_webhook_retry(
        web_container_config, container_factory, rmock=None):
    """Check that a webhook is retried after a failed response.

    The callback URL answers 404 first and 200 second, so two requests are
    expected in the history and the last one is inspected.

    NOTE(review): ``rmock`` defaults to ``None`` yet is used unconditionally;
    presumably a decorator not visible here injects it -- confirm.
    """
    test_url = 'http://example.org'
    test_cb_url = 'http://example.org/cb'
    event_payload = {'checked-url': test_url}

    # These two settings are overridden before the container is built.
    web_container_config['WEBHOOK_DELAY_INTERVAL'] = 1
    web_container_config['WEBHOOK_BACKOFF_FACTOR'] = 1

    container = container_factory(WebhookService, web_container_config)
    storage = replace_dependencies(container, 'storage')
    storage.get_webhooks_for_url = lambda url: [test_cb_url]
    container.start()

    dispatch = event_dispatcher(web_container_config)
    # First response fails, the second one succeeds.
    responses = [{'status_code': 404}, {'status_code': 200}]
    rmock.post(test_cb_url, responses)
    with entrypoint_waiter(container, 'send_response'):
        dispatch('url_crawler', 'url_crawled', event_payload)

    attempts = filter_mock_requests(test_cb_url, rmock.request_history)
    assert len(attempts) == 2
    final_attempt = attempts[-1]
    assert final_attempt.method == 'POST'
    assert final_attempt.url == test_cb_url
    assert final_attempt.json() == {'data': {'checked-url': test_url}}
github etalab / croquemort / tests / test_integrations.py View on Github external
def test_cache_report(runner_factory, web_session):
    """Fetch ``/`` and check the cache calls made on the stubbed storage."""
    default_key = 'cache-display_report-'
    cache_duration = 2 * 60 * 60  # Two hours, in seconds.

    runner = runner_factory(HttpService)
    service_container = get_container(runner, HttpService)
    # Replace the storage dependency before the runner starts.
    storage = replace_dependencies(service_container, 'storage')
    # All three cache operations are stubbed to return None.
    storage.get_cache.return_value = None
    storage.set_cache.return_value = None
    storage.expire_cache.return_value = None
    runner.start()

    response = web_session.get('/')

    # NOTE(review): startswith('') is true for every string, so this only
    # proves that `.text` is a string; the intended prefix is not visible
    # here -- confirm what the response should start with.
    assert response.text.startswith('')
    storage.get_cache.assert_called_once_with(default_key)
    # The second argument is the rendered page, so any value is accepted.
    storage.set_cache.assert_called_once_with(default_key, ANY)
    storage.expire_cache.assert_called_once_with(default_key, cache_duration)
github nameko / nameko / test / testing / test_services.py View on Github external
def __init__(self):
            self.processed = []

        def remote_method(self, arg):
            self.processed.append(arg)

    container = container_factory(Service, rabbit_config)

    # customise a single dependency
    fake_foo_proxy = FakeDependency()
    replace_dependencies(container, foo_proxy=fake_foo_proxy)
    assert 2 == len([dependency for dependency in container.extensions
                     if isinstance(dependency, RpcProxy)])

    # customise multiple dependencies
    res = replace_dependencies(container, bar_proxy=Mock(), baz_proxy=Mock())
    assert list(res) == []

    # verify that container.extensions doesn't include an RpcProxy anymore
    assert all([not isinstance(dependency, RpcProxy)
                for dependency in container.extensions])

    container.start()

    # verify that the fake dependency collected calls
    msg = "msg"
    with ServiceRpcProxy("service", rabbit_config) as service_proxy:
        service_proxy.method(msg)

    assert fake_foo_proxy.processed == [msg]
github nameko / nameko / test / testing / test_services.py View on Github external
def method(self, arg):
            self.foo_proxy.remote_method(arg)
            self.bar_proxy.bar()
            self.baz_proxy.baz()

    class FakeDependency(object):
        def __init__(self):
            self.processed = []

        def remote_method(self, arg):
            self.processed.append(arg)

    container = container_factory(Service, rabbit_config)

    fake_foo_proxy = FakeDependency()
    mock_bar_proxy, mock_baz_proxy = replace_dependencies(
        container, 'bar_proxy', 'baz_proxy', foo_proxy=fake_foo_proxy
    )

    # verify that container.extensions doesn't include an RpcProxy anymore
    assert all([not isinstance(dependency, RpcProxy)
                for dependency in container.extensions])

    container.start()

    # verify that the fake dependency collected calls
    msg = "msg"
    with ServiceRpcProxy("service", rabbit_config) as service_proxy:
        service_proxy.method(msg)

    assert fake_foo_proxy.processed == [msg]
    assert mock_bar_proxy.bar.call_count == 1