Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_wrap_query_exceptions_reached_max_tries(mocker):
    """
    With a pool ``maxsize`` of 5, a coroutine that always fails with the
    "server closed the connection unexpectedly" error is attempted 6 times
    before ``ConnectorException`` is raised.
    """
    attempts = []

    @aiopg_connector.wrap_query_exceptions
    async def always_disconnected(connector):
        # Record the attempt, then fail the way a dropped connection would.
        attempts.append(True)
        raise psycopg2.errors.OperationalError(
            "server closed the connection unexpectedly"
        )

    fake_pool = mocker.Mock(maxsize=5)
    connector = mocker.Mock(_pool=fake_pool)
    pending = always_disconnected(connector)

    with pytest.raises(exceptions.ConnectorException) as excinfo:
        await pending

    expected_message = "Could not get a valid connection after 6 tries"
    assert len(attempts) == 6
    assert str(excinfo.value) == expected_message
def test_wrap_query_exceptions_reached_max_tries(mocker, pool_args, called_count):
    """
    A function that always raises ``AdminShutdown`` is called ``called_count``
    times for a pool built from ``pool_args``, after which
    ``ConnectorException`` is raised with the matching message.
    """
    attempts = []

    @psycopg2_connector.wrap_query_exceptions
    def always_shut_down(connector):
        # Record the attempt, then fail on every call.
        attempts.append(True)
        raise psycopg2.errors.AdminShutdown()

    fake_pool = mocker.Mock(**pool_args)
    connector = mocker.Mock(_pool=fake_pool)

    with pytest.raises(exceptions.ConnectorException) as excinfo:
        always_shut_down(connector)

    expected_message = "Could not get a valid connection after {} tries".format(
        called_count
    )
    assert len(attempts) == called_count
    assert str(excinfo.value) == expected_message
async def test_wrap_exceptions_wraps():
    """
    A ``psycopg2.DatabaseError`` raised inside a coroutine decorated with
    ``wrap_exceptions`` surfaces as ``ConnectorException``.
    """

    @aiopg_connector.wrap_exceptions
    async def failing_query():
        raise psycopg2.DatabaseError

    pending = failing_query()

    with pytest.raises(exceptions.ConnectorException):
        await pending
async def test_store_defer_job_connector_exception(
    mocker, job_store, job_factory, connector
):
    """
    ``defer_job_async`` lets a ``ConnectorException`` raised by the
    connector's ``execute_query_one_async`` propagate to the caller.
    """
    # Replace the connector's query method with a mock that always fails.
    failing_query = mocker.Mock(side_effect=exceptions.ConnectorException)
    connector.execute_query_one_async = failing_query

    with pytest.raises(exceptions.ConnectorException):
        job = job_factory(task_kwargs={"a": "b"})
        await job_store.defer_job_async(job=job)
def test_wrap_exceptions_wraps():
    """
    A ``psycopg2.DatabaseError`` raised inside a function decorated with
    ``wrap_exceptions`` surfaces as ``ConnectorException``.
    """

    @psycopg2_connector.wrap_exceptions
    def failing_query():
        raise psycopg2.DatabaseError

    with pytest.raises(exceptions.ConnectorException):
        failing_query()
async def test_store_defer_job_connector_exception(
    mocker, job_store, job_factory, connector
):
    """
    When the connector's ``execute_query_one_async`` raises
    ``ConnectorException``, ``defer_job_async`` raises it as well.
    """
    # Stub out the query method so that any call fails immediately.
    broken_execute = mocker.Mock(side_effect=exceptions.ConnectorException)
    connector.execute_query_one_async = broken_execute

    with pytest.raises(exceptions.ConnectorException):
        job_to_defer = job_factory(task_kwargs={"a": "b"})
        await job_store.defer_job_async(job=job_to_defer)
