Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'enqueue_time': CloseToNow(),
'score': AnyInt(),
},
{
'function': 'third',
'args': (7,),
'kwargs': {'b': 8},
'job_try': None,
'enqueue_time': CloseToNow(),
'score': AnyInt(),
},
]
assert jobs[0].score < jobs[1].score < jobs[2].score
assert isinstance(jobs[0], JobDef)
assert isinstance(jobs[1], JobDef)
assert isinstance(jobs[2], JobDef)
'kwargs': {'b': 5, 'c': 6},
'job_try': None,
'enqueue_time': CloseToNow(),
'score': AnyInt(),
},
{
'function': 'third',
'args': (7,),
'kwargs': {'b': 8},
'job_try': None,
'enqueue_time': CloseToNow(),
'score': AnyInt(),
},
]
assert jobs[0].score < jobs[1].score < jobs[2].score
assert isinstance(jobs[0], JobDef)
assert isinstance(jobs[1], JobDef)
assert isinstance(jobs[2], JobDef)
async def info(self) -> Optional[JobDef]:
    """
    Return everything known about this job without waiting for it to finish.

    The stored result (``self.result_info()``) is tried first. When that yields
    nothing, the job's own redis key is read and deserialized instead. Whenever
    some information was found, its ``score`` is filled in from the job's entry
    in the queue's sorted set.

    :return: the job information, or the falsy value from ``result_info()``
        when neither a result nor a job entry exists
    """
    job_info = await self.result_info()
    if not job_info:
        # No result stored: fall back to the raw job definition.
        raw = await self._redis.get(job_key_prefix + self.job_id, encoding=None)
        if not raw:
            return job_info
        job_info = deserialize_job(raw, deserializer=self._deserializer)
    if not job_info:
        return job_info
    job_info.score = await self._redis.zscore(self._queue_name, self.job_id)
    return job_info
#: job not found in any way
not_found = 'not_found'
@dataclass
class JobDef:
    """
    Definition of a job: what to call, with which arguments, and when it was enqueued.
    """

    # Name of the function the job runs.
    function: str
    # Positional arguments for the function.
    args: tuple
    # Keyword arguments for the function.
    kwargs: dict
    # Try counter; may be None (queued jobs are reported with ``job_try`` None).
    job_try: Optional[int]
    # When the job was enqueued.
    enqueue_time: datetime
    # Score of the job in its queue; None until it has been looked up.
    score: Optional[int]
@dataclass
class JobResult(JobDef):
    """
    A job definition extended with the outcome of running it.
    """

    # Whether the run succeeded.
    success: bool
    # Whatever the run produced.
    result: Any
    # When the run started.
    start_time: datetime
    # When the run finished.
    finish_time: datetime
    # Identifier of the job, when known.
    job_id: Optional[str] = None
class Job:
"""
Holds a reference to a job.
"""
__slots__ = 'job_id', '_redis', '_queue_name', '_deserializer'
def __init__(
self, job_id: str, redis, _queue_name: str = default_queue_name, _deserializer: Optional[Deserializer] = None
def deserialize_job(r: bytes, *, deserializer: Optional[Deserializer] = None) -> JobDef:
    """
    Decode the serialized form of a job into a ``JobDef``.

    :param r: serialized job data
    :param deserializer: callable turning ``r`` into a mapping with the keys
        ``f``, ``a``, ``k``, ``t`` and ``et``; ``pickle.loads`` is used when omitted
    :return: the job definition, with ``score`` left as ``None``
    :raises DeserializationError: if decoding fails or the decoded data lacks
        an expected key (the original exception is chained as the cause)
    """
    # NOTE(review): the pickle fallback is only safe for data from a trusted source.
    loads = pickle.loads if deserializer is None else deserializer
    try:
        payload = loads(r)
        fields = dict(
            function=payload['f'],
            args=payload['a'],
            kwargs=payload['k'],
            job_try=payload['t'],
            enqueue_time=ms_to_datetime(payload['et']),
            score=None,
        )
        return JobDef(**fields)
    except Exception as exc:
        # Any failure above is reported uniformly as a deserialization problem.
        raise DeserializationError('unable to deserialize job') from exc
async def queued_jobs(self, *, queue_name: str = default_queue_name) -> List[JobDef]:
    """
    Get information about the jobs currently queued; mostly useful when testing.

    :param queue_name: name of the queue to inspect
    :return: one job definition per queue entry, in the order ``zrange`` returned them
    """
    scored_ids = await self.zrange(queue_name, withscores=True)
    # Build every lookup first, then await them all concurrently.
    lookups = [self._get_job_def(job_id, score) for job_id, score in scored_ids]
    return await asyncio.gather(*lookups)