Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_cached_sha256_nopw(mysql_server, loop):
    """Check which auth plugin is used for a password-less account.

    Copies the connection parameters from the ``mysql_server`` fixture,
    swaps in the ``nopass_caching_sha2`` user with no password, opens a
    pool and asserts on the auth plugin recorded on the acquired
    connection.
    """
    params = copy.copy(mysql_server['conn_params'])
    params.update(user='nopass_caching_sha2', password=None)

    async with create_pool(**params, loop=loop) as pool, \
            pool.get() as conn:
        # The user has no permission to look at any database, but since
        # 8.0 defaults to caching_sha2_password the plugin that was used
        # can still be checked on the connection.
        assert conn._auth_plugin_used == 'caching_sha2_password'
async def init(cls, loop, **kwargs):
    """Create the aiomysql pool, store it in the global ``dbPool`` and return it.

    Args:
        loop: Event loop passed through to ``aiomysql.create_pool``.
        **kwargs: Connection settings. ``user``, ``password`` and ``db``
            are required; ``host``, ``port``, ``charset``, ``autocommit``,
            ``maxsize`` and ``minsize`` are optional and fall back to the
            defaults spelled out below.

    Returns:
        The pool returned by ``aiomysql.create_pool``.

    Raises:
        KeyError: If ``user``, ``password`` or ``db`` is not supplied.
    """
    global dbPool

    logging.info('aiomysql.create_pool start')
    pool_options = {
        'host': kwargs.get('host', 'localhost'),
        'port': kwargs.get('port', 3306),
        'user': kwargs['user'],
        'password': kwargs['password'],
        'db': kwargs['db'],
        'charset': kwargs.get('charset', 'utf8'),
        'autocommit': kwargs.get('autocommit', True),
        'maxsize': kwargs.get('maxsize', 10),
        'minsize': kwargs.get('minsize', 1),
    }
    dbPool = await aiomysql.create_pool(loop=loop, **pool_options)
    logging.info('aiomysql.create_pool end')
    return dbPool
def start(self):
    """Start the parent, then schedule the PostgreSQL and MySQL pool tasks.

    Generator-based coroutine (uses ``yield from``). Each pool is created
    from ``self.config['engines'][<name>]`` and wrapped in a task stored
    in ``self.engines`` under ``'pg'`` and ``'mysql'``.
    """
    yield from super().start()
    LOG.info('Starting engines...')
    print('Starting engines...')

    # PostgreSQL: the host is fixed, everything else comes from config.
    pg_conf = self.config['engines']['pg']
    pg_pool = aiopg.create_pool(
        host='tfb-database',
        port=int(pg_conf['port']),
        sslmode='disable',
        dbname=pg_conf['dbname'],
        user=pg_conf['user'],
        password=pg_conf['password'],
        cursor_factory=psycopg2.extras.RealDictCursor,
        minsize=int(pg_conf['minsize']),
        maxsize=int(pg_conf['maxsize']),
        loop=self.loop,
    )
    self.engines['pg'] = self.loop.create_task(pg_pool)

    # MySQL: fully config-driven; rows come back as dicts.
    mysql_conf = self.config['engines']['mysql']
    mysql_pool = aiomysql.create_pool(
        host=mysql_conf['host'],
        port=mysql_conf['port'],
        user=mysql_conf['user'],
        password=mysql_conf['pwd'],
        db=mysql_conf['db'],
        minsize=int(mysql_conf['minsize']),
        maxsize=int(mysql_conf['maxsize']),
        cursorclass=aiomysql.DictCursor,
        charset='utf8',
        use_unicode=True,
        loop=self.loop,
    )
    self.engines['mysql'] = self.loop.create_task(mysql_pool)

    # NOTE(review): only the 'pg' task is awaited here; the 'mysql' task
    # may still be pending when the message below is logged -- confirm
    # this is intentional.
    pending_engines = [self.engines['pg']]
    yield from asyncio.wait(pending_engines,
                            return_when=asyncio.ALL_COMPLETED)
    LOG.info('All engines ready !')
