Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_acquire(create_pool):
    """Acquire a connection from a fresh pool and run a trivial query."""
    db_pool = await create_pool()
    acquired = await db_pool.acquire()

    # The pool must hand out an open Connection instance.
    assert isinstance(acquired, Connection)
    assert not acquired.closed

    # Round-trip a constant through the server to prove the link works.
    cursor = await acquired.cursor()
    await cursor.execute('SELECT 1')
    row = await cursor.fetchone()
    assert (1,) == row

    db_pool.release(acquired)
async def test_connect(connect):
    """Check the initial state of a connection returned by ``connect``."""
    connection = await connect()

    assert isinstance(connection, Connection)
    # No write should be in progress right after connecting.
    assert not connection._writing
    # ``raw`` must expose the very same object stored in ``_conn``.
    assert connection._conn is connection.raw
    # Echo is expected to be off unless explicitly requested.
    assert not connection.echo
async def test_context_manager(create_pool):
    """Awaiting the pool yields a context manager that borrows a connection."""
    db_pool = await create_pool(minsize=10)

    with (await db_pool) as borrowed:
        assert isinstance(borrowed, Connection)
        # One of the ten pre-created connections is now in use.
        assert 9 == db_pool.freesize
        assert {borrowed} == db_pool._used

    # Leaving the ``with`` block gives the connection back to the pool.
    assert 10 == db_pool.freesize
yield from self.execute(operation)
if self.rowcount != 1:
raise ValueError("operation return not exactly 1 row")
ret = yield from self.fetchone()
if isinstance(ret, Sequence):
assert len(ret) == 1, "Bad SQL {!r}".format(operation)
return ret[0]
elif isinstance(ret, Mapping):
assert len(ret) == 1, "Bad SQL {!r}".format(operation)
return ret[next(iter(ret))] # return the value for single key
else:
raise ValueError("the result of SQL execution is "
"something terrible ({!r})")
class SAConnection(Connection):
    """Connection subclass whose cursors are wrapped in ``SACursor``.

    NOTE(review): ``asyncio.coroutine`` was removed in Python 3.11, so this
    class can only be defined on older interpreters -- confirm the supported
    Python range before relying on it.
    """

    # Class-level dialect; also captured as the default of ``cursor()`` below.
    dialect = dialect

    def __init__(self, dsn, loop, waiter, **kwargs):
        """Pass every argument through, unchanged, to the base initializer."""
        super().__init__(dsn, loop, waiter, **kwargs)

    @asyncio.coroutine
    def cursor(self, name=None, cursor_factory=None,
               scrollable=None, withhold=False, dialect=dialect):
        """Create a low-level cursor and return it wrapped in ``SACursor``.

        ``name``, ``cursor_factory``, ``scrollable`` and ``withhold`` are
        forwarded as keywords to ``self._cursor``; ``dialect`` is handed to
        the ``SACursor`` wrapper together with this connection.
        """
        cursor_options = {
            'name': name,
            'cursor_factory': cursor_factory,
            'scrollable': scrollable,
            'withhold': withhold,
        }
        raw_cursor = yield from self._cursor(**cursor_options)
        return SACursor(self, raw_cursor, dialect)
async def _connect(dsn=None, *, timeout=TIMEOUT, loop=None, enable_json=True,
                   enable_hstore=True, enable_uuid=True, echo=False, **kwargs):
    """Create a ``Connection``, wait until it is ready and register adapters.

    Args:
        dsn: Passed as the first argument to ``Connection``.
        timeout: Given both to ``Connection`` and to the initial
            ``conn._poll`` call.
        loop: Event loop to use; ``asyncio.get_event_loop()`` when ``None``.
        enable_json: Call ``extras.register_default_json`` on the raw
            connection when true.
        enable_hstore: Look up the hstore OIDs via ``_enable_hstore`` and,
            when they are found, call ``extras.register_hstore``.
        enable_uuid: Call ``extras.register_uuid`` on the raw connection
            when true.
        echo: Coerced with ``bool()`` and passed to ``Connection``.
        **kwargs: Forwarded unchanged to ``Connection``.

    Returns:
        The fully initialised ``Connection``.

    Raises:
        Whatever polling or adapter registration raises; the connection is
        closed before the exception propagates.
    """
    if loop is None:
        loop = asyncio.get_event_loop()

    waiter = create_future(loop)
    conn = Connection(dsn, loop, timeout, waiter, bool(echo), **kwargs)
    try:
        await conn._poll(waiter, timeout)

        if enable_json:
            extras.register_default_json(conn._conn)
        if enable_uuid:
            extras.register_uuid(conn_or_curs=conn._conn)
        if enable_hstore:
            oids = await _enable_hstore(conn)
            # ``None`` means no hstore OIDs were reported; skip registration.
            if oids is not None:
                oid, array_oid = oids
                extras.register_hstore(
                    conn._conn, oid=oid, array_oid=array_oid)
    except BaseException:
        # BaseException (not Exception) so that asyncio.CancelledError, which
        # no longer derives from Exception since Python 3.8, also triggers the
        # cleanup. The try block spans the adapter setup as well: on any
        # failure the caller never receives ``conn`` and could not close it.
        conn.close()
        raise
    return conn