Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_expecting_microservice_is_there_when_it_should_not():
    """
    Check ``microservice_is_not_available`` against the failing probe layer.

    A step without parameters must be rejected with :exc:`InvalidProbe`;
    a step naming a microservice must end in :exc:`FailedProbe`.
    """
    # No "parameters" entry: the probe cannot know which microservice to
    # look up, so it is expected to be reported as invalid.
    step = {
        "layer": "fixtures.failing_probe_backend",
        "name": "microservice-is-not-available"
    }
    with pytest.raises(InvalidProbe):
        microservice_is_not_available(step, layers[step["layer"]])

    # Same probe, this time with a microservice name supplied.
    step = {
        "layer": "fixtures.failing_probe_backend",
        "name": "microservice-is-not-available",
        "parameters": {
            "name": "cherrypy-webapp"
        }
    }
    with pytest.raises(FailedProbe):
        microservice_is_not_available(step, layers[step["layer"]])
def test_expecting_a_healthy_microservice_should_be_reported_when_not():
    """
    Check ``microservice_available_and_healthy`` against the failing layer.

    A step without parameters must be rejected with :exc:`InvalidProbe`;
    a step naming a microservice must end in :exc:`FailedProbe` carrying
    the "is not healthy" message.
    """
    # No "parameters" entry: expected to be reported as an invalid probe.
    step = {
        "layer": "fixtures.failing_probe_backend",
        "name": "microservice-available-and-healthy"
    }
    with pytest.raises(InvalidProbe):
        microservice_available_and_healthy(step, layers[step["layer"]])

    # Same probe, this time with a microservice name supplied.
    step = {
        "layer": "fixtures.failing_probe_backend",
        "name": "microservice-available-and-healthy",
        "parameters": {
            "name": "cherrypy-webapp"
        }
    }
    with pytest.raises(FailedProbe) as excinfo:
        microservice_available_and_healthy(step, layers[step["layer"]])
    # Inspect the raised exception itself: str() of the ExceptionInfo
    # wrapper is not the exception message on recent pytest versions.
    assert "microservice 'cherrypy-webapp' is not healthy" in str(
        excinfo.value)
def microservice_is_not_available(probe: Probe, layer: Layer):
    """
    Query the system for one microservice's availability. If the microservice
    is found, even in a non-running state, then raises :exc:`FailedProbe`.

    The microservice name is read from ``probe["parameters"]["name"]`` and
    handed to ``layer.microservice_is_not_available``.

    :raises InvalidProbe: when the probe carries no microservice name.
    :raises FailedProbe: when the layer reports exactly ``False``.
    """
    # ``or {}`` also covers a "parameters" key that is present but empty
    # (``None``), so such a probe is reported as InvalidProbe instead of
    # surfacing as an AttributeError.
    name = (probe.get("parameters") or {}).get("name")
    if not name:
        raise InvalidProbe("missing microservice name")

    unavailable = layer.microservice_is_not_available(name)
    # Only an explicit ``False`` fails the probe; any other result passes.
    if unavailable is False:
        raise FailedProbe(
            "microservice '{name}' looks healthy".format(name=name))
def microservice_available_and_healthy(probe: Probe, layer: Layer):
    """
    Query the system for one microservice's availability and health. If the
    microservice is not available or running, raises :exc:`FailedProbe`.

    The microservice name is read from ``probe["parameters"]["name"]`` and
    handed to ``layer.microservice_available_and_healthy``. The layer's
    answer is interpreted as: ``None`` means not found, ``False`` means
    found but not healthy, anything else passes.

    :raises InvalidProbe: when the probe carries no microservice name.
    :raises FailedProbe: when the layer reports ``None`` or ``False``.
    """
    # ``or {}`` also covers a "parameters" key that is present but empty
    # (``None``), so such a probe is reported as InvalidProbe instead of
    # surfacing as an AttributeError.
    name = (probe.get("parameters") or {}).get("name")
    if not name:
        raise InvalidProbe("missing microservice name")

    available = layer.microservice_available_and_healthy(name)
    # Identity checks keep "not found" (None) distinct from "unhealthy"
    # (False) so each case gets its own message.
    if available is None:
        raise FailedProbe(
            "microservice '{name}' was not found".format(name=name))

    if available is False:
        raise FailedProbe(
            "microservice '{name}' is not healthy".format(name=name))
def endpoint_should_respond_ok(probe: Probe, layer: Layer):
    """
    Ask the layer whether an endpoint answers with an okay status.

    The URL is read from ``probe["parameters"]["url"]`` and handed to
    ``layer.endpoint_should_respond_ok``; a falsy answer fails the probe.

    :raises InvalidProbe: when the probe carries no endpoint url.
    :raises FailedProbe: when the layer's answer is falsy.
    """
    # ``or {}`` also covers a "parameters" key that is present but empty
    # (``None``), so such a probe is reported as InvalidProbe instead of
    # surfacing as an AttributeError.
    url = (probe.get("parameters") or {}).get("url")
    if not url:
        raise InvalidProbe("missing endpoint url")

    if not layer.endpoint_should_respond_ok(url):
        raise FailedProbe("endpoint did not return an okay status")