Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import asyncio
import aiomysql
from tests import base
from tests._testutils import run_until_complete
class TestBulkInserts(base.AIOPyMySQLTestCase):
cursor_type = aiomysql.cursors.DictCursor
def setUp(self):
super(TestBulkInserts, self).setUp()
@asyncio.coroutine
def prepare(self):
# create a table ane some data to query
self.conn = conn = self.connections[0]
c = yield from conn.cursor(self.cursor_type)
yield from c.execute("drop table if exists bulkinsert;")
yield from c.execute(
"""CREATE TABLE bulkinsert
(
id int(11),
name char(20),
age int,
import datetime
import aiomysql.cursors
from tests import base
from tests._testutils import run_until_complete
class TestDictCursor(base.AIOPyMySQLTestCase):
bob = {'name': 'bob', 'age': 21,
'DOB': datetime.datetime(1990, 2, 6, 23, 4, 56)}
jim = {'name': 'jim', 'age': 56,
'DOB': datetime.datetime(1955, 5, 9, 13, 12, 45)}
fred = {'name': 'fred', 'age': 100,
'DOB': datetime.datetime(1911, 9, 12, 1, 1, 1)}
cursor_type = aiomysql.cursors.DictCursor
def setUp(self):
super(TestDictCursor, self).setUp()
self.conn = conn = self.connections[0]
@asyncio.coroutine
def prepare():
c = yield from conn.cursor(self.cursor_type)
# create a table ane some data to query
yield from c.execute("drop table if exists dictcursor")
yield from c.execute(
"""CREATE TABLE dictcursor (name char(20), age int ,
DOB datetime)""")
data = [("bob", 21, "1990-02-06 23:04:56"),
("jim", 56, "1955-05-09 13:12:45"),
def test_issue_79(self):
""" Duplicate field overwrites the previous one in the result
of DictCursor """
conn = self.connections[0]
c = yield from conn.cursor(aiomysql.cursors.DictCursor)
yield from c.execute("drop table if exists a")
yield from c.execute("drop table if exists b")
yield from c.execute("""CREATE TABLE a (id int, value int)""")
yield from c.execute("""CREATE TABLE b (id int, value int)""")
a = (1, 11)
b = (1, 22)
try:
yield from c.execute("insert into a values (%s, %s)", a)
yield from c.execute("insert into b values (%s, %s)", b)
yield from c.execute("SELECT * FROM a inner join b on a.id = b.id")
r, *_ = yield from c.fetchall()
self.assertEqual(r['id'], 1)
self.assertEqual(r['value'], 11)
async def init(self):
loop = asyncio.get_event_loop()
self.sql=self.args[0]
self.pool=await create_pool(host=self.kwargs.host, port=self.kwargs.port,
user=self.kwargs.user, password=self.kwargs.password,
db=self.kwargs.db, loop=loop,cursorclass=cursors.DictCursor)
self.inited=True
session = aiohttp.ClientSession(
raise_for_status=True,
timeout=aiohttp.ClientTimeout(total=60))
app['client_session'] = session
app['github_client'] = gh_aiohttp.GitHubAPI(session, 'ci', oauth_token=oauth_token)
app['batch_client'] = await BatchClient('ci', session=session)
with open('/ci-user-secret/sql-config.json', 'r') as f:
config = json.loads(f.read().strip())
app['dbpool'] = await aiomysql.create_pool(host=config['host'],
port=config['port'],
db=config['db'],
user=config['user'],
password=config['password'],
charset='utf8',
cursorclass=aiomysql.cursors.DictCursor,
autocommit=True)
asyncio.ensure_future(update_loop(app))
async def create_database_pool(config_file=None, autocommit=True, maxsize=10):
if config_file is None:
config_file = os.environ.get('HAIL_DATABASE_CONFIG_FILE', '/sql-config/sql-config.json')
with open(config_file, 'r') as f:
sql_config = json.loads(f.read())
return await aiomysql.create_pool(
maxsize=maxsize,
# connection args
host=sql_config['host'], user=sql_config['user'], password=sql_config['password'],
db=sql_config.get('db'), port=sql_config['port'], charset='utf8',
cursorclass=aiomysql.cursors.DictCursor, autocommit=autocommit)
config = json.loads(f.read().strip())
self.host = config['host']
self.port = config['port']
self.user = config['user']
self.db = config['db']
self.password = config['password']
self.charset = 'utf8'
self.pool = await aiomysql.create_pool(host=self.host,
port=self.port,
db=self.db,
user=self.user,
password=self.password,
charset=self.charset,
cursorclass=aiomysql.cursors.DictCursor,
autocommit=True)
'host': host,
'db': database,
'user': user,
'password': password,
'minsize': minsize,
'maxsize': maxsize,
'charset': charset,
'loop': loop,
'autocommit': autocommit,
'pool_recycle': pool_recycle,
}
self.sanic = sanic
if sanic:
sanic.db = self
if return_dict:
self.db_args['cursorclass']=aiomysql.cursors.DictCursor
if kwargs:
self.db_args.update(kwargs)
self.pool = None