def test_get_queries():
    assert {"defer_job", "fetch_job", "finish_job"} <= set(sql.get_queries())
def test_parse_query_file():
    assert (
        sql.parse_query_file(
            """
-- Hello: This is ignored.
yay
-- query1 --
Select bla
-- query2 --
-- Description
SELECT blu
-- query3 --
-- multi-line
-- description
INSERT INTO blou VALUES(%(yay)s)
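# Hedged sketch (not from the original files): a minimal parser for the query
# file format exercised by the test above. A "-- name --" line starts a new
# named query, plain "--" comment lines are descriptions and are skipped, and
# anything before the first header is ignored. This illustrates the format
# only; it is not procrastinate's actual ``parse_query_file`` implementation.
import re
from typing import Dict, Optional

def parse_queries_sketch(text: str) -> Dict[str, str]:
    queries: Dict[str, str] = {}
    current: Optional[str] = None
    for line in text.splitlines():
        header = re.match(r"^-- (\w+) --$", line.strip())
        if header:
            current = header.group(1)
            queries[current] = ""
        elif line.lstrip().startswith("--") or current is None:
            continue  # description comment or preamble: ignored
        else:
            queries[current] += line + "\n"
    return queries

assert set(parse_queries_sketch("-- q1 --\nSELECT 1\n-- q2 --\n-- doc\nSELECT 2\n")) == {"q1", "q2"}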
def __init__(self):
    """
    Attributes
    ----------
    jobs : ``Dict[int, Dict]``
        Mapping of ``{<job id>: <job row>}``
    """
    self.reset()
    self.reverse_queries = {value: key for key, value in sql.queries.items()}
    self.reverse_queries[schema.SchemaManager.get_schema()] = "apply_schema"
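# Hedged sketch (not from the original files): how a reverse lookup like
# ``reverse_queries`` above can map a query's SQL text back to its name.
# The ``queries`` dict here is a stand-in, not procrastinate's real registry.
queries = {"defer_job": "INSERT INTO jobs ...", "finish_job": "UPDATE jobs ..."}
reverse_queries = {value: key for key, value in queries.items()}
assert reverse_queries["UPDATE jobs ..."] == "finish_job"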
async def finish_job(
    self,
    job: jobs.Job,
    status: jobs.Status,
    scheduled_at: Optional[datetime.datetime] = None,
) -> None:
    assert job.id  # TODO: remove this
    await self.connector.execute_query_async(
        query=sql.queries["finish_job"],
        job_id=job.id,
        status=status.value,
        scheduled_at=scheduled_at,
    )
async def check_connection_async(self) -> bool:
    result = await self.connector.execute_query_one_async(
        query=sql.queries["check_connection"],
    )
    return result["check"]
``lock``, ``args``, ``status``, ``scheduled_at``, ``attempts``).
"""
return [
    {
        "id": row["id"],
        "queue": row["queue_name"],
        "task": row["task_name"],
        "lock": row["lock"],
        "queueing_lock": row["queueing_lock"],
        "args": row["args"],
        "status": row["status"],
        "scheduled_at": row["scheduled_at"],
        "attempts": row["attempts"],
    }
    for row in await self.connector.execute_query_all(
        query=sql.queries["list_jobs"],
        id=id,
        queue_name=queue,
        task_name=task,
        status=status,
        lock=lock,
        queueing_lock=queueing_lock,
    )
]
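# Hedged sketch (not from the original files): the column renaming performed by
# the comprehension above, applied to a hand-written fake row (``queue_name``
# becomes ``queue``, ``task_name`` becomes ``task``; other columns keep their
# names). Values are invented.
fake_row = {"id": 1, "queue_name": "default", "task_name": "mytask"}
job = {"id": fake_row["id"], "queue": fake_row["queue_name"], "task": fake_row["task_name"]}
assert job == {"id": 1, "queue": "default", "task": "mytask"}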
async def get_status_count_async(self) -> Dict[jobs.Status, int]:
    result = await self.connector.execute_query_all_async(
        query=sql.queries["count_jobs_status"],
    )
    result_dict = {r["status"]: int(r["count"]) for r in result}
    return {status: result_dict.get(status.value, 0) for status in jobs.Status}
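# Hedged sketch (not from the original files): the two comprehensions above,
# replayed on fake rows. ``Status`` here is a stand-in enum, not
# procrastinate's ``jobs.Status``; statuses missing from the query result get
# a count of 0.
import enum

class Status(enum.Enum):
    TODO = "todo"
    DOING = "doing"
    SUCCEEDED = "succeeded"
    FAILED = "failed"

rows = [{"status": "todo", "count": "3"}, {"status": "failed", "count": "1"}]
result_dict = {r["status"]: int(r["count"]) for r in rows}
counts = {status: result_dict.get(status.value, 0) for status in Status}
assert counts[Status.TODO] == 3 and counts[Status.DOING] == 0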
Parameters
----------
id : ``int``
    Job ID
status : ``str``
    New job status (*todo*/*doing*/*succeeded*/*failed*)

Returns
-------
``Dict[str, Any]``
    A dictionary representing the job (``id``, ``queue``, ``task``,
    ``lock``, ``args``, ``status``, ``scheduled_at``, ``attempts``).
"""
await self.connector.execute_query(
    query=sql.queries["set_job_status"], id=id, status=status,
)
(result,) = await self.list_jobs_async(id=id)
return result
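# Hedged usage sketch (not from the original files): the flow documented above,
# shown against a stand-in object. Setting a job's status then re-reading it
# returns the refreshed job dictionary; ``_FakeAdmin`` is invented for
# illustration and is not procrastinate's real manager.
import asyncio

class _FakeAdmin:
    def __init__(self):
        self._jobs = {1: {"id": 1, "status": "todo"}}

    async def set_job_status(self, id, status):
        self._jobs[id]["status"] = status  # the UPDATE step
        return dict(self._jobs[id])        # the re-read step

async def _demo_set_status():
    job = await _FakeAdmin().set_job_status(id=1, status="failed")
    assert job["status"] == "failed"

asyncio.run(_demo_set_status())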
def _defer_job_query_kwargs(self, job: jobs.Job) -> Dict[str, Any]:
    return {
        "query": sql.queries["defer_job"],
        "task_name": job.task_name,
        "lock": job.lock or str(uuid.uuid4()),
        "queueing_lock": job.queueing_lock,
        "args": job.task_kwargs,
        "scheduled_at": job.scheduled_at,
        "queue": job.queue,
    }
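# Hedged sketch (not from the original files): how kwargs built like the dict
# above are typically splatted into a connector call. ``fake_execute`` is a
# stand-in, and the literal values are invented.
import uuid

def fake_execute(query, **arguments):
    return {"query": query, "arguments": arguments}

kwargs = {
    "query": "defer_job",  # stand-in for sql.queries["defer_job"]
    "task_name": "mytask",
    "lock": str(uuid.uuid4()),  # a random lock when none was provided
    "queueing_lock": None,
    "args": {"x": 1},
    "scheduled_at": None,
    "queue": "default",
}
sent = fake_execute(**kwargs)
assert sent["arguments"]["task_name"] == "mytask"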
# query
if pool.maxsize == 1:
    logger.warning(
        "Listen/Notify capabilities disabled because maximum pool size "
        "is set to 1",
        extra={"action": "listen_notify_disabled"},
    )
    return

while True:
    async with pool.acquire() as connection:
        for channel_name in channels:
            await self._execute_query_connection(
                connection=connection,
                query=self._make_dynamic_query(
                    query=sql.queries["listen_queue"], channel_name=channel_name
                ),
            )
        # Initial set() lets the caller know that we're ready to listen
        event.set()
        await self._loop_notify(event=event, connection=connection)
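# Hedged sketch (not from the original files): the idea behind
# ``_make_dynamic_query``. LISTEN takes an identifier rather than a bind
# parameter, so the channel name must be interpolated into the statement
# itself. Real code should quote the identifier safely (for example with
# psycopg2.sql); this stand-in only shows the shape of the generated statement.
def make_listen_query_sketch(channel_name: str) -> str:
    return f'LISTEN "{channel_name}";'

assert make_listen_query_sketch("some_channel") == 'LISTEN "some_channel";'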