Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_timeout(engine):
assert TIMEOUT == engine.timeout
async def test_execute_override_timeout(connect):
timeout = 0.1
conn = await connect()
cur = await conn.cursor()
assert TIMEOUT == cur.timeout
t1 = time.time()
with pytest.raises(asyncio.TimeoutError):
await cur.execute("SELECT pg_sleep(1)", timeout=timeout)
t2 = time.time()
dt = t2 - t1
assert 0.08 <= dt <= 0.15, dt
async def test_override_cursor_timeout(connect):
timeout = 0.1
conn = await connect()
assert TIMEOUT == conn.timeout
cur = await conn.cursor(timeout=timeout)
assert timeout == cur.timeout
t1 = time.time()
with pytest.raises(asyncio.TimeoutError):
await cur.execute("SELECT pg_sleep(1)")
t2 = time.time()
dt = t2 - t1
assert 0.08 <= dt <= 0.15, dt
async def test_create_pool(create_pool):
pool = await create_pool()
assert isinstance(pool, Pool)
assert 1 == pool.minsize
assert 10 == pool.maxsize
assert 1 == pool.size
assert 1 == pool.freesize
assert TIMEOUT == pool.timeout
assert not pool.echo