How to use the procrastinate.utils module in procrastinate

To help you get started, we’ve selected a few procrastinate examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github peopledoc / procrastinate / tests / unit / test_utils.py View on Github external
def test_add_sync_api():
    """
    Exercise ``utils.add_sync_api`` on a class mixing sync/async methods,
    with and without the ``_async`` name suffix.

    NOTE(review): this excerpt only shows the first assertion (that ``a`` is
    still listed on the instance); the rest of the test is not visible here.
    """
    # Each method records its own letter here when called.
    result = []

    @utils.add_sync_api
    class Test:
        # Plain sync method, no "_async" suffix.
        def a(self):
            result.append("a")

        # Sync method whose name nevertheless ends with "_async".
        def b_async(self):
            result.append("b")

        # Coroutine method without the "_async" suffix.
        async def c(self):
            result.append("c")

        # Coroutine method with the "_async" suffix.
        async def d_async(self):
            result.append("d")

    test = Test()
    # Names of all attributes visible on the decorated instance.
    funcs = dir(test)
    assert "a" in funcs
github peopledoc / procrastinate / procrastinate / app.py View on Github external
def perform_import_paths(self):
        """
        Import everything listed in ``self.import_paths`` and log the result.

        Call this method before relying on ``app.tasks`` so that the apps
        have been imported first. Once ``utils.import_all`` returns, a debug
        record listing ``self.tasks`` is emitted.
        """
        utils.import_all(import_paths=self.import_paths)

        # Structured payload attached to the log record.
        log_extra = {"action": "imported_tasks", "tasks": list(self.tasks)}
        logger.debug("All tasks imported", extra=log_extra)
github peopledoc / procrastinate / procrastinate / app.py View on Github external
def from_path(cls, dotted_path: str) -> "App":
        """
        Dynamically load an :py:class:`App` object from a dotted path.

        The lookup is delegated to ``utils.load_from_path``, which receives
        both the path and ``cls``.

        Parameters
        ----------
        dotted_path :
            Dotted path to the object to load (e.g.
            ``mymodule.submodule.procrastinate_app``)

        Returns
        -------
        App
            Whatever ``utils.load_from_path`` returns for that path.
        """
        app = utils.load_from_path(dotted_path, cls)
        return app
github peopledoc / procrastinate / procrastinate / schema.py View on Github external
import pathlib

from importlib_resources import read_text

from procrastinate import connector as connector_module
from procrastinate import utils

# Path to the "sql/migrations" directory located next to this module.
migrations_path = pathlib.Path(__file__).parent / "sql" / "migrations"


@utils.add_sync_api
class SchemaManager:
    """
    Give access to the packaged SQL schema and apply it through a connector.

    Parameters
    ----------
    connector :
        Connector used to run the schema queries.
    """

    def __init__(self, connector: connector_module.BaseConnector):
        self.connector = connector

    @staticmethod
    def get_schema() -> str:
        """Return the text of ``schema.sql`` from the ``procrastinate.sql`` package."""
        schema_sql = read_text("procrastinate.sql", "schema.sql")
        return schema_sql

    @staticmethod
    def get_migrations_path() -> str:
        """Return the module-level ``migrations_path`` as a string."""
        path_as_text = str(migrations_path)
        return path_as_text

    async def apply_schema_async(self) -> None:
        """Read the schema and execute it as a single query on the connector."""
        schema_sql = self.get_schema()
        await self.connector.execute_query(query=schema_sql)
github peopledoc / procrastinate / procrastinate / tasks.py View on Github external
def full_path(self) -> str:
        """Return what ``utils.get_full_path`` computes for ``self.func``."""
        func_path = utils.get_full_path(self.func)
        return func_path
github peopledoc / procrastinate / procrastinate / tasks.py View on Github external
def load_task(path: str) -> "Task":
    """
    Load the ``Task`` object found at the given path.

    Parameters
    ----------
    path :
        Path handed to ``utils.load_from_path`` to locate the task.

    Returns
    -------
    Task
        The object returned by ``utils.load_from_path`` for ``path``.

    Raises
    ------
    exceptions.TaskNotFound
        If ``utils.load_from_path`` raises ``exceptions.LoadFromPathError``.
        The original error is attached as ``__cause__``.
    """
    try:
        return utils.load_from_path(path, Task)
    except exceptions.LoadFromPathError as exc:
        # Chain explicitly: the traceback then reports the load failure as the
        # direct cause rather than as an unrelated error raised while handling.
        raise exceptions.TaskNotFound(
            f"Task at {path} cannot be imported: {exc!s}"
        ) from exc