Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def result(self, task_id, blocking=False, timeout=None, backoff=1.15,
           max_delay=1.0, revoke_on_timeout=False, preserve=False):
    """
    Look up the result of the task identified by ``task_id``.

    When ``blocking`` is falsy the value is read directly with
    ``self.get``, passing ``preserve`` through as ``peek``. Otherwise the
    id is wrapped in a :py:class:`TaskResultWrapper` and the call is
    delegated to its ``get`` method with every option forwarded unchanged,
    so the behavior matches that object.
    """
    if blocking:
        wrapper = TaskResultWrapper(self, QueueTask(task_id=task_id))
        options = {
            'blocking': blocking,
            'timeout': timeout,
            'backoff': backoff,
            'max_delay': max_delay,
            'revoke_on_timeout': revoke_on_timeout,
            'preserve': preserve,
        }
        return wrapper.get(**options)

    # Non-blocking: a single direct read.
    return self.get(task_id, peek=preserve)
def is_revoked(self, task, dt=None, peek=True):
    """
    Report whether ``task`` is revoked.

    ``task`` may be a :py:class:`QueueTask` subclass, a ``QueueTask``
    instance, or any other value, which is treated as a task id and
    wrapped in a ``QueueTask``. ``dt`` and ``peek`` are passed through to
    ``self._check_revoked``. Whenever that check reports the revocation
    can be restored, the matching restore call is made before returning.
    """
    if isclass(task) and issubclass(task, QueueTask):
        # Class-wide revocations are keyed by the task class name.
        revoked, restorable = self._check_revoked(
            'rt:%s' % task.__name__, dt, peek)
        if restorable:
            self.restore_all(task)
        return revoked

    instance = task if isinstance(task, QueueTask) else QueueTask(task_id=task)
    revoked, restorable = self._check_revoked(instance.revoke_id, dt, peek)
    if restorable:
        self.restore(instance)

    # An instance that is not itself revoked may still be covered by a
    # revocation of its whole class.
    return revoked or self.is_revoked(type(instance), dt, peek)
# Run ``task`` via ``task.execute()``. Anything that is not a QueueTask
# instance is rejected with TypeError. Exceptions raised by the task are
# recorded (see below) and then re-raised to the caller.
def execute(self, task):
if not isinstance(task, QueueTask):
raise TypeError('Unknown object: %s' % task)
try:
result = task.execute()
except Exception as exc:
# When a result store is configured, the failure is stored under the task's
# id as an Error wrapping the task metadata plus the exception repr and the
# formatted traceback.
if self.result_store:
metadata = self._get_task_metadata(task, True)
metadata['error'] = repr(exc)
metadata['traceback'] = traceback.format_exc()
self.put(task.task_id, Error(metadata))
# NOTE(review): indentation is lost in this excerpt; since ``metadata`` is
# only assigned in the branch above, this check presumably nests inside
# it -- confirm against the original file.
if self.store_errors:
self.put_error(metadata)
raise
# Success path: only entered when a result store is configured and the task
# is not a PeriodicQueueTask.
if self.result_store and not isinstance(task, PeriodicQueueTask):
# A ``None`` result passes this check only when ``self.store_none`` is set.
if result is not None or self.store_none:
# NOTE(review): the body of this branch is cut off in this excerpt.
def is_revoked(self, task, dt=None, peek=True):
    """
    Report whether ``task`` is revoked.

    ``task`` may be a :py:class:`QueueTask` subclass, a ``QueueTask``
    instance, or any other value, which is treated as a task id and
    wrapped in a ``QueueTask``. ``dt`` and ``peek`` are passed through to
    ``self._check_revoked``. Whenever that check reports the revocation
    can be restored, the matching restore call is made before returning.
    """
    if isclass(task) and issubclass(task, QueueTask):
        # Class-wide revocations are keyed by the task class name.
        revoked, restorable = self._check_revoked(
            'rt:%s' % task.__name__, dt, peek)
        if restorable:
            self.restore_all(task)
        return revoked

    instance = task if isinstance(task, QueueTask) else QueueTask(task_id=task)
    revoked, restorable = self._check_revoked(instance.revoke_id, dt, peek)
    if restorable:
        self.restore(instance)

    # An instance that is not itself revoked may still be covered by a
    # revocation of its whole class.
    return revoked or self.is_revoked(type(instance), dt, peek)
def restore_by_id(self, task_id):
    """
    Restore a task given only its id.

    Builds a ``QueueTask`` carrying ``task_id`` and returns whatever
    ``self.restore`` returns for it.
    """
    placeholder = QueueTask(task_id=task_id)
    return self.restore(placeholder)
def decorator(func):
"""
Decorator to execute a function out-of-band via the consumer.
"""
# Build a task class around ``func`` with create_task, using QueueTask as
# the base class.
# NOTE(review): ``retries_as_argument``, ``name``, ``include_task``,
# ``task_settings`` and ``self`` are not defined in this fragment --
# presumably they come from an enclosing method's scope; confirm.
klass = create_task(
QueueTask,
func,
retries_as_argument,
name,
include_task,
**task_settings)
# Register the generated class, then rebind ``func`` to whatever
# ``_add_task_control_helpers`` returns for it.
self.registry.register(klass)
func = self._add_task_control_helpers(klass, func)
# ``schedule`` normalizes eta/delay into an execute time and starts building
# a ``klass`` instance from the given args/kwargs (defaulting to an empty
# tuple / dict).
# NOTE(review): the excerpt ends inside the ``klass(...)`` call, so the rest
# of ``schedule`` and of ``decorator`` is not visible here.
def schedule(args=None, kwargs=None, eta=None, delay=None,
convert_utc=True, task_id=None):
execute_time = self._normalize_execute_time(
eta=eta, delay=delay, convert_utc=convert_utc)
cmd = klass(
(args or (), kwargs or {}),
execute_time=execute_time,
def revoke_by_id(self, task_id, revoke_until=None, revoke_once=False):
    """
    Revoke a task given only its id.

    Builds a ``QueueTask`` carrying ``task_id`` and returns whatever
    ``self.revoke`` returns for it; ``revoke_until`` and ``revoke_once``
    are forwarded positionally, unchanged.
    """
    placeholder = QueueTask(task_id=task_id)
    return self.revoke(placeholder, revoke_until, revoke_once)
def set_data(self, data):
    """Store ``data`` on this object as its ``data`` attribute."""
    self.data = data
def execute(self):
    """
    Execute any arbitrary code here.

    This implementation does no work of its own: it always raises
    ``NotImplementedError``.
    """
    raise NotImplementedError()
def __eq__(self, rhs):
return (
self.task_id == rhs.task_id and
self.execute_time == rhs.execute_time and
type(self) == type(rhs))
class PeriodicQueueTask(QueueTask):
    """A ``QueueTask`` subclass that adds a datetime validation hook."""

    def validate_datetime(self, dt):
        """
        Validate that the task should execute at the given datetime.

        This implementation always answers ``False``, whatever ``dt`` is.
        """
        return False
def create_task(task_class, func, retries_as_argument=False, task_name=None,
include_task=False, **kwargs):
def execute(self):
args, kwargs = self.data or ((), {})
if retries_as_argument:
kwargs['retries'] = self.retries
if include_task:
kwargs['task'] = self
return func(*args, **kwargs)
attrs = {