Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
name = "service"
foo_proxy = RpcProxy("foo_service")
bar_proxy = RpcProxy("bar_service")
baz_proxy = RpcProxy("baz_service")
@rpc
def method(self, arg):
self.foo_proxy.remote_method(arg)
container = container_factory(Service, rabbit_config)
# replace a single dependency
foo_proxy = replace_dependencies(container, "foo_proxy")
# replace multiple dependencies
replacements = replace_dependencies(container, "bar_proxy", "baz_proxy")
assert len([x for x in replacements]) == 2
# verify that container.extensions doesn't include an RpcProxy anymore
assert all([not isinstance(dependency, RpcProxy)
for dependency in container.extensions])
container.start()
# verify that the mock dependency collects calls
msg = "msg"
with ServiceRpcProxy("service", rabbit_config) as service_proxy:
service_proxy.method(msg)
foo_proxy.remote_method.assert_called_once_with(msg)
def test_checking_one(container_factory, web_session, web_container_config):
    """POST a single URL to ``/check/one`` and verify the reply and dispatch.

    The ``dispatch`` dependency of ``HttpService`` is replaced before the
    container starts, so the test can count how often it was called.
    """
    service_container = container_factory(HttpService, web_container_config)
    dispatch = replace_dependencies(service_container, 'dispatch')
    service_container.start()

    payload = json.dumps({'url': 'http://example.org/test_checking_one'})
    response = web_session.post('/check/one', data=payload)

    # The reply carries the expected hash and exactly one dispatch happened.
    url_hash = response.json()['url-hash']
    assert url_hash == 'u:a55f9fb5'
    assert dispatch.call_count == 1
def test_webhook_multiple_urls(
        web_container_config, container_factory, rmock=None):
    """Check the callbacks sent when a crawled URL has two webhooks.

    ``storage`` is replaced so that ``get_webhooks_for_url`` returns two
    callback URLs for any URL. A ``url_crawled`` event is then dispatched
    and every recorded callback request is inspected.

    ``rmock`` must be supplied by the caller/fixture: the body calls
    ``rmock.post`` and reads ``rmock.request_history``.
    """
    test_url = 'http://example.org'
    test_cb_url = 'http://example.org/cb'
    test_cb_url_2 = 'http://example.org/cb2'
    callback_urls = (test_cb_url, test_cb_url_2)

    container = container_factory(WebhookService, web_container_config)
    storage = replace_dependencies(container, 'storage')
    storage.get_webhooks_for_url = lambda url: [test_cb_url, test_cb_url_2]
    container.start()

    dispatch = event_dispatcher(web_container_config)
    rmock.post(test_cb_url, text='xxx')
    rmock.post(test_cb_url_2, text='xxx')
    # Block until the 'send_response' entrypoint has fired for the event.
    with entrypoint_waiter(container, 'send_response'):
        dispatch('url_crawler', 'url_crawled', {'checked-url': test_url})

    # NOTE(review): the filter is given only the first callback URL yet two
    # requests are expected — presumably it matches by prefix; confirm
    # against filter_mock_requests.
    requests_l = filter_mock_requests(test_cb_url, rmock.request_history)
    assert len(requests_l) == 2
    for request in requests_l:
        assert request.method == 'POST'
        # Membership test: `request.url == a or b` evaluates to the
        # non-empty string `b` whenever the comparison is False, so the
        # old assertion could never fail.
        assert request.url in callback_urls
        assert request.json() == {'data': {'checked-url': test_url}}
