Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_instantiating(app):
dockerflow = Dockerflow()
assert "dockerflow.heartbeat" not in app.router.routes_names
dockerflow.init_app(app)
assert "dockerflow.heartbeat" in app.router.routes_names
def test_version_path(app, mocker, test_client, version_content):
custom_version_path = "/something/extra/ordinary"
dockerflow = Dockerflow(app, version_path=custom_version_path)
version_callback = mocker.patch.object(
dockerflow, "_version_callback", return_value=version_content
)
_, response = test_client.get("/__version__")
assert response.status == 200
assert response.json == version_content
version_callback.assert_called_with(custom_version_path)
def dockerflow(app):
return Dockerflow(app)
def dockerflow_redis(app):
app.config["REDIS"] = {"address": "redis://:password@localhost:6379/0"}
return Dockerflow(app, redis=SanicRedis(app))
def init_app(app: Sanic, q: SQLiteAckQueue) -> Dockerflow:
"""Initialize Sanic app with dockerflow apis."""
dockerflow = Dockerflow()
dockerflow.check(name="check_disk_bytes_free")(
partial(check_disk_bytes_free, app=app, q=q)
)
dockerflow.check(name="check_queue_size")(partial(check_queue_size, q=q))
dockerflow.init_app(app)