import pytest

from procrastinate import exceptions, jobs, tasks, worker


async def test_run_job_retry(app):
    def job(a, b):  # pylint: disable=unused-argument
        raise ValueError("nope")

    task = tasks.Task(job, app=app, queue="yay", name="job", retry=True)
    task.func = job

    app.tasks = {"job": task}

    # Deferring a job for the retry-enabled task (this rebinding shadows the
    # local `job` function above)
    job = jobs.Job(
        id=16,
        task_kwargs={"a": 9, "b": 3},
        lock="sherlock",
        task_name="job",
        queueing_lock="houba",
        queue="yay",
    )
    test_worker = worker.Worker(app, queues=["yay"])
    with pytest.raises(exceptions.JobRetry):
        await test_worker.run_job(job=job, worker_id=3)
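# What the test above exercises: because the task was created with retry=True,
# the worker signals "schedule another attempt" by raising JobRetry instead of
# marking the job as failed. In application code, retry is normally requested
# on the task decorator; a sketch with made-up task names, assuming the
# documented RetryStrategy helper is available in this version:
#
#     @app.task(queue="yay", retry=True)  # retry indefinitely on failure
#     def flaky():
#         ...
#
#     @app.task(queue="yay", retry=procrastinate.RetryStrategy(max_attempts=3))
#     def flaky_bounded():
#         ...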
def test_app_register(app):
    task = tasks.Task(task_func, app=app, queue="queue", name="bla")
    app._register(task)

    assert app.queues == {"queue", "builtin"}
    assert "bla" in app.tasks
    assert app.tasks["bla"] == task
def test_task_init_with_no_name(app):
    task = tasks.Task(task_func, app=app, queue="queue")

    assert task.func is task_func
    assert task.name == "tests.unit.test_tasks.task_func"
def test_task_configure_override_queue(app):
    task = tasks.Task(task_func, app=app, queue="queue")

    job = task.configure(queue="other_queue").job

    assert job.queue == "other_queue"
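# Usage sketch (not part of these tests): configure() is the hook for per-call
# overrides before deferring; the values below are made-up examples and assume
# a task declared through the decorator API:
#
#     my_task.configure(queue="other_queue", lock="sherlock").defer(a=1, b=2)
#
# Without configure(), my_task.defer(a=1, b=2) goes to the queue the task was
# declared with.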
async def test_run_job_log_result(caplog, app):
    caplog.set_level("INFO")

    result = []

    def task_func(a, b):  # pylint: disable=unused-argument
        s = a + b
        result.append(s)
        return s

    task = tasks.Task(task_func, app=app, queue="yay", name="job")
    app.tasks = {"task_func": task}

    job = jobs.Job(
        id=16,
        task_kwargs={"a": 9, "b": 3},
        lock="sherlock",
        queueing_lock="houba",
        task_name="task_func",
        queue="yay",
    )
    test_worker = worker.Worker(app, queues=["yay"])
    await test_worker.run_job(job=job, worker_id=3)

    assert result == [12]
def _wrap(func: Callable[..., "tasks.Task"]):
    from procrastinate import tasks

    # Build a Task from the decorated function and register it on the app
    task = tasks.Task(
        func,
        app=self,
        queue=queue,
        name=name,
        retry=retry,
        pass_context=pass_context,
    )
    self._register(task)

    # Return the Task in place of the function, copying its metadata
    # (__name__, __doc__, ...) so the decorated object still looks like func
    return functools.update_wrapper(task, func)
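# Usage sketch (not from this file): _wrap is the closure used by the App.task
# decorator (the enclosing method is not shown here), so the parameters above
# are normally supplied like this; the names "greet" and "emails" are made-up
# examples:
#
#     @app.task(queue="emails", name="greet", retry=True, pass_context=False)
#     def greet(name):
#         print(f"Hello {name}")
#
#     # Thanks to functools.update_wrapper, `greet` is now a Task object that
#     # still looks like the original function (same __name__, __doc__, ...)
#     greet.defer(name="world")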
app : `App`
    Procrastinate `App` running this job
worker_name : ``str``
    Name of the worker (may be useful for logging)
worker_queues : ``Optional[Iterable[str]]``
    Queues this worker listens to
job : `Job`
    Current `Job` instance
task : `Task`
    Current `Task` instance
"""
app: Optional[app_module.App] = None
worker_name: Optional[str] = None
worker_queues: Optional[Iterable[str]] = None
job: Optional[jobs.Job] = None
task: Optional[tasks.Task] = None
additional_context: Dict = attr.ib(factory=dict)
def log_extra(self, action: str, **kwargs: Any) -> types.JSONDict:
    extra: types.JSONDict = {
        "action": action,
        "worker": {"name": self.worker_name, "queues": self.worker_queues},
    }
    if self.job:
        extra["job"] = self.job.log_context()

    return {**extra, **self.additional_context, **kwargs}

def evolve(self, **update) -> "JobContext":
    return attr.evolve(self, **update)
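# Example of what log_extra() produces, following the merge order above
# (action and worker info, then job context, then additional_context, then
# kwargs); "worker-1", "yay" and "attempt" are made-up example values:
#
#     ctx = JobContext(worker_name="worker-1", worker_queues=["yay"])
#     ctx.log_extra(action="start", attempt=1)
#     # -> {"action": "start",
#     #     "worker": {"name": "worker-1", "queues": ["yay"]},
#     #     "attempt": 1}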
@property
import functools
import logging
from typing import Dict, List, Tuple

import attr
import croniter

from procrastinate import store, tasks
# The maximum delay after which tasks will be considered as
# outdated, and ignored.
MAX_DELAY = 60 * 10 # 10 minutes
# We'll always be oversleeping by this amount to avoid waking up too early for our
# tasks. This is, of course, the most important part of procrastinate ;)
MARGIN = 0.5 # seconds
logger = logging.getLogger(__name__)
TaskAtTime = Tuple[tasks.Task, int]
@attr.dataclass(frozen=True)
class PeriodicTask:
    task: tasks.Task
    cron: str

    @functools.lru_cache(maxsize=1)
    def croniter(self) -> croniter.croniter:
        return croniter.croniter(self.cron)
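# Note on the lru_cache above: PeriodicTask is frozen, hence hashable, so
# `self` can serve as the cache key; repeated croniter() calls on the same
# task reuse a single croniter instance, and maxsize=1 keeps only the most
# recently used entry.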
def load_task(self, task_name: str, worker_id: int) -> tasks.Task:
    if task_name in self.known_missing_tasks:
        raise exceptions.TaskNotFound(f"Cancelling job for {task_name} (not found)")

    try:
        # Simple case: the task is already known
        return self.app.tasks[task_name]
    except KeyError:
        pass

    # Will raise if not found or not a task
    try:
        task = tasks.load_task(task_name)
    except exceptions.ProcrastinateException:
        self.known_missing_tasks.add(task_name)
        raise

    context = self.context_for_worker(worker_id=worker_id)

    self.logger.warning(
        f"Task at {task_name} was not registered, it's been loaded dynamically.",
        extra=context.log_extra(action="load_dynamic_task", task_name=task_name),
    )

    self.app.tasks[task_name] = task
    return task
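# Usage note: the dynamic-load fallback above lets a job reference a task by a
# fully dotted path even when nothing registered it on this app: task names
# default to "<module>.<function name>" (see the test asserting
# "tests.unit.test_tasks.task_func" earlier), and tasks.load_task() imports
# that module and returns the attribute, provided it really is a Task.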
class PeriodicDeferrer:
    def __init__(self, job_store: store.JobStore, max_delay: float = MAX_DELAY):
        self.periodic_tasks: List[PeriodicTask] = []
        self.job_store = job_store
        # {task_name: defer_timestamp}
        self.last_defers: Dict[str, int] = {}
        self.max_delay = max_delay

    def periodic_decorator(self, cron: str):
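# Usage sketch (not from this file): the decorator being defined above is what
# the public App.periodic API is assumed to delegate to; in the documented
# usage it is stacked on top of App.task and the decorated function receives
# the scheduled timestamp. The cron string, queue and name below are made-up
# examples:
#
#     @app.periodic(cron="*/10 * * * *")
#     @app.task(queue="yay")
#     def cleanup(timestamp: int):
#         ...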