async def wrapped(*args, **kwargs):
    """
    Await ``coro`` and translate psycopg2 errors into library exceptions.

    Raises:
        exceptions.UniqueViolation: when psycopg2 reports a unique-constraint
            violation; carries the violated constraint's name.
        exceptions.ConnectorException: for any other ``psycopg2.Error``.

    In both cases the original psycopg2 error is attached as ``__cause__``.
    """
    try:
        return await coro(*args, **kwargs)
    except psycopg2.errors.UniqueViolation as exc:
        # Must come before the generic handler below, which would otherwise
        # catch it. Chain explicitly so ``__cause__`` is populated here too.
        raise exceptions.UniqueViolation(
            constraint_name=exc.diag.constraint_name
        ) from exc
    except psycopg2.Error as exc:
        raise exceptions.ConnectorException from exc
async def wrapped(*args, **kwargs):
    """
    Await ``coro``, retrying when the server has closed the connection.

    The number of attempts is the pool's ``maxsize`` plus one, read from the
    first positional argument; if that cannot be determined, a single attempt
    is made. Any other ``OperationalError`` is re-raised immediately.

    Raises:
        exceptions.ConnectorException: when every attempt failed with the
            "server closed the connection unexpectedly" error; the last such
            error is attached as ``__cause__``.
    """
    try:
        attempts_allowed = args[0]._pool.maxsize + 1
    except Exception:
        # No usable pool size on the first argument: do not retry.
        attempts_allowed = 1

    last_error = None
    for _attempt in range(attempts_allowed):
        try:
            return await coro(*args, **kwargs)
        except psycopg2.errors.OperationalError as exc:
            if "server closed the connection unexpectedly" not in str(exc):
                raise exc
            # Keep the error for chaining and try again.
            last_error = exc

    raise exceptions.ConnectorException(
        "Could not get a valid connection after {} tries".format(attempts_allowed)
    ) from last_error
# Exception type for database errors; UniqueViolation derives from it.
# NOTE(review): the docstring may be read at runtime by the base class (not
# visible here) — confirm before rewording it.
class ConnectorException(ProcrastinateException):
    """
    Database error.
    """

    # The precise error can be seen with ``exception.__cause__``.
# Exception type for a queueing-lock conflict on an already waiting job.
# NOTE(review): the docstring may be read at runtime by the base class (not
# visible here) — confirm before rewording it.
class AlreadyEnqueued(ProcrastinateException):
    """
    There is already a job waiting in the queue with the same queueing lock.
    """
# NOTE(review): the docstring may be read at runtime by a base class (not
# visible here) — confirm before rewording it.
class UniqueViolation(ConnectorException):
    """
    A unique constraint is violated. The constraint name is available in
    ``exception.constraint_name``.
    """

    def __init__(self, *args, constraint_name: str) -> None:
        # ``constraint_name`` is keyword-only; positional arguments are
        # forwarded unchanged to the parent constructor.
        super().__init__(*args)
        self.constraint_name = constraint_name
# Exception type for a missing app (see the docstring for the likely cause).
# NOTE(review): the docstring may be read at runtime by the base class (not
# visible here) — confirm before rewording it.
class MissingApp(ProcrastinateException):
    """
    Missing app. This most probably happened because procrastinate needs an
    app via --app or the PROCRASTINATE_APP environment variable.
    """
def wrapped(*args, **kwargs):
    """
    Call ``func``, retrying when psycopg2 raises ``AdminShutdown``.

    The number of attempts is the pool's ``maxconn`` plus one, read from the
    first positional argument; if that cannot be determined, a single attempt
    is made.

    Raises:
        exceptions.ConnectorException: when every attempt raised
            ``AdminShutdown``; the last such error is attached as
            ``__cause__``.
    """
    final_exc = None
    try:
        max_tries = args[0]._pool.maxconn + 1
    except Exception:
        # No usable pool size on the first argument: do not retry.
        max_tries = 1

    for _ in range(max_tries):
        try:
            return func(*args, **kwargs)
        except psycopg2.errors.AdminShutdown as exc:
            # Remember the failure so the final error can be chained to it;
            # previously ``final_exc`` stayed ``None`` and the cause was lost.
            final_exc = exc
            continue

    raise exceptions.ConnectorException(
        "Could not get a valid connection after {} tries".format(max_tries)
    ) from final_exc