def test_webhook_valid_call(
        web_container_config, container_factory, rmock=None):
    """Dispatch one ``url_crawled`` event and inspect the single callback.

    ``storage`` is replaced so that one webhook URL is returned for any
    URL; the test then checks the method, URL and JSON body of the one
    request recorded by ``rmock``.
    """
    test_url = 'http://example.org'
    test_cb_url = 'http://example.org/cb'
    expected_body = {'data': {'checked-url': test_url}}

    webhook_container = container_factory(
        WebhookService, web_container_config)
    storage = replace_dependencies(webhook_container, 'storage')
    storage.get_webhooks_for_url = lambda url: [test_cb_url]
    webhook_container.start()

    rmock.post(test_cb_url, text='xxx')
    dispatch = event_dispatcher(web_container_config)
    # Block until the 'send_response' entrypoint has fired for the event.
    with entrypoint_waiter(webhook_container, 'send_response'):
        dispatch('url_crawler', 'url_crawled', {'checked-url': test_url})

    recorded = filter_mock_requests(test_cb_url, rmock.request_history)
    assert len(recorded) == 1
    callback = recorded[0]
    assert callback.method == 'POST'
    assert callback.url == test_cb_url
    assert callback.json() == expected_body
def test_replace_non_dependency(container_factory, rabbit_config):
    """``replace_dependencies`` raises ``ExtensionNotFound`` for bad names.

    Two names are tried: one that does not exist on the service at all, and
    one that exists but is an ``@rpc`` method rather than a dependency.
    """

    class Service(object):
        name = "service"
        proxy = RpcProxy("foo_service")

        @rpc
        def method(self):
            pass  # pragma: no cover

    container = container_factory(Service, rabbit_config)

    invalid_names = (
        "nonexist",  # error if the dependency doesn't exist
        "method",    # error if the attribute is not a dependency
    )
    for attr_name in invalid_names:
        with pytest.raises(ExtensionNotFound):
            replace_dependencies(container, attr_name)
def test_retrieve_group(runner_factory, web_session):
    """GET ``/group`` with stubbed storage and check the JSON reply.

    ``storage.get_group`` and ``storage.get_url`` are replaced with stubs
    before the runner starts. The reply must carry the group name, and the
    first entry of ``urls`` must contain the value ``'url_hash'``.
    """
    runner = runner_factory(HttpService)
    http_container = get_container(runner, HttpService)
    storage = replace_dependencies(http_container, 'storage')

    def stub_get_group(group_hash):
        return {
            'url': group_hash,
            'name': 'datagouvfr',
            'url_hash': 'url',
        }

    def stub_get_url(url_hash):
        return {'url': url_hash}

    storage.get_group = stub_get_group
    storage.get_url = stub_get_url
    runner.start()

    query = json.dumps({'group': 'datagouvfr'})
    result = web_session.get('/group', data=query).json()

    assert result['name'] == 'datagouvfr'
    first_url_entry = result['urls'][0]
    assert 'url_hash' in first_url_entry.values()
def test_webhook_retry(
        web_container_config, container_factory, rmock=None):
    """A callback answered with 404 then 200 is recorded twice.

    The delay interval and backoff factor are both set to 1 in the
    container config before the service is built. The last recorded request
    is then checked for method, URL and JSON body.
    """
    test_url = 'http://example.org'
    test_cb_url = 'http://example.org/cb'
    expected_body = {'data': {'checked-url': test_url}}

    web_container_config['WEBHOOK_DELAY_INTERVAL'] = 1
    web_container_config['WEBHOOK_BACKOFF_FACTOR'] = 1

    webhook_container = container_factory(
        WebhookService, web_container_config)
    storage = replace_dependencies(webhook_container, 'storage')
    storage.get_webhooks_for_url = lambda url: [test_cb_url]
    webhook_container.start()
    dispatch = event_dispatcher(web_container_config)

    # First reply fails (404), the second one succeeds (200).
    canned_responses = [{'status_code': 404}, {'status_code': 200}]
    rmock.post(test_cb_url, canned_responses)
    with entrypoint_waiter(webhook_container, 'send_response'):
        dispatch('url_crawler', 'url_crawled', {'checked-url': test_url})

    recorded = filter_mock_requests(test_cb_url, rmock.request_history)
    assert len(recorded) == 2
    final_attempt = recorded[-1]
    assert final_attempt.method == 'POST'
    assert final_attempt.url == test_cb_url
    assert final_attempt.json() == expected_body
