Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def defer_job_one(
    self, task_name, lock, queueing_lock, args, scheduled_at, queue
) -> "JobRow":
    """Store a new job row in ``self.jobs`` and return it.

    The row is created with status ``"todo"`` and zero attempts, under the
    next id drawn from ``self.job_counter``.

    Args:
        task_name: Stored under the ``"task_name"`` key.
        lock: Stored under the ``"lock"`` key.
        queueing_lock: Stored under the ``"queueing_lock"`` key; when not
            ``None`` it must differ from the queueing lock of every job
            already stored.
        args: Stored under the ``"args"`` key.
        scheduled_at: Stored under the ``"scheduled_at"`` key.
        queue: Stored under the ``"queue_name"`` key.

    Returns:
        The dict that was just inserted into ``self.jobs``.

    Raises:
        exceptions.UniqueViolation: If ``queueing_lock`` is not ``None`` and
            a stored job already carries the same queueing lock. The error
            is built with ``connector.QUEUEING_LOCK_CONSTRAINT`` as its
            constraint name.
    """
    # A missing queueing lock (None) must never collide: comparing None with
    # None would otherwise reject every second job deferred without a lock.
    if queueing_lock is not None and any(
        job["queueing_lock"] == queueing_lock for job in self.jobs.values()
    ):
        raise exceptions.UniqueViolation(
            constraint_name=connector.QUEUEING_LOCK_CONSTRAINT
        )

    # Named job_id rather than id to avoid shadowing the builtin.
    job_id = next(self.job_counter)
    job_row = {
        "id": job_id,
        "queue_name": queue,
        "task_name": task_name,
        "lock": lock,
        "queueing_lock": queueing_lock,
        "args": args,
        "status": "todo",
        "scheduled_at": scheduled_at,
        "attempts": 0,
    }
    self.jobs[job_id] = job_row
    return job_row
async def test_store_defer_job_unique_violation_exception_other_constraint(
    mocker, job_store, job_factory, connector
):
    """Check how ``defer_job_async`` reports an unrelated unique violation.

    The connector's ``execute_query_one_async`` is replaced by a mock that
    raises ``UniqueViolation`` for the constraint ``"some_other_constraint"``;
    deferring a job must then raise ``ConnectorException``.
    """
    unrelated_violation = exceptions.UniqueViolation(
        constraint_name="some_other_constraint"
    )
    connector.execute_query_one_async = mocker.Mock(
        side_effect=unrelated_violation
    )

    with pytest.raises(exceptions.ConnectorException):
        # The job is built inside the context manager, as in the original.
        job = job_factory(task_kwargs={"a": "b"})
        await job_store.defer_job_async(job=job)
async def wrapped(*args, **kwargs):
    """Await ``coro`` and translate psycopg2 errors into project exceptions.

    All positional and keyword arguments are forwarded to ``coro`` and its
    result is returned unchanged.

    Raises:
        exceptions.UniqueViolation: When ``coro`` raises
            ``psycopg2.errors.UniqueViolation``; carries the constraint name
            read from the driver error's ``diag``.
        exceptions.ConnectorException: When ``coro`` raises any other
            ``psycopg2.Error``.
    """
    try:
        return await coro(*args, **kwargs)
    except psycopg2.errors.UniqueViolation as exc:
        # Chain explicitly, like the generic handler below, so the driver
        # error is kept as __cause__ of the translated exception.
        raise exceptions.UniqueViolation(
            constraint_name=exc.diag.constraint_name
        ) from exc
    except psycopg2.Error as exc:
        raise exceptions.ConnectorException from exc
def wrapped(*args, **kwargs):
    """Call ``func`` and translate psycopg2 errors into project exceptions.

    All positional and keyword arguments are forwarded to ``func`` and its
    result is returned unchanged.

    Raises:
        exceptions.UniqueViolation: When ``func`` raises
            ``psycopg2.errors.UniqueViolation``; carries the constraint name
            read from the driver error's ``diag``.
        exceptions.ConnectorException: When ``func`` raises any other
            ``psycopg2.Error``.
    """
    try:
        return func(*args, **kwargs)
    except psycopg2.errors.UniqueViolation as exc:
        # Chain explicitly, like the generic handler below, so the driver
        # error is kept as __cause__ of the translated exception.
        raise exceptions.UniqueViolation(
            constraint_name=exc.diag.constraint_name
        ) from exc
    except psycopg2.Error as exc:
        raise exceptions.ConnectorException from exc
def defer_job(self, job: jobs.Job) -> int:
    """Run the defer-job query for ``job`` and return the resulting id.

    The query arguments come from ``self._defer_job_query_kwargs`` and are
    passed to ``self.connector.execute_query_one``; the ``"id"`` entry of the
    result is returned.

    A ``exceptions.UniqueViolation`` raised while doing so is handed to
    ``self._raise_already_enqueued`` together with the job.
    NOTE(review): that helper is presumably expected to always raise; if it
    ever returned, the final lookup would fail on an unbound local - confirm.
    """
    try:
        # Resolve the connector method before building the kwargs, keeping
        # the evaluation order of a direct ``f(**g())`` call.
        run_query = self.connector.execute_query_one
        query_kwargs = self._defer_job_query_kwargs(job=job)
        row = run_query(**query_kwargs)
    except exceptions.UniqueViolation as violation:
        self._raise_already_enqueued(exc=violation, job=job)

    job_id = row["id"]
    return job_id