Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def connect(self, host='localhost', port=3306, user='root',
                  password='', db='faf_test', minsize=1, maxsize=1):
    """Create the database engine, then wait for the connection signal.

    The connection parameters are forwarded unchanged to ``create_engine``
    together with ``autocommit=False``, ``echo=True`` and ``self._loop``.
    Once the engine is stored on ``self.engine``, ``self._keep_connection()``
    is scheduled as a task on ``self._loop`` (kept in ``self._keep``) and the
    call blocks until ``self._conn_present.wait()`` returns.

    :param host: forwarded to ``create_engine``
    :param port: forwarded to ``create_engine``
    :param user: forwarded to ``create_engine``
    :param password: forwarded to ``create_engine``
    :param db: forwarded to ``create_engine``
    :param minsize: forwarded to ``create_engine``
        (presumably the lower pool bound -- confirm against the driver)
    :param maxsize: forwarded to ``create_engine``
        (presumably the upper pool bound -- confirm against the driver)
    :raises ValueError: if ``self.engine`` is already set
    """
    already_connected = self.engine is not None
    if already_connected:
        raise ValueError("DB is already connected!")

    engine_options = {
        'host': host,
        'port': port,
        'user': user,
        'password': password,
        'db': db,
        'autocommit': False,
        'loop': self._loop,
        'minsize': minsize,
        'maxsize': maxsize,
        'echo': True,
    }
    self.engine = await create_engine(**engine_options)

    # Schedule the keep-connection coroutine, then block until it (or some
    # other party) signals through ``_conn_present``.
    keeper = self._keep_connection()
    self._keep = self._loop.create_task(keeper)
    await self._conn_present.wait()
async def test_sa_transaction(table, mysql_params, loop):
    """Check that a transaction deactivates on exit and its delete is visible.

    The test counts the rows of ``tbl`` (expects 3), deletes all of them
    inside a transaction block, and then verifies that the transaction is no
    longer active and that the row count is 0.

    :param table: not used in the body; presumably a fixture that prepares
        the table contents -- confirm against the test module
    :param mysql_params: keyword arguments forwarded to ``sa.create_engine``
    :param loop: event loop passed to ``sa.create_engine``
    """
    async with sa.create_engine(loop=loop, **mysql_params) as engine, \
            engine.acquire() as conn:
        rows_before = await conn.scalar(tbl.count())
        assert 3 == rows_before

        pending = await conn.begin()
        async with pending as txn:
            assert txn.is_active
            await conn.execute(tbl.delete())

        # Leaving the block must have finished the transaction.
        assert not txn.is_active
        rows_after = await conn.scalar(tbl.count())
        assert 0 == rows_after
:param mysql: {'host': str, 'user': str, 'password': str, 'db': str,
'loop': asyncio.BaseEventLoop}
"""
xchg_tz = await broker.hist_data_req_timezone(req)
# All data will be downloaded from broker if database is unavailable
# or requested BarSize not in database.
if mysql is None or timedur_standardize(req.BarSize)[-1] is 's':
blk_list = await broker.req_hist_data_async(req)
blk = blk_list[0]
blk.tz_convert(xchg_tz)
return blk
# init database
engine = await aio_create_engine(
host=mysql['host'], user=mysql['user'], password=mysql['password'],
db=mysql['db'], loop=mysql['loop'])
# Query database first, and split req for downloading
(dl_reqs, insert_limit, blk_ret,
start_dt, end_dt) = await query_hist_data_split_req(req, xchg_tz, engine)
_logger.debug('blk_ret head:\n%s', blk_ret.df.iloc[:3])
_logger.debug('start_dt: %s', start_dt)
_logger.debug('end_dt: %s', end_dt)
# Download data and insert to db concurrently
if dl_reqs is not None:
blk_dl_list = await asyncio.gather(*(
download_insert_hist_data(req_i, broker, engine, inslim)
for req_i, inslim in zip(dl_reqs, insert_limit)))
for blk_dl in blk_dl_list:
async def connect(self, host='localhost', port=3306, user='root',
                  password='', db='faf_test', minsize=1, maxsize=1):
    """Create the database engine and store it on ``self.engine``.

    Every argument is forwarded unchanged to ``create_engine``, along with
    ``autocommit=True`` and ``self._loop`` as the event loop.

    :param host: forwarded to ``create_engine``
    :param port: forwarded to ``create_engine``
    :param user: forwarded to ``create_engine``
    :param password: forwarded to ``create_engine``
    :param db: forwarded to ``create_engine``
    :param minsize: forwarded to ``create_engine``
    :param maxsize: forwarded to ``create_engine``
    :raises ValueError: if ``self.engine`` is already set
    """
    if self.engine is None:
        credentials = dict(host=host, port=port, user=user,
                           password=password, db=db)
        pool_bounds = dict(minsize=minsize, maxsize=maxsize)
        self.engine = await create_engine(
            autocommit=True,
            loop=self._loop,
            **credentials,
            **pool_bounds,
        )
        return

    raise ValueError("DB is already connected!")
def go():
    """Insert one row into ``tbl`` and print every row, then close the engine.

    Generator-based coroutine (driven with ``yield from``): it creates an
    engine for the local ``test_pymysql`` database, calls ``create_table``,
    inserts a row with ``val='abc'``, selects all rows and prints each row's
    ``id`` and ``val``.

    The engine is closed in a ``finally`` clause so that a failure in
    ``create_table`` or in any query does not leak it; the original only
    closed the engine on the success path.
    """
    engine = yield from create_engine(user='root',
                                      db='test_pymysql',
                                      host='127.0.0.1',
                                      password='')
    try:
        yield from create_table(engine)
        with (yield from engine) as conn:
            yield from conn.execute(tbl.insert().values(val='abc'))

            res = yield from conn.execute(tbl.select())
            for row in res:
                print(row.id, row.val)
    finally:
        # Always release the engine, even when a statement above raised.
        engine.close()
        yield from engine.wait_closed()