Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_incompatible_cursor_fails(loop, mysql_params):
    """Creating an engine with ``DictCursor`` must raise ``sa.ArgumentError``.

    The test overrides ``cursorclass`` in the shared connection parameters
    and checks both the exception type and its exact message.
    """
    expected = 'SQLAlchemy engine does not support this cursor class'
    mysql_params['cursorclass'] = DictCursor

    with pytest.raises(sa.ArgumentError) as ctx:
        await sa.create_engine(loop=loop, **mysql_params)

    assert str(ctx.value) == expected
async def execute(sql, args, autocommit=True):
    """Run one data-modifying statement and return the affected row count.

    ``?`` placeholders in *sql* are rewritten to ``%s`` before execution.
    Note that the rewrite is a plain string replace, so a literal ``?``
    anywhere in the statement text is rewritten as well.

    Args:
        sql: Statement text using ``?`` placeholders.
        args: Values bound to the placeholders.
        autocommit: When false, the statement runs inside an explicit
            transaction that is committed on success and rolled back on
            failure. When true, no transaction calls are issued here.

    Returns:
        The cursor's ``rowcount`` after the statement has run.

    Raises:
        BaseException: Whatever the statement raised, re-raised unchanged
            after the optional rollback.
    """
    log(sql)
    # ``with (await pool)`` is deprecated in aiomysql and removed in newer
    # releases; ``pool.acquire()`` is the supported async context manager.
    # NOTE(review): assumes __pool is an aiomysql Pool - confirm.
    async with __pool.acquire() as conn:
        if not autocommit:
            await conn.begin()
        try:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(sql.replace('?', '%s'), args)
                affected = cur.rowcount
            if not autocommit:
                await conn.commit()
        except BaseException:
            # BaseException (not Exception) so the transaction is also rolled
            # back when the failure is not an Exception subclass.
            if not autocommit:
                await conn.rollback()
            raise
        return affected
async def execute(sql, args, autocommit=True):
    """Run one data-modifying statement and return the affected row count.

    ``?`` placeholders in *sql* are rewritten to ``%s`` before execution
    (a plain string replace, so every ``?`` in the text is rewritten).

    Args:
        sql: Statement text using ``?`` placeholders.
        args: Values bound to the placeholders.
        autocommit: When false, an explicit transaction is started before
            the statement and rolled back if it fails.

    Returns:
        The cursor's ``rowcount`` after the statement has run.

    Raises:
        BaseException: Whatever the statement raised, re-raised unchanged
            after the optional rollback.
    """
    log(sql)
    # ``pool.get()`` is deprecated in aiomysql in favour of ``acquire()``.
    # NOTE(review): assumes __pool is an aiomysql Pool - confirm.
    async with __pool.acquire() as conn:
        if not autocommit:
            await conn.begin()
        try:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(sql.replace('?', '%s'), args)
                affected = cur.rowcount
            # The previous code committed only when ``autocommit`` was true,
            # so a transaction opened by ``conn.begin()`` above was never
            # committed. Commit in both cases: this keeps the commit that
            # autocommit=True callers already got and also completes the
            # explicit transaction.
            await conn.commit()
            logger.info('commit success!')
        except BaseException:
            if not autocommit:
                await conn.rollback()
            raise
        return affected
async def submit(cls, sql, args=None, autocommit=True):
    """Execute a write statement and return the cursor's last row id.

    The affected row count is stored on ``cls.__affected__`` as a side
    effect.

    Args:
        sql: Statement text, passed to the cursor as-is.
        args: Optional parameters; when falsy the statement is executed
            without parameters.
        autocommit: When false, the statement runs inside an explicit
            transaction that is committed on success and rolled back on
            failure.

    Returns:
        ``cur.lastrowid`` as reported after execution.

    Raises:
        BaseException: The original exception, re-raised unchanged after the
            optional rollback.
    """
    # ``pool.get()`` is deprecated in aiomysql in favour of ``acquire()``.
    # NOTE(review): assumes db_con_pool is an aiomysql Pool - confirm.
    async with cls.db_con_pool.acquire() as conn:
        if not autocommit:
            await conn.begin()
        try:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                # ``args or None`` keeps the original behaviour of executing
                # without parameters whenever args is empty.
                await cur.execute(sql, args or None)
                cls.__affected__ = cur.rowcount
                last_id = cur.lastrowid
            if not autocommit:
                await conn.commit()
        except BaseException:
            if not autocommit:
                await conn.rollback()
            # Bare ``raise`` keeps the original exception object, its args
            # and its traceback. Rebuilding it via ``exc_type(exc_value)``
            # nested the original inside a new instance and could itself
            # fail for exception classes whose constructor needs more than
            # one argument.
            raise
        return last_id
