Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def go():
conn = await self.connect()
res = await conn.execute("SELECT * FROM sa_tbl;")
self.assertIsInstance(res.cursor, Cursor)
self.assertEqual(('id', 'name'), res.keys())
rows = await res.fetchall()
self.assertTrue(res.closed)
self.assertIsNone(res.cursor)
self.assertEqual(1, len(rows))
row = rows[0]
self.assertEqual(1, row[0])
self.assertEqual(1, row['id'])
self.assertEqual(1, row.id)
self.assertEqual('first', row[1])
self.assertEqual('first', row['name'])
self.assertEqual('first', row.name)
# TODO: fix this
await conn._connection.commit()
self.loop.run_until_complete(go())
async def go():
conn = await self.connect()
res = await conn.execute(tbl.select())
self.assertIsInstance(res.cursor, Cursor)
self.assertEqual(('id', 'name'), res.keys())
rows = await res.fetchall()
self.assertTrue(res.closed)
self.assertIsNone(res.cursor)
self.assertTrue(res.returns_rows)
self.assertEqual(1, len(rows))
row = rows[0]
self.assertEqual(1, row[0])
self.assertEqual(1, row['id'])
self.assertEqual(1, row.id)
self.assertEqual('first', row[1])
self.assertEqual('first', row['name'])
self.assertEqual('first', row.name)
# TODO: fix this
await conn._connection.commit()
async def test_compatible_cursor_correct(loop, mysql_params):
class SubCursor(Cursor):
pass
mysql_params['cursorclass'] = SubCursor
async with sa.create_engine(loop=loop, **mysql_params) as engine:
async with engine.acquire() as conn:
# check not raise sa.ArgumentError exception
pass
assert conn.closed
def test_custom_cursor(self):
class MyCursor(Cursor):
pass
conn = self.connections[0]
cur = yield from conn.cursor(MyCursor)
self.assertIsInstance(cur, MyCursor)
yield from cur.execute("SELECT 42;")
(r, ) = yield from cur.fetchone()
self.assertEqual(r, 42)
async def cursor(self, conn=None) -> aiomysql.Cursor:
in_transaction = conn is not None
if not conn:
if not self._conn_pool:
await self.reconnect()
conn = await self._conn_pool.acquire()
cursor = await conn.cursor()
cursor.release = functools.partial(self.release_cursor, cursor, in_transaction=in_transaction)
return cursor
def __init__(self):
import aiomysql
for item in aiomysql.__all__:
setattr(self, item, getattr(aiomysql, item))
self.cursor_cls = aiomysql.Cursor
self.ss_cursor_cls = aiomysql.SSCursor
import aiomysql
from server.decorators import with_logger
@with_logger
class LoggingCursor(aiomysql.Cursor):
"""
Allows use of cursors using the ``with'' context manager statement.
"""
def __init__(self, connection, echo=False):
super().__init__(connection, echo)
async def execute(self, query, args=None):
self._logger.debug("Executing query: %s with args: %s", query, args)
return await super().execute(query, args)
@property
def size(self):
return self.rowcount