async def get_mysql_pool_cli(self):
    """
    Return the aiomysql pool, creating it on first use.

    When ``self.mysql_pool_cli`` is still ``None`` a pool is created, one
    connection is acquired from it and a cursor is opened on that
    connection; these are stored on ``self.connection`` and
    ``self.cursor``.

    NOTE(review): the indentation of this snippet was lost; this assumes
    the acquire/cursor steps belong to the first-use branch -- confirm.

    :return: an async mysql client
    """
    # Fast path: the pool already exists.
    if self.mysql_pool_cli is not None:
        return self.mysql_pool_cli

    self.mysql_pool_cli = await aiomysql.create_pool(
        host=self.host,
        port=self.port,
        user=self.user,
        password=self.password,
        db=self.database,
        loop=self.loop,
        minsize=1,
        maxsize=3,
        charset=self.charset,
    )
    self.connection = await self.mysql_pool_cli.acquire()
    self.cursor = await self.connection.cursor()
    return self.mysql_pool_cli
async def create_pool(loop, **kw):
    """Create the aiomysql pool and store it in the module global ``__pool``.

    Args:
        loop: Event loop passed through to ``aiomysql.create_pool``.
        **kw: Connection settings. ``user``, ``password`` and ``database``
            are required; ``host``, ``port``, ``charset``, ``autocommit``,
            ``maxsize`` and ``minsize`` fall back to the defaults below.

    Raises:
        KeyError: If ``user``, ``password`` or ``database`` is missing.
    """
    # Responsible for creating one global database connection object.
    global __pool

    logger.info('create database connection pool...')
    settings = dict(
        host=kw.get('host', 'localhost'),
        port=kw.get('port', 3306),
        user=kw['user'],
        password=kw['password'],
        db=kw['database'],
        charset=kw.get('charset', 'utf8'),
        autocommit=kw.get('autocommit', True),
        maxsize=kw.get('maxsize', 10),
        minsize=kw.get('minsize', 1),
    )
    __pool = await aiomysql.create_pool(loop=loop, **settings)
async def connect(self) -> None:
    """Open the aiomysql pool described by ``self._database_url``.

    The port falls back to 3306 and the user to the current OS user when
    the URL does not provide them. Extra options from
    ``self._get_connection_kwargs()`` are forwarded to
    ``aiomysql.create_pool``. Must only be called while no pool is open.
    """
    assert self._pool is None, "DatabaseBackend is already running"

    extra_options = self._get_connection_kwargs()
    url = self._database_url
    port = url.port or 3306
    user = url.username or getpass.getuser()

    self._pool = await aiomysql.create_pool(
        host=url.hostname,
        port=port,
        user=user,
        password=url.password,
        db=url.database,
        autocommit=True,
        **extra_options,
    )
def test_example():
    """Run ``SELECT 10`` through an aiomysql pool and check the result.

    Generator-based coroutine (uses ``yield from``); ``loop`` is read from
    the enclosing scope. The pool is shut down in a ``finally`` block so
    it is closed even when the query or the assertion fails.
    """
    pool = yield from aiomysql.create_pool(host='127.0.0.1', port=3306,
                                           user='root', password='',
                                           db='mysql', loop=loop)
    try:
        with (yield from pool) as conn:
            cur = yield from conn.cursor()
            yield from cur.execute("SELECT 10")
            # fetchone() yields a one-column row; unpacking also checks
            # that exactly one column came back.
            (r,) = yield from cur.fetchone()
            assert r == 10
    finally:
        # Always release the pool, including on failure above.
        pool.close()
        yield from pool.wait_closed()
async def __init__(self, config_file):
    """Load connection settings from a JSON file and open an aiomysql pool.

    The file must contain the keys ``host``, ``port``, ``user``, ``db``
    and ``password``; each is copied onto the instance. The charset is
    fixed to ``'utf8'`` and the pool is stored on ``self.pool``.

    NOTE(review): this ``__init__`` is declared ``async``, so calling it
    returns a coroutine. Ordinary ``Class(...)`` instantiation requires
    ``__init__`` to return ``None`` -- confirm how callers await this.

    Raises:
        KeyError: If one of the required keys is missing from the file.
    """
    with open(config_file, 'r') as f:
        raw_config = f.read().strip()
    settings = json.loads(raw_config)

    # Copy the required settings onto the instance, in a fixed order.
    for key in ('host', 'port', 'user', 'db', 'password'):
        setattr(self, key, settings[key])
    self.charset = 'utf8'

    self.pool = await aiomysql.create_pool(
        host=self.host,
        port=self.port,
        db=self.db,
        user=self.user,
        password=self.password,
        charset=self.charset,
        cursorclass=aiomysql.cursors.DictCursor,
        autocommit=True,
    )