Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_single_worker_spread_wait(app, mocker):
process_job = mocker.Mock()
wait_for_job = mocker.Mock()
await app.configure_task("bla").defer_async()
class TestWorker(worker.Worker):
stop = False
async def process_job(self, job, worker_id):
process_job(job=job, worker_id=worker_id)
async def wait_for_job(self, timeout):
wait_for_job(timeout)
self.stop_requested = self.stop
self.stop = True
await TestWorker(app=app, timeout=4, concurrency=7).single_worker(worker_id=3)
process_job.assert_called_once()
assert wait_for_job.call_args_list == [mocker.call(4 * (3 + 1)), mocker.call(4 * 7)]
async def test_single_worker_stop_during_wait(app, mocker):
process_job = mocker.Mock()
wait_for_job = mocker.Mock()
await app.configure_task("bla").defer_async()
class TestWorker(worker.Worker):
async def process_job(self, job, worker_id):
process_job(job=job, worker_id=worker_id)
async def wait_for_job(self, timeout):
wait_for_job()
self.stop_requested = True
await TestWorker(app=app).single_worker(worker_id=0)
process_job.assert_called_once()
wait_for_job.assert_called_once()
async def test_wrap_query_exceptions_reached_max_tries(mocker):
called = []
@aiopg_connector.wrap_query_exceptions
async def corofunc(connector):
called.append(True)
raise psycopg2.errors.OperationalError(
"server closed the connection unexpectedly"
)
connector = mocker.Mock(_pool=mocker.Mock(maxsize=5))
coro = corofunc(connector)
with pytest.raises(exceptions.ConnectorException) as excinfo:
await coro
assert len(called) == 6
assert str(excinfo.value) == "Could not get a valid connection after 6 tries"
async def test_missing_app_async(method_name, kwargs):
with pytest.raises(exceptions.SyncConnectorConfigurationError):
# Some of this methods are not async but they'll raise
# before the await is reached.
await getattr(connector_module.BaseConnector(), method_name)(**kwargs)
def test_load_task_not_a_task():
with pytest.raises(exceptions.TaskNotFound):
tasks.load_task("json.loads")
def test_load_from_path_wrong_type():
with pytest.raises(exceptions.LoadFromPathError):
utils.load_from_path("json.loads", int)
def test_delete_old_jobs_job_is_not_finished(get_all, pg_job_store):
pg_job_store.defer_job(
jobs.Job(
id=0,
queue="queue_a",
task_name="task_1",
lock="lock_1",
task_kwargs={"a": "b"},
)
)
# No started job
pg_job_store.delete_old_jobs(nb_hours=0)
assert len(get_all("procrastinate_jobs", "id")) == 1
# We start a job
job = pg_job_store.fetch_job(queues=["queue_a"])
# We back date the started event
with pg_job_store.connection.cursor() as cursor:
async def test_defer_job(aiopg_job_store, get_all):
queue = "marsupilami"
job = jobs.Job(
id=0, queue=queue, task_name="bob", lock="sher", task_kwargs={"a": 1, "b": 2}
)
pk = await aiopg_job_store.defer_job(job=job)
result = get_all("procrastinate_jobs", "id", "args", "status", "lock", "task_name")
assert result == [
{
"id": pk,
"args": {"a": 1, "b": 2},
"status": "todo",
"lock": "sher",
"task_name": "bob",
}
async def test_run_job_retry(app):
def job(a, b): # pylint: disable=unused-argument
raise ValueError("nope")
task = tasks.Task(job, app=app, queue="yay", name="job", retry=True)
task.func = job
app.tasks = {"job": task}
job = jobs.Job(
id=16,
task_kwargs={"a": 9, "b": 3},
lock="sherlock",
task_name="job",
queueing_lock="houba",
queue="yay",
)
test_worker = worker.Worker(app, queues=["yay"])
with pytest.raises(exceptions.JobRetry):
await test_worker.run_job(job=job, worker_id=3)
def test_finish_job(get_all, pg_job_store):
pg_job_store.defer_job(
jobs.Job(
id=0,
queue="queue_a",
task_name="task_1",
lock="lock_1",
task_kwargs={"a": "b"},
)
)
job = pg_job_store.fetch_job(queues=["queue_a"])
assert get_all("procrastinate_jobs", "status") == [{"status": "doing"}]
started_at = get_all("procrastinate_jobs", "started_at")[0]["started_at"]
assert started_at.date() == datetime.datetime.utcnow().date()
assert get_all("procrastinate_jobs", "attempts") == [{"attempts": 0}]
pg_job_store.finish_job(job=job, status=jobs.Status.SUCCEEDED)