def test_cache_report(runner_factory, web_session):
    """GET ``/`` and verify the cache calls made on the storage mock.

    ``storage`` is replaced with a mock whose cache methods return ``None``;
    after the request each of ``get_cache``, ``set_cache`` and
    ``expire_cache`` must have been called exactly once with the default
    report key.
    """
    two_hours = 2 * 60 * 60  # cache lifetime in seconds
    report_key = 'cache-display_report-'

    runner = runner_factory(HttpService)
    storage = replace_dependencies(
        get_container(runner, HttpService), 'storage')
    for cache_call in (
            storage.set_cache, storage.get_cache, storage.expire_cache):
        cache_call.return_value = None
    runner.start()

    response = web_session.get('/')

    # NOTE(review): startswith('') is true for every string, so this check
    # cannot fail — the intended prefix appears to be missing; confirm.
    assert response.text.startswith('')
    storage.get_cache.assert_called_once_with(report_key)
    storage.set_cache.assert_called_once_with(report_key, ANY)  # HTML.
    storage.expire_cache.assert_called_once_with(report_key, two_hours)
# NOTE(review): fragment — the class header owning the two methods below
# (presumably FakeDependency, which is instantiated further down) and the
# enclosing test function / Service class are not visible in this excerpt.
# Start with an empty record of processed arguments.
def __init__(self):
self.processed = []
# Record each argument it is called with so the test can inspect it later.
def remote_method(self, arg):
self.processed.append(arg)
container = container_factory(Service, rabbit_config)
# customise a single dependency
fake_foo_proxy = FakeDependency()
replace_dependencies(container, foo_proxy=fake_foo_proxy)
# with only foo_proxy replaced, two RpcProxy extensions must remain
assert 2 == len([dependency for dependency in container.extensions
if isinstance(dependency, RpcProxy)])
# customise multiple dependencies
res = replace_dependencies(container, bar_proxy=Mock(), baz_proxy=Mock())
# replacements passed by keyword yield nothing when the result is iterated
assert list(res) == []
# verify that container.extensions doesn't include an RpcProxy anymore
assert all([not isinstance(dependency, RpcProxy)
for dependency in container.extensions])
container.start()
# verify that the fake dependency collected calls
msg = "msg"
with ServiceRpcProxy("service", rabbit_config) as service_proxy:
service_proxy.method(msg)
assert fake_foo_proxy.processed == [msg]
# NOTE(review): fragment — the class owning `method` (presumably the Service
# passed to container_factory below) and the enclosing test function are not
# visible in this excerpt; the test may also continue past the last line.
# Forward `arg` to foo_proxy, then call bar() and baz() on the other proxies.
def method(self, arg):
self.foo_proxy.remote_method(arg)
self.bar_proxy.bar()
self.baz_proxy.baz()
# Hand-written stand-in that records the arguments passed to remote_method.
class FakeDependency(object):
def __init__(self):
self.processed = []
def remote_method(self, arg):
self.processed.append(arg)
container = container_factory(Service, rabbit_config)
fake_foo_proxy = FakeDependency()
# mix both styles: bar_proxy/baz_proxy are replaced by name and unpacked from
# the return value, while foo_proxy gets the explicit fake via keyword
mock_bar_proxy, mock_baz_proxy = replace_dependencies(
container, 'bar_proxy', 'baz_proxy', foo_proxy=fake_foo_proxy
)
# verify that container.extensions doesn't include an RpcProxy anymore
assert all([not isinstance(dependency, RpcProxy)
for dependency in container.extensions])
container.start()
# verify that the fake dependency collected calls
msg = "msg"
with ServiceRpcProxy("service", rabbit_config) as service_proxy:
service_proxy.method(msg)
assert fake_foo_proxy.processed == [msg]
# the returned bar_proxy replacement must have seen exactly one bar() call
assert mock_bar_proxy.bar.call_count == 1