Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_set_pool_already_set(mocker):
    """Calling ``set_pool`` a second time must raise ``PoolAlreadySet``."""
    connector = aiopg_connector.AiopgConnector()
    fake_pool = mocker.Mock()

    # First call is the setup step and is expected to go through.
    connector.set_pool(fake_pool)

    # Second call on the same connector has to be refused.
    with pytest.raises(exceptions.PoolAlreadySet):
        connector.set_pool(fake_pool)
def _(**kwargs):
    """Create an ``AiopgConnector`` and register it in ``connectors``.

    ``json_dumps`` and ``json_loads`` are removed from ``kwargs`` (``None``
    when absent) and passed to the connector explicitly. Every remaining
    keyword is merged into ``connection_params`` before the connector is
    built from that mapping.

    Both ``connection_params`` and ``connectors`` are free variables taken
    from the surrounding scope (presumably an enclosing fixture -- not
    visible here).

    Returns:
        The newly created connector.
    """
    dumps = kwargs.pop("json_dumps", None)
    loads = kwargs.pop("json_loads", None)

    # NOTE: the mapping is updated in place, so these overrides remain
    # visible to anything else that holds the same ``connection_params``.
    connection_params.update(kwargs)

    new_connector = aiopg_connector.AiopgConnector(
        json_dumps=dumps, json_loads=loads, **connection_params
    )
    connectors.append(new_connector)
    return new_connector
def schema_database(db_factory, pum):
    """Create the ``procrastinate_schema`` database and baseline it.

    The database is created through ``db_factory``, the current
    procrastinate schema is applied to it, and ``pum`` is then asked to
    record a baseline version for it.

    Args:
        db_factory: Callable invoked as ``db_factory(dbname=...)`` to create
            the database.
        pum: Callable invoked as ``pum(command, arguments)``; presumably a
            wrapper around the ``pum`` migration CLI -- confirm against the
            fixture that provides it.

    Returns:
        The name of the database that was prepared.
    """
    dbname = "procrastinate_schema"
    db_factory(dbname=dbname)

    # Apply the current procrastinate schema to procrastinate_schema.
    connector = aiopg_connector.AiopgConnector(dbname=dbname)
    try:
        schema_manager = schema.SchemaManager(connector=connector)
        schema_manager.apply_schema()
    finally:
        # Release the connector even when applying the schema fails, so a
        # broken schema does not also leak connections.
        connector.close()

    # Set the baseline version in procrastinate_schema. The schema that was
    # just applied is the most recent one, hence the deliberately huge
    # version number: this database is as far ahead as it can be.
    pum("baseline", f"--pg_service {dbname} --baseline 999.999.999")

    return dbname
async def test_destructor(connection_params, capsys):
    """Run the connector's destructor while its pool holds a free connection.

    Nothing is asserted after the ``del``: the test only makes sure the
    destructor code path gets executed.
    """
    connector = aiopg_connector.AiopgConnector(**connection_params)
    await connector.execute_query_async("SELECT 1")

    # After one query, exactly one connection should sit idle in the pool.
    free_count = len(connector._pool._free)
    assert free_count == 1

    # Dropping the connector triggers a ResourceWarning from aiopg.Pool
    # when the AiopgConnector destructor fails to close the connections
    # the pool manages. That warning cannot be caught here -- not even by
    # promoting it to an exception through filterwarnings -- because
    # Python ignores exceptions raised inside destructors.
    del connector
def test_wrap_exceptions_applied(method_name):
    """The connector method named ``method_name`` is flagged as wrapped."""
    connector = aiopg_connector.AiopgConnector()
    method = getattr(connector, method_name)
    # The flag must be exactly ``True``, not merely truthy.
    assert method._exceptions_wrapped is True
def test_set_pool(mocker):
    """``set_pool`` stores the given pool object on the connector as-is."""
    connector = aiopg_connector.AiopgConnector()
    fake_pool = mocker.Mock()

    connector.set_pool(fake_pool)

    # Identity check: the very same object must be kept, not a copy.
    assert connector._pool is fake_pool
async def test_adapt_pool_args_on_connect(mocker):
    """``_adapt_pool_args`` replaces ``on_connect`` but still invokes it.

    The callback found in the adapted arguments must be a different object
    from the one supplied, and awaiting it must call the supplied callback
    with the same connection.
    """
    seen = []

    async def on_connect(connection):
        seen.append(connection)

    adapted = aiopg_connector.AiopgConnector._adapt_pool_args(
        pool_args={"on_connect": on_connect}, json_loads=None
    )
    adapted_on_connect = adapted["on_connect"]
    assert adapted_on_connect is not on_connect

    fake_connection = mocker.Mock(_pool=None)
    await adapted_on_connect(fake_connection)

    # The user callback ran exactly once, with the connection we passed in.
    assert seen == [fake_connection]
async def test_listen_notify_pool_one_connection(mocker, caplog):
    """With a pool of ``maxsize=1``, ``listen_notify`` only logs it is disabled."""
    connector = aiopg_connector.AiopgConnector()
    connector.set_pool(mocker.Mock(maxsize=1))

    # Discard anything logged during setup so only listen_notify is observed.
    caplog.clear()

    await connector.listen_notify(None, None)

    logged_actions = {record.action for record in caplog.records}
    assert logged_actions == {"listen_notify_disabled"}