Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def raise_exc():
    """Raise ``ProcrastinateException("foo")`` with an ``IndexError("bar")`` cause."""
    # ``raise ... from ...`` stores the IndexError in ``__cause__`` of the
    # raised exception.
    raise exceptions.ProcrastinateException("foo") from IndexError("bar")
def load_task(self, task_name: str, worker_id: int) -> tasks.Task:
    """
    Return the task registered under ``task_name``, loading it if needed.

    Lookup order:

    1. a name already present in ``self.known_missing_tasks`` fails
       immediately;
    2. ``self.app.tasks`` is consulted;
    3. otherwise ``tasks.load_task`` is called, a warning is logged and the
       loaded task is stored in ``self.app.tasks``.

    Raises ``exceptions.TaskNotFound`` for a name already recorded as
    missing, and re-raises any ``exceptions.ProcrastinateException`` coming
    out of ``tasks.load_task`` after recording the name as missing.
    """
    if task_name in self.known_missing_tasks:
        raise exceptions.TaskNotFound(f"Cancelling job for {task_name} (not found)")

    try:
        # Simple case: the task is already known
        return self.app.tasks[task_name]
    except KeyError:
        pass

    # Will raise if not found or not a task
    try:
        task = tasks.load_task(task_name)
    except exceptions.ProcrastinateException:
        # Remember the failure so later calls fail fast without calling
        # tasks.load_task again.
        self.known_missing_tasks.add(task_name)
        raise

    context = self.context_for_worker(worker_id=worker_id)
    self.logger.warning(
        f"Task at {task_name} was not registered, it's been loaded dynamically.",
        extra=context.log_extra(action="load_dynamic_task", task_name=task_name),
    )

    # Register the task so the next lookup takes the simple path above.
    self.app.tasks[task_name] = task
    return task
import datetime
class ProcrastinateException(Exception):
    """
    Unexpected Procrastinate error.
    """

    def __init__(self, message=None):
        # When no (or an empty) message is given, fall back to the docstring
        # of the concrete class: ``__doc__`` is looked up on the instance's
        # own class, so each subclass supplies its own default text.
        if not message:
            message = self.__doc__
        super().__init__(message)
class TaskNotFound(ProcrastinateException):
    """
    Task cannot be imported.
    """
class JobError(ProcrastinateException):
    """
    Job ended with an exception.
    """
class LoadFromPathError(ImportError, ProcrastinateException):
    """
    App was not found at the provided path, or the loaded object is not an App.
    """
import datetime
class ProcrastinateException(Exception):
    """
    Unexpected Procrastinate error.
    """
class TaskNotFound(ProcrastinateException):
    """
    Task cannot be imported.
    """
class JobError(ProcrastinateException):
    """
    Job ended with an exception.
    """
class LoadFromPathError(ImportError, ProcrastinateException):
    """
    App was not found at the provided path, or the loaded object is not an App.
    """
class JobRetry(ProcrastinateException):
    """
    Job should be retried.
    """

    def __init__(self, scheduled_at: datetime.datetime):
        # The only state carried by this exception.
        self.scheduled_at = scheduled_at
# NOTE(review): presumably signals that a stop was requested -- the raising
# and catching sites are not visible here, confirm against callers.
class StopRequested(ProcrastinateException):
    pass
# NOTE(review): presumably signals that no job is left to process -- the
# raising and catching sites are not visible here, confirm against callers.
class NoMoreJobs(ProcrastinateException):
    pass
import datetime
class ProcrastinateException(Exception):
    """
    Unexpected Procrastinate error.
    """
class TaskNotFound(ProcrastinateException):
    """
    Task cannot be imported.
    """
class JobError(ProcrastinateException):
    """
    Job ended with an exception.
    """
class LoadFromPathError(ImportError, ProcrastinateException):
    """
    App was not found at the provided path, or the loaded object is not an App.
    """
class JobRetry(ProcrastinateException):
    """
    Job should be retried.
    """

    def __init__(self, scheduled_at: datetime.datetime):
        # The only state carried by this exception.
        self.scheduled_at = scheduled_at
Job should be retried.
"""
def __init__(self, scheduled_at: datetime.datetime):
self.scheduled_at = scheduled_at
super().__init__()
class PoolAlreadySet(ProcrastinateException):
    """
    connector.set_pool() was called but the pool already had a set.
    Changing the pool of a connector is not permitted.
    """
class ConnectorException(ProcrastinateException):
    """
    Database error.
    """

    # The precise error can be seen with ``exception.__cause__``.
class AlreadyEnqueued(ProcrastinateException):
    """
    There is already a job waiting in the queue with the same queueing lock.
    """
# NOTE(review): the last docstring sentence looks truncated ("... is available
# in"); confirm the intended wording before relying on it as a message.
class UniqueViolation(ConnectorException):
    """
    A unique constraint is violated. The constraint name is available in
    """
class JobError(ProcrastinateException):
    """
    Job ended with an exception.
    """
class LoadFromPathError(ImportError, ProcrastinateException):
    """
    App was not found at the provided path, or the loaded object is not an App.
    """
class JobRetry(ProcrastinateException):
    """
    Job should be retried.
    """

    def __init__(self, scheduled_at: datetime.datetime):
        self.scheduled_at = scheduled_at
        # No message argument is forwarded to the parent initializer.
        super().__init__()
class PoolAlreadySet(ProcrastinateException):
    """
    connector.set_pool() was called but the pool already had a set.
    Changing the pool of a connector is not permitted.
    """
class PoolAlreadySet(ProcrastinateException):
    """
    connector.set_pool() was called but the pool already had a set.
    Changing the pool of a connector is not permitted.
    """
class ConnectorException(ProcrastinateException):
    """
    Database error.
    """

    # The precise error can be seen with ``exception.__cause__``.
class AlreadyEnqueued(ProcrastinateException):
    """
    There is already a job waiting in the queue with the same queueing lock.
    """
class UniqueViolation(ConnectorException):
    """
    A unique constraint is violated. The constraint name is available in
    ``exception.constraint_name``.
    """

    def __init__(self, *args, constraint_name: str):
        # ``constraint_name`` is keyword-only; positional arguments are
        # forwarded unchanged to the parent initializer.
        super().__init__(*args)
        self.constraint_name = constraint_name
"""
App was not found at the provided path, or the loaded object is not an App.
"""
class JobRetry(ProcrastinateException):
    """
    Job should be retried.
    """

    def __init__(self, scheduled_at: datetime.datetime):
        self.scheduled_at = scheduled_at
        # No message argument is forwarded to the parent initializer.
        super().__init__()
class PoolAlreadySet(ProcrastinateException):
    """
    connector.set_pool() was called but the pool already had a set.
    Changing the pool of a connector is not permitted.
    """
class ConnectorException(ProcrastinateException):
    """
    Database error.
    """

    # The precise error can be seen with ``exception.__cause__``.
class AlreadyEnqueued(ProcrastinateException):
"""
class LoadFromPathError(ImportError, ProcrastinateException):
    """
    App was not found at the provided path, or the loaded object is not an App.
    """
class JobRetry(ProcrastinateException):
    """
    Job should be retried.
    """

    def __init__(self, scheduled_at: datetime.datetime):
        # The only state carried by this exception.
        self.scheduled_at = scheduled_at
# NOTE(review): presumably signals that a stop was requested -- the raising
# and catching sites are not visible here, confirm against callers.
class StopRequested(ProcrastinateException):
    pass
# NOTE(review): presumably signals that no job is left to process -- the
# raising and catching sites are not visible here, confirm against callers.
class NoMoreJobs(ProcrastinateException):
    pass