def looper():
    # Fail deliberately on the second loop iteration of the first task run only.
    if (
        prefect.context.get("task_loop_count") == 2
        and prefect.context.get("task_run_count", 1) == 1
    ):
        raise ValueError("Stop")
    # Keep looping (adding 10 each pass) until the third iteration.
    if prefect.context.get("task_loop_count", 1) < 3:
        raise LOOP(result=prefect.context.get("task_loop_result", 0) + 10)
    return prefect.context.get("task_loop_result")
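A looping task like `looper` relies on the `LOOP` signal together with the `task_loop_count` and `task_loop_result` context keys. Below is a minimal, self-contained sketch, assuming the Prefect 0.x functional API; the task name and values are illustrative, not taken from the snippet above.

```python
import prefect
from prefect import task, Flow
from prefect.engine.signals import LOOP

@task
def accumulate():
    # task_loop_count starts at 1; task_loop_result carries the value passed
    # to the previous LOOP signal (absent on the first iteration).
    loop_count = prefect.context.get("task_loop_count", 1)
    running_total = prefect.context.get("task_loop_result", 0) + 10
    if loop_count < 3:
        raise LOOP(result=running_total)
    return running_total

with Flow("loop-example") as flow:
    total = accumulate()

state = flow.run()  # `total` resolves to 30 after three loop iterations
```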
def _prep_dask_kwargs(self) -> dict:
    # `context` here is `prefect.context`, imported as `from prefect import context`
    dask_kwargs = {"pure": False}  # type: dict

    ## set a key for the dask scheduler UI
    if context.get("task_full_name"):
        key = context.get("task_full_name", "") + "-" + str(uuid.uuid4())
        dask_kwargs.update(key=key)

    ## infer from context if dask resources are being utilized
    dask_resource_tags = [
        tag
        for tag in context.get("task_tags", [])
        if tag.lower().startswith("dask-resource")
    ]
    if dask_resource_tags:
        resources = {}
        for tag in dask_resource_tags:
            # tags look like "dask-resource:<resource>=<amount>"
            prefix, val = tag.split("=")
            resources.update({prefix.split(":")[1]: float(val)})
        dask_kwargs.update(resources=resources)

    return dask_kwargs
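The resource tags consumed above follow a `"dask-resource:<name>=<amount>"` convention (inferred from the parsing logic). A small standalone sketch of that parsing, using made-up tag values:

```python
# Hypothetical task tags; only the "dask-resource" ones are parsed.
tags = ["db", "dask-resource:GPU=1", "dask-resource:MEMORY=4e9"]

resources = {}
for tag in (t for t in tags if t.lower().startswith("dask-resource")):
    prefix, val = tag.split("=")              # e.g. "dask-resource:GPU", "1"
    resources[prefix.split(":")[1]] = float(val)

print(resources)  # {'GPU': 1.0, 'MEMORY': 4000000000.0}
```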
def emit(self, record) -> None:  # type: ignore
    # if we shouldn't log to cloud, don't emit
    if not prefect.context.config.logging.log_to_cloud:
        return

    try:
        from prefect.client import Client

        if self.client is None:
            self.client = Client()  # type: ignore

        assert isinstance(self.client, Client)  # mypy assert

        record_dict = record.__dict__.copy()
        log = dict()
        log["flowRunId"] = prefect.context.get("flow_run_id", None)
        log["taskRunId"] = prefect.context.get("task_run_id", None)
        log["timestamp"] = pendulum.from_timestamp(
            record_dict.pop("created", time.time())
        ).isoformat()
        log["name"] = record_dict.pop("name", None)
        log["message"] = record_dict.pop("message", None)
        log["level"] = record_dict.pop("levelname", None)

        if record_dict.get("exc_text") is not None:
            log["message"] += "\n" + record_dict.pop("exc_text", "")
            record_dict.pop("exc_info", None)

        log["info"] = record_dict
        self.put(log)
    except Exception as exc:
        self.logger.critical("Failed to write log with error: {}".format(str(exc)))
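The handler above builds its payload from the attributes of a standard `logging.LogRecord`. A minimal sketch of that mapping with a synthetic record; field names mirror the snippet, and `pendulum` is assumed to be installed:

```python
import logging
import time
import pendulum

record = logging.LogRecord(
    name="prefect.TaskRunner", level=logging.INFO, pathname="example.py",
    lineno=0, msg="hello from %s", args=("demo",), exc_info=None,
)
record_dict = record.__dict__.copy()

log = {
    "timestamp": pendulum.from_timestamp(
        record_dict.pop("created", time.time())
    ).isoformat(),
    "name": record_dict.pop("name", None),
    "level": record_dict.pop("levelname", None),
    "message": record.getMessage(),  # the raw dict only has msg/args pre-formatting
}
print(log)
```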
    - inputs (Dict[str, Result], optional): a dictionary of inputs whose keys correspond
        to the task's `run()` arguments.

Returns:
    - State: the state of the task after running the check
"""
if state.is_failed():
    run_count = prefect.context.get("task_run_count", 1)
    if prefect.context.get("task_loop_count") is not None:
        loop_context = {
            "_loop_count": Result(
                value=prefect.context["task_loop_count"],
                result_handler=JSONResultHandler(),
            ),
            "_loop_result": Result(
                value=prefect.context.get("task_loop_result"),
                result_handler=self.result_handler,
            ),
        }
        inputs.update(loop_context)
    if run_count <= self.task.max_retries:
        start_time = pendulum.now("utc") + self.task.retry_delay
        msg = "Retrying Task (after attempt {n} of {m})".format(
            n=run_count, m=self.task.max_retries + 1
        )
        retry_state = Retrying(
            start_time=start_time,
            cached_inputs=inputs,
            message=msg,
            run_count=run_count,
        )
        return retry_state
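The retry check above only produces a `Retrying` state when the task was configured with retries: `run_count` comes from `task_run_count` in context and is compared against `max_retries`. A hedged sketch of the task-level settings involved (the task body is illustrative only):

```python
from datetime import timedelta
from prefect import task

# With max_retries=2, a failed run with run_count 1 or 2 is placed in a Retrying
# state whose start_time is now + retry_delay; the third failure is final.
@task(max_retries=2, retry_delay=timedelta(seconds=10))
def flaky():
    raise RuntimeError("transient failure")
```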
    - yaml_obj (dict): A dictionary representing the parsed yaml

Returns:
    - dict: a dictionary with the yaml values replaced
"""
# set identifier labels
yaml_obj["metadata"]["labels"]["identifier"] = self.identifier_label

# set environment variables
env = yaml_obj["spec"]["containers"][0]["env"]
env[0]["value"] = prefect.config.cloud.graphql
env[1]["value"] = prefect.config.cloud.log
env[2]["value"] = prefect.config.cloud.result_handler
env[3]["value"] = prefect.config.cloud.auth_token
env[4]["value"] = prefect.context.get("flow_run_id", "")

# set image
yaml_obj["spec"]["containers"][0]["image"] = prefect.context.get(
    "image", "daskdev/dask:latest"
)

return yaml_obj
def _populate_worker_pod_yaml(self, yaml_obj: dict) -> dict:
    """
    Populate the worker pod yaml object used in this environment with the proper values.

    Args:
        - yaml_obj (dict): A dictionary representing the parsed yaml

    Returns:
        - dict: a dictionary with the yaml values replaced
    """
    # set identifier labels
    yaml_obj["metadata"]["labels"]["identifier"] = self.identifier_label
    yaml_obj["metadata"]["labels"]["flow_run_id"] = prefect.context.get(
        "flow_run_id", "unknown"
    )

    # set environment variables
    env = yaml_obj["spec"]["containers"][0]["env"]
    env[0]["value"] = prefect.config.cloud.graphql
    env[1]["value"] = prefect.config.cloud.auth_token
    env[2]["value"] = prefect.context.get("flow_run_id", "")

    # pull from a private registry if one is configured
    if self.private_registry:
        namespace = prefect.context.get("namespace", "default")
        pod_spec = yaml_obj["spec"]
        pod_spec["imagePullSecrets"] = []
        pod_spec["imagePullSecrets"].append({"name": namespace + "-docker"})

    return yaml_obj
Args:
    - state (State): the current state of this task
    - inputs (Dict[str, Result]): a dictionary of inputs whose keys correspond
        to the task's `run()` arguments.

Returns:
    - State: the state of the task after running the check

Raises:
    - ENDRUN: if the task is not ready to run
"""
if state.is_cached():
    assert isinstance(state, Cached)  # mypy assert
    sanitized_inputs = {key: res.value for key, res in inputs.items()}
    if self.task.cache_validator(
        state, sanitized_inputs, prefect.context.get("parameters")
    ):
        state._result = state._result.to_result(self.task.result_handler)
        return state
    else:
        state = Pending("Cache was invalid; ready to run.")

if self.task.cache_for is not None:
    candidate_states = prefect.context.caches.get(
        self.task.cache_key or self.task.name, []
    )
    sanitized_inputs = {key: res.value for key, res in inputs.items()}
    for candidate in candidate_states:
        if self.task.cache_validator(
            candidate, sanitized_inputs, prefect.context.get("parameters")
        ):
            candidate._result = candidate._result.to_result(
                self.task.result_handler
            )
            return candidate
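The cache check above is driven by task-level settings: `cache_for` turns caching on and `cache_validator` decides whether a `Cached` candidate state still applies to the current inputs and parameters. A hedged configuration sketch (the task body is illustrative):

```python
from datetime import timedelta
from prefect import task
from prefect.engine.cache_validators import all_inputs

# Results are reusable for an hour, but only when every input matches the
# inputs that produced the cached state.
@task(cache_for=timedelta(hours=1), cache_validator=all_inputs)
def expensive(x):
    return x * 2
```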
Example:
```python
@task
def add(x, y):
    return x + y

with Flow() as f:
    with tags("math", "function"):
        result = add(1, 5)

print(result.tags)  # {"function", "math"}
```
"""
tags_set = set(tags)
tags_set.update(prefect.context.get("tags", set()))
with prefect.context(tags=tags_set):
    yield
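Because the inner tag set is merged with whatever is already in `context["tags"]`, nested `tags` blocks accumulate. A short usage sketch (Prefect 0.x imports assumed):

```python
from prefect import task, Flow, tags

@task
def add(x, y):
    return x + y

with Flow("tag-example") as f:
    with tags("math"):
        with tags("function"):
            result = add(1, 5)

print(result.tags)  # {"function", "math"}
```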
                signal=type(exc).__name__,
            )
        )
        if prefect.context.get("raise_on_exception"):
            raise exc
        raise ENDRUN(exc.state)

    # Exceptions are trapped and turned into TriggerFailed states
    except Exception as exc:
        self.logger.exception(
            "Task '{name}': unexpected error while evaluating task trigger: {exc}".format(
                exc=repr(exc),
                name=prefect.context.get("task_full_name", self.task.name),
            )
        )
        if prefect.context.get("raise_on_exception"):
            raise exc
        raise ENDRUN(
            TriggerFailed(
                "Unexpected error while checking task trigger: {}".format(
                    repr(exc)
                ),
                result=exc,
            )
        )

    return state
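The `raise_on_exception` key consulted in both branches above is the usual debugging hook: when it is set in context, the original exception propagates instead of being wrapped in a `TriggerFailed` state and an `ENDRUN`. A hedged usage sketch with Prefect 0.x's documented helper:

```python
from prefect import task, Flow
from prefect.utilities.debug import raise_on_exception

@task
def boom():
    raise ValueError("surface me")

with Flow("debug-example") as flow:
    boom()

# Sets raise_on_exception=True in prefect.context for the duration of the block,
# so the ValueError above propagates instead of being captured in a Failed state.
with raise_on_exception():
    flow.run()
```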