async def select(cls, sql, args=(), size=None):
    """Run a query and return its rows, logging every step under one uid.

    ``?`` placeholders in *sql* are rewritten to ``%s`` before execution.

    Args:
        sql: Query text using ``?`` placeholders.
        args: Values bound to the placeholders.
        size: When truthy, fetch at most this many rows with ``fetchmany``;
            otherwise fetch everything with ``fetchall``.

    Returns:
        The rows returned by the cursor's fetch call.
    """
    # A fresh id per call so the log lines of one query can be correlated.
    uid = uuid.uuid4().hex
    logging.info("uid:%s,DBPoolC.select get conn start ", uid)
    # ``with (await pool)`` is deprecated in aiomysql and removed in newer
    # releases; ``pool.acquire()`` is the supported async context manager.
    # NOTE(review): assumes dbPool is an aiomysql Pool - confirm.
    async with dbPool.acquire() as conn:
        logging.info("uid:%s,DBPoolC.select get conn end %s ", uid, conn)
        logging.info("uid:%s,DBPoolC.select get cursor start ", uid)
        # ``async with`` closes the cursor even when execute/fetch raises;
        # the previous explicit ``cur.close()`` was skipped on errors.
        async with conn.cursor(aiomysql.DictCursor) as cur:
            logging.info("uid:%s,DBPoolC.select get cursor end %s ", uid, cur)
            sql = sql.replace('?', '%s')
            logging.info("uid:%s,DBPoolC.select execute start ", uid)
            await cur.execute(sql, args)
            logging.info("uid:%s,DBPoolC.select execute end ", uid)
            if size:
                logging.info("uid:%s,DBPoolC.select fetchmany start ", uid)
                rs = await cur.fetchmany(size)
                logging.info("uid:%s,DBPoolC.select fetchmany end ", uid)
            else:
                logging.info("uid:%s,DBPoolC.select fetchall start ", uid)
                rs = await cur.fetchall()
                logging.info("uid:%s,DBPoolC.select fetchall end ", uid)
        return rs
async def select(sql, args, size=None):
    """Run a query and return its rows.

    ``?`` placeholders in *sql* are rewritten to ``%s`` before execution.

    Args:
        sql: Query text using ``?`` placeholders.
        args: Values bound to the placeholders.
        size: When truthy, fetch at most this many rows with ``fetchmany``;
            otherwise fetch everything with ``fetchall``.

    Returns:
        The rows returned by the cursor's fetch call.
    """
    # ``with (await pool)`` is deprecated in aiomysql and removed in newer
    # releases; ``pool.acquire()`` is the supported async context manager.
    # NOTE(review): assumes __pool is an aiomysql Pool - confirm.
    async with __pool.acquire() as conn:
        # ``async with`` closes the cursor even when execute/fetch raises;
        # the previous explicit ``cur.close()`` was skipped on errors.
        async with conn.cursor(aiomysql.DictCursor) as cur:
            await cur.execute(sql.replace('?', '%s'), args)
            if size:
                return await cur.fetchmany(size)
            return await cur.fetchall()
