Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """
    Initialize the task by forwarding every argument to the parent class.

    Args:
        - *args (Any): positional arguments passed unchanged to `super().__init__`
        - **kwargs (Any): keyword arguments passed unchanged to `super().__init__`
    """
    super().__init__(*args, **kwargs)
def run(self, x: Any, y: Any) -> Any:  # type: ignore
    """
    Floor-divide `x` by `y`.

    Args:
        - x (Any): the dividend
        - y (Any): the divisor

    Returns:
        - Any: the result of `x // y`
    """
    return x // y
class Pow(Task):
"""
Evaluates `x ** y`
Args:
- *args (Any): positional arguments for the `Task` class
- **kwargs (Any): keyword arguments for the `Task` class
"""
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """
    Initialize the task by forwarding every argument to the parent class.

    Args:
        - *args (Any): positional arguments passed unchanged to `super().__init__`
        - **kwargs (Any): keyword arguments passed unchanged to `super().__init__`
    """
    super().__init__(*args, **kwargs)
def run(self, x: Any, y: Any) -> Any: # type: ignore
"""
Args:
- x (Any): a value
- y (Any): a value
def test_formatter_formats_states(state):
    """
    Check that the Slack message built for a state survives a JSON round trip.

    `state` is called with no arguments to build the state under test, so it
    must be a zero-argument callable (presumably a state class supplied by a
    parametrization that is not visible here -- confirm against the decorator).
    """
    orig = slack_message_formatter(Task(), state())
    # Serializing and re-parsing must reproduce the message exactly, which only
    # holds when the formatter output is plain JSON-compatible data.
    assert json.loads(json.dumps(orig)) == orig
because they won't pass their upstream checks
"""
@prefect.task(max_retries=1, retry_delay=datetime.timedelta(minutes=2))
def error():
1 / 0
flow_run_id = str(uuid.uuid4())
task_run_id_1 = str(uuid.uuid4())
task_run_id_2 = str(uuid.uuid4())
task_run_id_3 = str(uuid.uuid4())
with prefect.Flow(name="test") as flow:
t1 = error()
t2 = prefect.Task()
t3 = prefect.Task()
t2.set_upstream(t1)
t3.set_upstream(t2)
client = MockedCloudClient(
flow_runs=[FlowRun(id=flow_run_id)],
task_runs=[
TaskRun(id=task_run_id_1, task_slug=t1.slug, flow_run_id=flow_run_id),
TaskRun(id=task_run_id_2, task_slug=t2.slug, flow_run_id=flow_run_id),
TaskRun(id=task_run_id_3, task_slug=t3.slug, flow_run_id=flow_run_id),
],
monkeypatch=monkeypatch,
)
with prefect.context(flow_run_id=flow_run_id):
state = CloudFlowRunner(flow=flow).run(
return_tasks=flow.tasks, executor=executor
def test_client_is_always_called_even_during_state_handler_failures(client):
    """
    Verify the client still records a final state when a flow state handler raises.

    A flow with one task and a state handler that always raises
    `ZeroDivisionError` is run from a `Pending` state. The test then checks
    the calls recorded on `client`: one flow-run info lookup, one flow-run
    state update carrying a failed state whose result is the handler's
    exception, and no task-run lookups.
    """

    def handler(task, old, new):
        # Deliberately raise ZeroDivisionError from inside the state handler.
        1 / 0

    flow = prefect.Flow(tasks=[prefect.Task()], state_handlers=[handler])

    ## flow run setup
    # The returned state is not inspected; all assertions go through `client`.
    flow.run(state=Pending())

    ## assertions
    assert client.get_flow_run_info.call_count == 1  # one time to pull latest state
    assert client.set_flow_run_state.call_count == 1  # Failed

    # Collect the `state` keyword argument from each recorded call.
    flow_states = [
        call[1]["state"] for call in client.set_flow_run_state.call_args_list
    ]
    state = flow_states.pop()
    assert state.is_failed()
    assert "state handlers" in state.message
    assert isinstance(state.result, ZeroDivisionError)
    assert client.get_task_run_info.call_count == 0
def test_simple_two_task_flow_with_final_task_set_to_fail(monkeypatch, executor):
flow_run_id = str(uuid.uuid4())
task_run_id_1 = str(uuid.uuid4())
task_run_id_2 = str(uuid.uuid4())
with prefect.Flow(name="test") as flow:
t1 = prefect.Task()
t2 = prefect.Task()
t2.set_upstream(t1)
client = MockedCloudClient(
flow_runs=[FlowRun(id=flow_run_id)],
task_runs=[
TaskRun(id=task_run_id_1, task_slug=t1.slug, flow_run_id=flow_run_id),
TaskRun(
id=task_run_id_2,
task_slug=t2.slug,
flow_run_id=flow_run_id,
state=Failed(),
),
],
monkeypatch=monkeypatch,
)
configuration.api_key["authorization"] = kubernetes_api_key
api_client = client.CoreV1Api(client.ApiClient(configuration))
else:
try:
config.load_incluster_config()
except config.config_exception.ConfigException:
config.load_kube_config()
api_client = client.CoreV1Api()
kube_kwargs = {**self.kube_kwargs, **(kube_kwargs or {})}
return api_client.list_namespaced_service(namespace=namespace, **kube_kwargs)
class PatchNamespacedService(Task):
"""
Task for patching a namespaced service on Kubernetes.
Note that all initialization arguments can optionally be provided or overwritten at runtime.
This task will attempt to connect to a Kubernetes cluster in three steps with
the first successful connection attempt becoming the mode of communication with a
cluster.
1. Attempt to use a Prefect Secret that contains a Kubernetes API Key
2. Attempt in-cluster connection (will only work when running on a Service in a cluster)
3. Attempt out-of-cluster connection using the default location for a kube config file
The arguments `body` and `kube_kwargs` will perform an in-place update when the task
is run. This means that it is possible to provide `body = {"info": "here"}` at
instantiation and then provide `body = {"more": "info"}` at run time which will make
`body = {"info": "here", "more": "info"}`. *Note*: Keys present in both instantiation
connection = redis.Redis(
host=self.host,
port=self.port,
db=self.db,
password=password,
**self.redis_connection_params
)
result = connection.set(
name=redis_key, value=redis_val, ex=ex, px=px, nx=nx, xx=xx
)
return result
class RedisGet(Task):
"""
Task for getting a value based on key from a Redis connection.
Args:
- host (str, optional): name of Redis host, defaults to 'localhost'
- port (int, optional): Redis port, defaults to 6379
- db (int, optional): redis database index, defaults to 0
- password_secret (str, optional): the name of the Prefect Secret
which stores your Redis password
- redis_connection_params (dict, optional): key-value pairs passed to the redis.Redis connection
initializer
- redis_key (str, optional): Redis key to get value, can be provided at initialization or runtime
- **kwargs (dict, optional): additional keyword arguments to pass to the
Task constructor
"""
from typing import List, Dict, Union
import azureml.core.datastore
from azureml.core.workspace import Workspace
from azureml.data.azure_storage_datastore import (
AzureBlobDatastore,
AbstractAzureStorageDatastore,
)
from azureml.data.data_reference import DataReference
from prefect import Task
from prefect.client import Secret
from prefect.utilities.tasks import defaults_from_attrs
class DatastoreRegisterBlobContainer(Task):
"""
Task for registering an Azure Blob Storage container as a Datastore in an Azure ML service Workspace.
Args:
- workspace (azureml.core.workspace.Workspace): The Workspace to which the Datastore is to be registered.
- container_name (str, optional): The name of the container.
- datastore_name (str, optional): The name of the datastore. If not defined, the container name will be used.
- create_container_if_not_exists (bool, optional): Create a container, if one does not exist with the given name.
- overwrite_existing_datastore (bool, optional): Overwrite an existing datastore. If the datastore does not exist, it will be created.
- azure_credentials_secret (str, optional): The name of the Prefect Secret that stores your Azure credentials; this Secret must be a JSON string with two keys: `ACCOUNT_NAME` and `ACCOUNT_KEY`
- set_as_default (bool, optional): Set the created Datastore as the default datastore for the Workspace.
- **kwargs (dict, optional): additional keyword arguments to pass to the Task constructor
"""
def __init__(
self,
Args:
- nlp (spaCy text processing pipeline, optional): a custom spaCy text
processing pipeline, must be provided if not
specified in construction
Returns:
- Parser: spaCy Parser object
"""
if nlp is None:
raise ValueError("A spaCy pipeline must be provided")
return nlp.parser
class SpacyNER(Task):
"""
Task for returning named entity recognizer from a spaCy pipeline.
Args:
- nlp (spaCy text processing pipeline, optional): a custom spaCy text
processing pipeline
- **kwargs (dict, optional): additional keyword arguments to pass to the
Task constructor
"""
def __init__(self, nlp=None, **kwargs):
    """
    Store the optional spaCy pipeline on the instance and initialize the parent class.

    Args:
        - nlp (spaCy text processing pipeline, optional): a custom spaCy text
            processing pipeline; kept on the instance as `self.nlp`
        - **kwargs (dict, optional): additional keyword arguments to pass to the
            Task constructor
    """
    # Set the attribute before calling the parent initializer, as the original does.
    self.nlp = nlp
    super().__init__(**kwargs)
@defaults_from_attrs("nlp")
def run(self, nlp=None):
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """
    Initialize the task by forwarding every argument to the parent class.

    Args:
        - *args (Any): positional arguments passed unchanged to `super().__init__`
        - **kwargs (Any): keyword arguments passed unchanged to `super().__init__`
    """
    super().__init__(*args, **kwargs)
def run(self, x: Any, y: Any) -> bool:  # type: ignore
    """
    Test whether `x` and `y` are unequal.

    Args:
        - x (Any): a value
        - y (Any): a value

    Returns:
        - bool: `True` if `x != y`, otherwise `False`; the comparison
            result is coerced with `bool`
    """
    return bool(x != y)
class GreaterThanOrEqual(Task):
"""
Evaluates `x ≥ y`
Args:
- *args (Any): positional arguments for the `Task` class
- **kwargs (Any): keyword arguments for the `Task` class
"""
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """
    Initialize the task by forwarding every argument to the parent class.

    Args:
        - *args (Any): positional arguments passed unchanged to `super().__init__`
        - **kwargs (Any): keyword arguments passed unchanged to `super().__init__`
    """
    super().__init__(*args, **kwargs)
def run(self, x: Any, y: Any) -> bool: # type: ignore
"""
Args:
- x (Any): a value
- y (Any): a value