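# Assumed imports for the snippets below, based on the Prefect 0.x package
# layout these tests exercise (the originals live in separate test modules):
import os
import tempfile
from os import path
from typing import Any
from unittest.mock import MagicMock

import cloudpickle
import pendulum
from botocore.exceptions import ClientError

import prefect
from prefect import Flow
from prefect.agent.docker import DockerAgent
from prefect.agent.fargate import FargateAgent
from prefect.environments import RemoteEnvironment
from prefect.environments.storage import Docker
from prefect.utilities.graphql import GraphQLResult
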
def test_pull_image(capsys, monkeypatch):
    storage = Docker(base_image="python:3.6")

    client = MagicMock()
    client.pull.return_value = [{"progress": "test", "status": "100"}]
    monkeypatch.setattr("docker.APIClient", MagicMock(return_value=client))

    storage.pull_image()
    captured = capsys.readouterr()

    printed_lines = [line for line in captured.out.split("\n") if line != ""]

    assert any(["100 test\r" in line for line in printed_lines])
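
# Sketch of the behavior exercised above (an assumption, not Prefect's exact
# source): pull_image streams `docker pull` progress from docker-py's
# APIClient and rewrites each "status progress" pair on a carriage-return
# terminated line, which is what the captured "100 test\r" output checks.
def pull_image_sketch(client, base_image):
    for update in client.pull(base_image, stream=True, decode=True):
        if update.get("progress"):
            print(update.get("status"), update.get("progress"), end="\r")
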
def test_create_dockerfile_from_dockerfile_uses_tempdir_path():
    myfile = "FROM my-own-image:latest\n\nRUN echo 'hi'"

    with tempfile.TemporaryDirectory() as tempdir_outside:
        with open(os.path.join(tempdir_outside, "test"), "w+") as t:
            t.write("asdf")

        with tempfile.TemporaryDirectory() as directory:
            with open(os.path.join(directory, "myfile"), "w") as tmp:
                tmp.write(myfile)

            storage = Docker(
                dockerfile=os.path.join(directory, "myfile"),
                files={os.path.join(tempdir_outside, "test"): "./test2"},
            )
            storage.add_flow(Flow("foo"))
            dpath = storage.create_dockerfile_object(directory=directory)

            with open(dpath, "r") as dockerfile:
                output = dockerfile.read()

            assert (
                "COPY {} /root/.prefect/flows/foo.prefect".format(
                    os.path.join(directory, "foo.flow")
                )
                in output
            ), output
def test_docker_storage_allows_for_user_provided_config_locations():
    storage = Docker(env_vars={"PREFECT__USER_CONFIG_PATH": "1"})

    assert storage.env_vars == {"PREFECT__USER_CONFIG_PATH": "1"}
def test_serialize_docker_storage():
    storage = Docker()
    serialized_storage = storage.serialize()

    assert serialized_storage["type"] == "Docker"
def test_build_sets_informative_image_name(monkeypatch):
    storage = Docker(registry_url="reg")
    storage.add_flow(Flow("test"))

    monkeypatch.setattr(
        "prefect.environments.storage.Docker._build_image", MagicMock()
    )
    output = storage.build()

    assert output.registry_url == storage.registry_url
    assert output.image_name == "test"
    assert output.image_tag.startswith(str(pendulum.now("utc").year))
"propagateTags": "test",
}
agent = FargateAgent(
aws_access_key_id="id",
aws_secret_access_key="secret",
aws_session_token="token",
region_name="region",
**kwarg_dict
)
agent.deploy_flow(
flow_run=GraphQLResult(
{
"flow": GraphQLResult(
{
"storage": Docker(
registry_url="test", image_name="name", image_tag="tag"
).serialize(),
"id": "id",
}
),
"id": "id",
}
)
)
assert boto3_client.describe_task_definition.called
assert boto3_client.run_task.called
assert boto3_client.run_task.call_args[1]["cluster"] == "cluster"
assert boto3_client.run_task.call_args[1]["taskDefinition"] == "prefect-task-id"
assert boto3_client.run_task.call_args[1]["launchType"] == "FARGATE"
assert boto3_client.run_task.call_args[1]["overrides"] == {
def test_deploy_flow_register_task_definition(monkeypatch, runner_token):
    boto3_client = MagicMock()

    boto3_client.describe_task_definition.side_effect = ClientError({}, None)
    boto3_client.run_task.return_value = {"tasks": [{"taskArn": "test"}]}
    boto3_client.register_task_definition.return_value = {}

    monkeypatch.setattr("boto3.client", MagicMock(return_value=boto3_client))

    agent = FargateAgent()
    agent.deploy_flow(
        flow_run=GraphQLResult(
            {
                "flow": GraphQLResult(
                    {
                        "storage": Docker(
                            registry_url="test", image_name="name", image_tag="tag"
                        ).serialize(),
                        "id": "id",
                    }
                ),
                "id": "id",
            }
        )
    )

    assert boto3_client.describe_task_definition.called
    assert boto3_client.register_task_definition.called
    assert (
        boto3_client.register_task_definition.call_args[1]["family"]
        == "prefect-task-id"
    )
def test_docker_agent_deploy_flow(monkeypatch, runner_token):
    api = MagicMock()
    api.ping.return_value = True
    api.create_container.return_value = {"Id": "container_id"}
    monkeypatch.setattr(
        "prefect.agent.docker.agent.docker.APIClient", MagicMock(return_value=api)
    )

    agent = DockerAgent()
    agent.deploy_flow(
        flow_run=GraphQLResult(
            {
                "flow": GraphQLResult(
                    {
                        "storage": Docker(
                            registry_url="test", image_name="name", image_tag="tag"
                        ).serialize()
                    }
                ),
                "id": "id",
                "name": "name",
            }
        )
    )

    assert api.pull.called
    assert api.create_container.called
    assert api.start.called

    assert api.create_container.call_args[1]["command"] == "prefect execute cloud-flow"
    assert api.start.call_args[1]["container"] == "container_id"
def test_environment_execute():
    with tempfile.TemporaryDirectory() as directory:

        @prefect.task
        def add_to_dict():
            with open(path.join(directory, "output"), "w") as tmp:
                tmp.write("success")

        # Serialize the flow to disk; the original snippet also opened the
        # same file in "w+" mode first, which was redundant and is dropped.
        flow = prefect.Flow("test", tasks=[add_to_dict])
        flow_path = path.join(directory, "flow_env.prefect")
        with open(flow_path, "wb") as f:
            cloudpickle.dump(flow, f)

        environment = RemoteEnvironment()
        storage = Docker(registry_url="test")

        environment.execute(storage, flow_path)

        with open(path.join(directory, "output"), "r") as file:
            assert file.read() == "success"
def execute(  # type: ignore
    self, storage: "Docker", flow_location: str, **kwargs: Any
) -> None:
    """
    Create a single Kubernetes job that runs the flow.

    Args:
        - storage (Docker): the Docker storage object that contains information
            relating to the image which houses the flow
        - flow_location (str): the location of the Flow to execute
        - **kwargs (Any): additional keyword arguments to pass to the runner

    Raises:
        - TypeError: if the storage is not `Docker`
    """
    if not isinstance(storage, Docker):
        raise TypeError("CloudEnvironment requires a Docker storage option")

    self.create_flow_run_job(docker_name=storage.name, flow_file_path=flow_location)
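
# Minimal usage sketch of the type guard above, assuming `CloudEnvironment`
# is the class that owns this method and is importable from
# `prefect.environments` (both inferred from the TypeError message, not
# confirmed by the snippet); the flow_location path is an arbitrary example.
from prefect.environments import CloudEnvironment

env = CloudEnvironment()
try:
    # Any non-Docker storage value trips the isinstance check.
    env.execute(storage="not-a-docker-storage", flow_location="flow_env.prefect")
except TypeError as exc:
    print(exc)  # CloudEnvironment requires a Docker storage option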