Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_task_runner_puts_checkpointing_in_context(self):
    """Check that ``initialize_run`` exposes the configured checkpointing value.

    The context starts without a ``checkpointing`` key; with
    ``flows.checkpointing`` temporarily set to ``"FOO"``, the object returned
    by ``TaskRunner.initialize_run`` must carry that value on its context.
    """
    with prefect.context() as run_context:
        # Precondition: nothing has put the key in the context yet.
        assert "checkpointing" not in run_context

        with set_temporary_config({"flows.checkpointing": "FOO"}):
            task_runner = TaskRunner(Task())
            initialized = task_runner.initialize_run(
                state=None, context=run_context
            )
            assert initialized.context.checkpointing == "FOO"
def test_cloud_handler_creates_client_after_first_method_call(self, monkeypatch):
    """Check that ``_client`` equals the patched client once ``write`` was called.

    ``Client`` in ``prefect.engine.cloud.result_handler`` is replaced by a
    factory mock returning a known object, so the handler's ``_client`` can be
    compared against it after the first ``write``.
    """
    fake_client = MagicMock(post=requests_post)
    client_factory = MagicMock(return_value=fake_client)
    monkeypatch.setattr(
        "prefect.engine.cloud.result_handler.Client", client_factory
    )

    with set_temporary_config({"cloud.result_handler": "http://foo.bar:4204"}):
        result_handler = CloudResultHandler()
        result_handler.write("random string")

    assert result_handler._client == fake_client
)
)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)
mock_client = MagicMock()
mock_client.create_flow_run.return_value = "id"
monkeypatch.setattr("prefect.cli.run.Client", MagicMock(return_value=mock_client))
with tempfile.TemporaryDirectory() as directory:
file_path = os.path.join(directory, "file.json")
with open(file_path, "w") as tmp:
json.dump({"test": 42}, tmp)
with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
):
runner = CliRunner()
result = runner.invoke(
run,
[
"cloud",
"--name",
"flow",
"--project",
"project",
"--version",
"2",
"--parameters",
file_path,
],
def multiprocessing_helper(executor):
    """Run a ``CloudTaskRunner`` for ``sleeper`` on ``executor`` and return the result.

    The runner's ``Client`` is patched with a mock whose ``set_task_run_state``
    hands back the state it was given, the runner's ``_heartbeat`` is replaced
    by ``update``, and ``cloud.heartbeat_interval`` is set to ``0.025`` for the
    duration of the run.
    """

    def _echo_state(task_run_id, version, state, cache_for):
        # Parameter names are kept so keyword-style calls still bind.
        return state

    fake_client = MagicMock(
        set_task_run_state=MagicMock(side_effect=_echo_state)
    )
    monkeypatch.setattr(
        "prefect.engine.cloud.task_runner.Client",
        MagicMock(return_value=fake_client),
    )

    task_runner = CloudTaskRunner(task=sleeper)
    task_runner._heartbeat = update

    with set_temporary_config({"cloud.heartbeat_interval": 0.025}):
        final_state = task_runner.run(executor=executor)
        return final_state
def test_populate_custom_worker_spec_yaml():
    """Check ``_populate_worker_spec_yaml`` against the bundled ``worker_pod.yaml``.

    The pod spec is loaded from the k8s package directory, the first
    container's ``env`` list is emptied, and the populated result is checked
    for its labels and its first six environment values.
    """
    environment = DaskKubernetesEnvironment()

    k8s_dir = os.path.dirname(prefect.environments.execution.dask.k8s.__file__)
    with open(path.join(k8s_dir, "worker_pod.yaml")) as pod_file:
        pod_spec = yaml.safe_load(pod_file)
    # Start from an empty env list so the indices checked below are deterministic.
    pod_spec["spec"]["containers"][0]["env"] = []

    temporary_settings = {"cloud.graphql": "gql_test", "cloud.auth_token": "auth_test"}
    with set_temporary_config(temporary_settings), prefect.context(
        flow_run_id="id_test", image="my_image"
    ):
        yaml_obj = environment._populate_worker_spec_yaml(yaml_obj=pod_spec)

    assert yaml_obj["metadata"]["labels"]["identifier"] == environment.identifier_label
    assert yaml_obj["metadata"]["labels"]["flow_run_id"] == "id_test"

    env = yaml_obj["spec"]["containers"][0]["env"]
    expected_values = (
        "gql_test",
        "auth_test",
        "id_test",
        "false",
        "prefect.engine.cloud.CloudFlowRunner",
        "prefect.engine.cloud.CloudTaskRunner",
    )
    for position, expected in enumerate(expected_values):
        assert env[position]["value"] == expected
def test_s3_writes_to_blob_prefixed_by_date_suffixed_by_prefect(self, s3_client):
    """Check the key passed to ``upload_fileobj`` by ``S3ResultHandler.write``.

    The ``Key`` keyword given to the mocked client must equal the URI returned
    by ``write``, start with the current UTC date formatted as ``Y/M/D`` and
    end with ``prefect_result``.
    """
    handler = S3ResultHandler(bucket="foo")
    aws_secrets = {"AWS_CREDENTIALS": {"ACCESS_KEY": 1, "SECRET_ACCESS_KEY": 42}}

    with prefect.context(secrets=aws_secrets), set_temporary_config(
        {"cloud.use_local_secrets": True}
    ):
        uri = handler.write("so-much-data")

    # call_args is an (args, kwargs) pair; only the keyword arguments matter here.
    _positional, upload_kwargs = s3_client.return_value.upload_fileobj.call_args
    used_uri = upload_kwargs["Key"]

    assert used_uri == uri
    assert used_uri.startswith(pendulum.now("utc").format("Y/M/D"))
    assert used_uri.endswith("prefect_result")
def test_set_task_run_state_with_error(patch_post):
    """Check that an ``errors`` entry in the response raises ``ClientError``.

    The patched POST returns a payload with an error message; calling
    ``set_task_run_state`` must raise ``ClientError`` matching that message.
    """
    error_message = {"message": "something went wrong"}
    error_payload = {
        "data": {"setTaskRunStates": None},
        "errors": [error_message],
    }
    # The returned object is not inspected in this test.
    patched_post = patch_post(error_payload)

    cloud_settings = {
        "cloud.graphql": "http://my-cloud.foo",
        "cloud.auth_token": "secret_token",
    }
    with set_temporary_config(cloud_settings):
        client = Client()

    with pytest.raises(ClientError, match="something went wrong"):
        client.set_task_run_state(
            task_run_id="76-salt", version=0, state=Pending()
        )
def test_override_functions_on_commands():
    """Check that ``auth revoke-token --id id`` exits with code 1 under a config token."""
    cli_args = ["revoke-token", "--id", "id"]
    with set_temporary_config({"cloud.auth_token": "TOKEN"}):
        outcome = CliRunner().invoke(auth, cli_args)
        assert outcome.exit_code == 1
def test_describe_flow_runs_populated(monkeypatch):
post = MagicMock(
return_value=MagicMock(
json=MagicMock(
return_value=dict(data=dict(flow_run=[{"name": "flow-run"}]))
)
)
)
monkeypatch.setattr("requests.post", post)
with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
):
runner = CliRunner()
result = runner.invoke(
describe, ["flow-runs", "--name", "flow-run", "--flow-name", "flow"]
)
assert result.exit_code == 0
query = """
query {
flow_run(where: { _and: { name: { _eq: "flow-run" }, flow: { name: { _eq: "flow" } } } }) {
name
flow {
name
}
created
def test_populate_custom_scheduler_spec_yaml(log_flag):
environment = DaskKubernetesEnvironment()
file_path = os.path.dirname(prefect.environments.execution.dask.k8s.__file__)
with open(path.join(file_path, "job.yaml")) as job_file:
job = yaml.safe_load(job_file)
job["spec"]["template"]["spec"]["containers"][0]["env"] = []
with set_temporary_config(
{
"cloud.graphql": "gql_test",
"cloud.auth_token": "auth_test",
"logging.log_to_cloud": log_flag,
}
):
with prefect.context(flow_run_id="id_test", namespace="namespace_test"):
yaml_obj = environment._populate_scheduler_spec_yaml(
yaml_obj=job, docker_name="test1/test2:test3", flow_file_path="test4"
)
assert yaml_obj["metadata"]["name"] == "prefect-dask-job-{}".format(
environment.identifier_label
)
env = yaml_obj["spec"]["template"]["spec"]["containers"][0]["env"]