Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def deserialize_result(r: bytes, *, deserializer: Optional[Deserializer] = None) -> JobResult:
    """
    Decode a serialized job result and build a ``JobResult`` from it.

    :param r: raw bytes holding the serialized result
    :param deserializer: callable used to decode ``r``; ``pickle.loads`` is used when this is ``None``
    :return: ``JobResult`` populated from the decoded mapping's ``'t'``, ``'f'``, ``'a'``, ``'k'``, ``'et'``,
        ``'s'``, ``'r'``, ``'st'`` and ``'ft'`` entries; ``score`` is always ``None``
    :raises DeserializationError: if decoding, a key lookup or a time conversion raises any ``Exception``
        (the original exception is chained as the cause)
    """
    # NOTE(review): the default decoder is pickle, which can execute arbitrary code while loading;
    # confirm ``r`` only ever comes from a trusted store.
    decode = pickle.loads if deserializer is None else deserializer
    try:
        payload = decode(r)
        # built in the same order as the JobResult keyword arguments so the first failing lookup is unchanged
        fields = {
            'job_try': payload['t'],
            'function': payload['f'],
            'args': payload['a'],
            'kwargs': payload['k'],
            'enqueue_time': ms_to_datetime(payload['et']),
            'score': None,
            'success': payload['s'],
            'result': payload['r'],
            'start_time': ms_to_datetime(payload['st']),
            'finish_time': ms_to_datetime(payload['ft']),
        }
        return JobResult(**fields)
    except Exception as exc:
        raise DeserializationError('unable to deserialize job result') from exc
def deserialize_result(r: bytes, *, deserializer: Optional[Deserializer] = None) -> JobResult:
    """
    Turn the serialized form of a finished job back into a ``JobResult``.

    :param r: serialized result bytes
    :param deserializer: optional decoder for ``r``; falls back to ``pickle.loads`` when ``None``
    :return: ``JobResult`` built from the decoded mapping, with ``score`` set to ``None``
    :raises DeserializationError: when decoding or constructing the result raises any ``Exception``;
        the underlying error is attached as ``__cause__``
    """
    if deserializer is None:
        # NOTE(review): pickle must only be fed trusted data - verify where ``r`` originates
        loader = pickle.loads
    else:
        loader = deserializer

    try:
        data = loader(r)
        job_result = JobResult(
            job_try=data['t'],
            function=data['f'],
            args=data['a'],
            kwargs=data['k'],
            enqueue_time=ms_to_datetime(data['et']),
            score=None,
            success=data['s'],
            result=data['r'],
            start_time=ms_to_datetime(data['st']),
            finish_time=ms_to_datetime(data['ft']),
        )
    except Exception as err:
        raise DeserializationError('unable to deserialize job result') from err
    else:
        return job_result
def deserialize_result(r: bytes, *, deserializer: Optional[Deserializer] = None) -> JobResult:
    """
    Rebuild a ``JobResult`` from its serialized bytes.

    :param r: bytes to decode
    :param deserializer: decoder applied to ``r``; defaults to ``pickle.loads``
    :return: the reconstructed ``JobResult`` (its ``score`` is always ``None``)
    :raises DeserializationError: for any ``Exception`` raised while decoding ``r`` or reading the
        expected keys from the decoded value
    """
    unpack = deserializer
    if unpack is None:
        # NOTE(review): default is pickle - safe only for trusted input, confirm against callers
        unpack = pickle.loads

    try:
        raw = unpack(r)
        return JobResult(
            job_try=raw['t'],
            function=raw['f'],
            args=raw['a'],
            kwargs=raw['k'],
            enqueue_time=ms_to_datetime(raw['et']),
            score=None,
            success=raw['s'],
            result=raw['r'],
            start_time=ms_to_datetime(raw['st']),
            finish_time=ms_to_datetime(raw['ft']),
        )
    except Exception as cause:
        raise DeserializationError('unable to deserialize job result') from cause
def deserialize_job(r: bytes, *, deserializer: Optional[Deserializer] = None) -> JobDef:
    """
    Decode a serialized job definition and build a ``JobDef`` from it.

    :param r: raw bytes holding the serialized job
    :param deserializer: callable used to decode ``r``; ``pickle.loads`` is used when this is ``None``
    :return: ``JobDef`` populated from the decoded mapping's ``'f'``, ``'a'``, ``'k'``, ``'t'`` and ``'et'``
        entries; ``score`` is always ``None``
    :raises DeserializationError: if decoding, a key lookup or the time conversion raises any ``Exception``
        (the original exception is chained as the cause)
    """
    # NOTE(review): the default decoder is pickle, which can execute arbitrary code while loading;
    # confirm ``r`` only ever comes from a trusted store.
    decode = pickle.loads if deserializer is None else deserializer
    try:
        payload = decode(r)
        # same lookup order as the original keyword arguments
        fields = {
            'function': payload['f'],
            'args': payload['a'],
            'kwargs': payload['k'],
            'job_try': payload['t'],
            'enqueue_time': ms_to_datetime(payload['et']),
            'score': None,
        }
        return JobDef(**fields)
    except Exception as exc:
        raise DeserializationError('unable to deserialize job') from exc
start_ms,
timestamp_ms(),
ref,
serializer=self.job_serializer,
)
return await asyncio.shield(self.abort_job(job_id, result_data))
result = no_result
exc_extra = None
finish = False
timeout_s = self.job_timeout_s if function.timeout_s is None else function.timeout_s
incr_score = None
job_ctx = {
'job_id': job_id,
'job_try': job_try,
'enqueue_time': ms_to_datetime(enqueue_time_ms),
'score': score,
}
ctx = {**self.ctx, **job_ctx}
start_ms = timestamp_ms()
success = False
try:
s = args_to_string(args, kwargs)
extra = f' try={job_try}' if job_try > 1 else ''
if (start_ms - score) > 1200:
extra += f' delayed={(start_ms - score) / 1000:0.2f}s'
logger.info('%6.2fs → %s(%s)%s', (start_ms - enqueue_time_ms) / 1000, ref, s, extra)
# run repr(result) and extra inside try/except as they can raise exceptions
try:
async with async_timeout.timeout(timeout_s):
result = await function.coroutine(ctx, *args, **kwargs)
except Exception as e: