Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_issue_111_crash_on_connect_error(loop):
import aiopg.connection
with pytest.raises(psycopg2.ProgrammingError):
await aiopg.connection.connect('baddsn:1', loop=loop)
n, free = 0, len(self._free)
while n < free:
conn = self._free[-1]
if conn.closed:
self._free.pop()
elif -1 < self._recycle < self._loop.time() - conn.last_usage:
conn.close()
self._free.pop()
else:
self._free.rotate()
n += 1
while self.size < self.minsize:
self._acquiring += 1
try:
conn = await connect(
self._dsn, loop=self._loop, timeout=self._timeout,
enable_json=self._enable_json,
enable_hstore=self._enable_hstore,
enable_uuid=self._enable_uuid,
echo=self._echo,
**self._conn_kwargs)
# raise exception if pool is closing
self._free.append(conn)
self._cond.notify()
finally:
self._acquiring -= 1
if self._free:
return
if override_min and self.size < self.maxsize:
self._acquiring += 1
def connect(dsn=None, *, loop=None, **kwargs):
return (yield from base_connect(dsn, loop=loop,
_connection_factory=SAConnection,
**kwargs))