How to use the prefect.Task function in prefect

To help you get started, we’ve selected a few prefect examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github PrefectHQ / prefect / src / prefect / tasks / core / operators.py View on Github external
def __init__(self, *args: Any, **kwargs: Any):
        super().__init__(*args, **kwargs)

    def run(self, x: Any, y: Any) -> Any:  # type: ignore
        """
        Args:
           - x (Any): a value
           - y (Any): a value

        Returns:
            - Any
        """
        return x // y


class Pow(Task):
    """
    Evaluates `x ** y`

    Args:
        - *args (Any): positional arguments for the `Task` class
        - **kwargs (Any): keyword arguments for the `Task` class
    """

    def __init__(self, *args: Any, **kwargs: Any):
        super().__init__(*args, **kwargs)

    def run(self, x: Any, y: Any) -> Any:  # type: ignore
        """
        Args:
           - x (Any): a value
           - y (Any): a value
github PrefectHQ / prefect / tests / utilities / test_notifications.py View on Github external
def test_formatter_formats_states(state):
    orig = slack_message_formatter(Task(), state())
    assert json.loads(json.dumps(orig)) == orig
github PrefectHQ / prefect / tests / engine / cloud / test_cloud_flows.py View on Github external
because they won't pass their upstream checks
    """

    @prefect.task(max_retries=1, retry_delay=datetime.timedelta(minutes=2))
    def error():
        1 / 0

    flow_run_id = str(uuid.uuid4())
    task_run_id_1 = str(uuid.uuid4())
    task_run_id_2 = str(uuid.uuid4())
    task_run_id_3 = str(uuid.uuid4())

    with prefect.Flow(name="test") as flow:
        t1 = error()
        t2 = prefect.Task()
        t3 = prefect.Task()
        t2.set_upstream(t1)
        t3.set_upstream(t2)

    client = MockedCloudClient(
        flow_runs=[FlowRun(id=flow_run_id)],
        task_runs=[
            TaskRun(id=task_run_id_1, task_slug=t1.slug, flow_run_id=flow_run_id),
            TaskRun(id=task_run_id_2, task_slug=t2.slug, flow_run_id=flow_run_id),
            TaskRun(id=task_run_id_3, task_slug=t3.slug, flow_run_id=flow_run_id),
        ],
        monkeypatch=monkeypatch,
    )

    with prefect.context(flow_run_id=flow_run_id):
        state = CloudFlowRunner(flow=flow).run(
            return_tasks=flow.tasks, executor=executor
github PrefectHQ / prefect / tests / engine / cloud / test_cloud_flow_runner.py View on Github external
def test_client_is_always_called_even_during_state_handler_failures(client):
    def handler(task, old, new):
        1 / 0

    flow = prefect.Flow(tasks=[prefect.Task()], state_handlers=[handler])

    ## flow run setup
    res = flow.run(state=Pending())

    ## assertions
    assert client.get_flow_run_info.call_count == 1  # one time to pull latest state
    assert client.set_flow_run_state.call_count == 1  # Failed

    flow_states = [
        call[1]["state"] for call in client.set_flow_run_state.call_args_list
    ]
    state = flow_states.pop()
    assert state.is_failed()
    assert "state handlers" in state.message
    assert isinstance(state.result, ZeroDivisionError)
    assert client.get_task_run_info.call_count == 0
github PrefectHQ / prefect / tests / engine / cloud / test_cloud_flows.py View on Github external
def test_simple_two_task_flow_with_final_task_set_to_fail(monkeypatch, executor):

    flow_run_id = str(uuid.uuid4())
    task_run_id_1 = str(uuid.uuid4())
    task_run_id_2 = str(uuid.uuid4())

    with prefect.Flow(name="test") as flow:
        t1 = prefect.Task()
        t2 = prefect.Task()
        t2.set_upstream(t1)

    client = MockedCloudClient(
        flow_runs=[FlowRun(id=flow_run_id)],
        task_runs=[
            TaskRun(id=task_run_id_1, task_slug=t1.slug, flow_run_id=flow_run_id),
            TaskRun(
                id=task_run_id_2,
                task_slug=t2.slug,
                flow_run_id=flow_run_id,
                state=Failed(),
            ),
        ],
        monkeypatch=monkeypatch,
    )
github PrefectHQ / prefect / src / prefect / tasks / kubernetes / service.py View on Github external
configuration.api_key["authorization"] = kubernetes_api_key
            api_client = client.CoreV1Api(client.ApiClient(configuration))
        else:
            try:
                config.load_incluster_config()
            except config.config_exception.ConfigException:
                config.load_kube_config()

            api_client = client.CoreV1Api()

        kube_kwargs = {**self.kube_kwargs, **(kube_kwargs or {})}

        return api_client.list_namespaced_service(namespace=namespace, **kube_kwargs)


class PatchNamespacedService(Task):
    """
    Task for patching a namespaced service on Kubernetes.
    Note that all initialization arguments can optionally be provided or overwritten at runtime.

    This task will attempt to connect to a Kubernetes cluster in three steps with
    the first successful connection attempt becoming the mode of communication with a
    cluster.

    1. Attempt to use a Prefect Secret that contains a Kubernetes API Key
    2. Attempt in-cluster connection (will only work when running on a Service in a cluster)
    3. Attempt out-of-cluster connection using the default location for a kube config file

    The arguments `body` and `kube_kwargs` will perform an in-place update when the task
    is run. This means that it is possible to provide `body = {"info": "here"}` at
    instantiation and then provide `body = {"more": "info"}` at run time which will make
    `body = {"info": "here", "more": "info"}`. *Note*: Keys present in both instantiation
github PrefectHQ / prefect / src / prefect / tasks / redis / redis_tasks.py View on Github external
connection = redis.Redis(
            host=self.host,
            port=self.port,
            db=self.db,
            password=password,
            **self.redis_connection_params
        )

        result = connection.set(
            name=redis_key, value=redis_val, ex=ex, px=px, nx=nx, xx=xx
        )

        return result


class RedisGet(Task):
    """
    Task for getting a value based on key from a Redis connection.

    Args:
        - host (str, optional): name of Redis host, defaults to 'localhost'
        - port (int, optional): Redis port, defaults to 6379
        - db (int, optional): redis database index, defaults to 0
        - password_secret (str, optional): the name of the Prefect Secret
            which stores your Redis password
        - redis_connection_params (dict, optional): key-value pairs passed to the redis.Redis connection
            initializer
        - redis_key (str, optional): Redis key to get value, can be provided at initialization or runtime
        - **kwargs (dict, optional): additional keyword arguments to pass to the
            Task constructor
    """
github PrefectHQ / prefect / src / prefect / tasks / azureml / datastore.py View on Github external
from typing import List, Dict, Union

import azureml.core.datastore
from azureml.core.workspace import Workspace
from azureml.data.azure_storage_datastore import (
    AzureBlobDatastore,
    AbstractAzureStorageDatastore,
)
from azureml.data.data_reference import DataReference

from prefect import Task
from prefect.client import Secret
from prefect.utilities.tasks import defaults_from_attrs


class DatastoreRegisterBlobContainer(Task):
    """
    Task for registering Azure Blob Storage container as a Datastore in a Azure ML service Workspace. 

    Args:
        - workspace (azureml.core.workspace.Workspace): The Workspace to which the Datastore is to be registered.
        - container_name (str, optional): The name of the container.
        - datastore_name (str, optional): The name of the datastore. If not defined, the container name will be used.
        - create_container_if_not_exists (bool, optional): Create a container, if one does not exist with the given name. 
        - overwrite_existing_datastore (bool, optional): Overwrite an existing datastore. If the datastore does not exist, it will be created.
        - azure_credentials_secret (str, optinonal): The name of the Prefect Secret that stores your Azure credentials; this Secret must be a JSON string with two keys: `ACCOUNT_NAME` and `ACCOUNT_KEY`
        - set_as_default (bool optional): Set the created Datastore as the default datastore for the Workspace.
        - **kwargs (dict, optional): additional keyword arguments to pass to the Task constructor
    """

    def __init__(
        self,
github PrefectHQ / prefect / src / prefect / tasks / spacy / spacy_tasks.py View on Github external
Args:
            - nlp (spaCy text processing pipeline, optional): a custom spaCy text
                processing pipeline, must be provided if not
                specified in construction

        Returns:
            - Parser: spaCy Parser object

        """
        if nlp is None:
            raise ValueError("A spaCy pipeline must be provided")

        return nlp.parser


class SpacyNER(Task):
    """
    Task for returning named entity recognizer from a spaCy pipeline.

    Args:
        - nlp (spaCy text processing pipeline, optional): a custom spaCy text
            processing pipeline
        - **kwargs (dict, optional): additional keyword arguments to pass to the
            Task constructor
    """

    def __init__(self, nlp=None, **kwargs):
        self.nlp = nlp
        super().__init__(**kwargs)

    @defaults_from_attrs("nlp")
    def run(self, nlp=None):
github PrefectHQ / prefect / src / prefect / tasks / core / operators.py View on Github external
def __init__(self, *args: Any, **kwargs: Any):
        super().__init__(*args, **kwargs)

    def run(self, x: Any, y: Any) -> bool:  # type: ignore
        """
        Args:
            - x (Any): a value
            - y (Any): a value

        Returns:
            - bool
        """
        return bool(x != y)


class GreaterThanOrEqual(Task):
    """
    Evaluates `x ≥ y`

    Args:
        - *args (Any): positional arguments for the `Task` class
        - **kwargs (Any): keyword arguments for the `Task` class
    """

    def __init__(self, *args: Any, **kwargs: Any):
        super().__init__(*args, **kwargs)

    def run(self, x: Any, y: Any) -> bool:  # type: ignore
        """
        Args:
            - x (Any): a value
            - y (Any): a value