async def select(cls, sql, args=None, rows=None):
    """Run a query on a pooled connection and return its rows.

    Args:
        sql: Query text, passed to the cursor as-is.
        args: Optional parameters; an empty tuple is used when falsy.
        rows: When truthy, fetch at most this many rows with ``fetchmany``;
            otherwise fetch everything with ``fetchall``.

    Returns:
        The rows returned by the cursor's fetch call.

    Raises:
        Any exception raised while executing or fetching propagates
        unchanged, with its original type, args and traceback.
    """
    # ``pool.get()`` is deprecated in aiomysql in favour of ``acquire()``.
    # NOTE(review): assumes db_con_pool is an aiomysql Pool - confirm.
    async with cls.db_con_pool.acquire() as conn:
        async with conn.cursor(aiomysql.DictCursor) as cur:
            # No try/except here: the previous handler only re-created the
            # exception via ``exc_type(exc_value)``, which nested the
            # original inside a new instance and discarded the traceback.
            # Letting it propagate keeps the same exception type for callers.
            await cur.execute(sql, args or ())
            if rows:
                return await cur.fetchmany(rows)
            return await cur.fetchall()
async def select(sql, args, size=None):
    """Run a query, log the number of rows fetched, and return the rows.

    ``?`` placeholders in *sql* are rewritten to ``%s`` before execution.

    Args:
        sql: Query text using ``?`` placeholders.
        args: Values bound to the placeholders; an empty tuple is used when
            falsy.
        size: When truthy, fetch at most this many rows with ``fetchmany``;
            otherwise fetch everything with ``fetchall``.

    Returns:
        The rows returned by the cursor's fetch call.
    """
    log(sql, args)
    # ``pool.get()`` is deprecated in aiomysql in favour of ``acquire()``.
    # NOTE(review): assumes __pool is an aiomysql Pool - confirm.
    async with __pool.acquire() as conn:
        async with conn.cursor(aiomysql.DictCursor) as cur:
            await cur.execute(sql.replace('?', '%s'), args or ())
            if size:
                rs = await cur.fetchmany(size)
            else:
                rs = await cur.fetchall()
        # Lazy %-style arguments: the message is only formatted when the
        # record is actually emitted.
        logging.info('row returned:%s', len(rs))
        return rs
async def db_connect(loop):
    """Create the aiomysql connection pool, retrying every 10 seconds.

    The call loops until ``aiomysql.create_pool`` succeeds, so it only
    returns once a pool has been created.

    Args:
        loop: Event loop passed through to ``aiomysql.create_pool``.

    Returns:
        The pool returned by ``aiomysql.create_pool``.
    """
    # Local imports keep this block self-contained.
    import asyncio
    import logging

    # NOTE(review): the connection credentials are hard-coded below; consider
    # loading them from configuration or the environment instead.
    while True:
        try:
            return await aiomysql.create_pool(
                host='mysql',
                user='dbuser', password='zJ2plyhR9', db='cpanel',
                charset='utf8mb4',
                cursorclass=aiomysql.DictCursor,
                loop=loop,
            )
        except Exception:
            # Record why the attempt failed instead of retrying silently.
            logging.getLogger(__name__).warning(
                'database connection failed, retrying in 10 seconds',
                exc_info=True,
            )
            # The previous un-awaited ``sleep(10)`` either blocked the whole
            # event loop (time.sleep) or did not wait at all (an asyncio
            # coroutine that was never awaited). Awaiting asyncio.sleep
            # pauses only this coroutine.
            await asyncio.sleep(10)
def _connect_to_ctl(self):
    """Open ``self._ctl_connection`` against the ``information_schema`` db.

    Generator-based coroutine (it uses ``yield from``). The settings are a
    copy of ``self._connection_settings`` with ``db`` and ``cursorclass``
    overridden; the copy is kept on ``self._ctl_connection_settings``. Once
    connected, the connection gets a ``_get_table_information`` attribute
    pointing at this object's method, and ``self._connected_ctl`` is set.
    """
    ctl_settings = dict(self._connection_settings)
    self._ctl_connection_settings = ctl_settings
    ctl_settings.update({
        "db": "information_schema",
        "cursorclass": aiomysql.DictCursor,
    })

    self._ctl_connection = yield from aiomysql.connect(**ctl_settings)
    # Let the connection call back into this object for table metadata.
    table_info_hook = self._get_table_information
    self._ctl_connection._get_table_information = table_info_hook
    self._connected_ctl = True