Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Copyright (C) 2016-present the asyncpg authors and contributors
#
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
import asyncio
from asyncpg import _testbase as tb
from asyncpg import exceptions
class TestListeners(tb.ClusterTestCase):
async def test_listen_01(self):
async with self.create_pool(database='postgres') as pool:
async with pool.acquire() as con:
q1 = asyncio.Queue()
q2 = asyncio.Queue()
def listener1(*args):
q1.put_nowait(args)
def listener2(*args):
q2.put_nowait(args)
await con.add_listener('test', listener1)
await con.add_listener('test', listener2)
await self.con.execute(alter_password.format(password))
# test to see that passwords are properly SASL prepped
conn = await self.connect(
user='scram_sha_256_user', password=password)
await conn.close()
alter_password = \
"ALTER ROLE scram_sha_256_user PASSWORD 'correctpassword';"
await self.con.execute(alter_password)
await self.con.execute("SET password_encryption = 'md5';")
async def test_auth_unsupported(self):
pass
class TestConnectParams(tb.TestCase):
TESTS = [
{
'env': {
'PGUSER': 'user',
'PGDATABASE': 'testdb',
'PGPASSWORD': 'passw',
'PGHOST': 'host',
'PGPORT': '123'
},
'result': ([('host', 123)], {
'user': 'user',
'password': 'passw',
'database': 'testdb'})
},
#
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
import os
import unittest
import asyncpg
import asyncpg.serverversion
from asyncpg import _testbase as tb
class TestEnvironment(tb.ConnectedTestCase):
@unittest.skipIf(not os.environ.get('PGVERSION'),
"environ[PGVERSION] is not set")
async def test_environment_server_version(self):
pgver = os.environ.get('PGVERSION')
env_ver = asyncpg.serverversion.split_server_version_string(pgver)
srv_ver = self.con.get_server_version()
self.assertEqual(
env_ver[:2], srv_ver[:2],
'Expecting PostgreSQL version {pgver}, got {maj}.{min}.'.format(
pgver=pgver, maj=srv_ver.major, min=srv_ver.minor)
)
@unittest.skipIf(not os.environ.get('ASYNCPG_VERSION'),
"environ[ASYNCPG_VERSION] is not set")
async def test_environment_asyncpg_version(self):
# Copyright (C) 2016-present the ayncpg authors and contributors
#
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
import datetime
from asyncpg import utils
from asyncpg import _testbase as tb
class TestUtils(tb.ConnectedTestCase):
async def test_mogrify_simple(self):
cases = [
('timestamp',
datetime.datetime(2016, 10, 10),
"SELECT '2016-10-10 00:00:00'::timestamp"),
('int[]',
[[1, 2], [3, 4]],
"SELECT '{{1,2},{3,4}}'::int[]"),
]
for typename, data, expected in cases:
with self.subTest(value=data, type=typename):
mogrified = await utils._mogrify(
self.con, 'SELECT $1::{}'.format(typename), [data])
self.assertEqual(mogrified, expected)
#
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
import asyncio
import asyncpg
import gc
import unittest
from asyncpg import _testbase as tb
from asyncpg import exceptions
class TestPrepare(tb.ConnectedTestCase):
async def test_prepare_01(self):
self.assertEqual(self.con._protocol.queries_count, 0)
st = await self.con.prepare('SELECT 1 = $1 AS test')
self.assertEqual(self.con._protocol.queries_count, 0)
self.assertEqual(st.get_query(), 'SELECT 1 = $1 AS test')
rec = await st.fetchrow(1)
self.assertEqual(self.con._protocol.queries_count, 1)
self.assertTrue(rec['test'])
self.assertEqual(len(rec), 1)
self.assertEqual(False, await st.fetchval(10))
self.assertEqual(self.con._protocol.queries_count, 2)
async def test_prepare_02(self):
@tb.with_connection_options(statement_cache_size=0)
async def test_introspection_no_stmt_cache_01(self):
old_uid = apg_con._uid
self.assertEqual(self.con._stmt_cache.get_max_size(), 0)
await self.con.fetchval('SELECT $1::int[]', [1, 2])
await self.con.execute('''
CREATE EXTENSION IF NOT EXISTS hstore
''')
try:
await self.con.set_builtin_type_codec(
'hstore', codec_name='pg_contrib.hstore')
finally:
await self.con.execute('''
DROP EXTENSION hstore
# Copyright (C) 2016-present the asyncpg authors and contributors
#
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
import asyncio
import asyncpg
from asyncpg import _testbase as tb
class TestCancellation(tb.ConnectedTestCase):
async def test_cancellation_01(self):
st1000 = await self.con.prepare('SELECT 1000')
async def test0():
val = await self.con.execute('SELECT 42')
self.assertEqual(val, 'SELECT 1')
async def test1():
val = await self.con.fetchval('SELECT 42')
self.assertEqual(val, 42)
async def test2():
val = await self.con.fetchrow('SELECT 42')
self.assertEqual(val, (42,))