Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def start(self):
await super().start()
LOG.info('Starting engines...')
# Add your custom engines here, example with PostgreSQL:
self.engines['pg'] = self.loop.create_task(aiopg.create_pool(host=self.config['engines']['pg']['host'],
port=int(self.config['engines']['pg']['port']),
sslmode='disable',
dbname=self.config['engines']['pg']['dbname'],
user=self.config['engines']['pg']['user'],
password=self.config['engines']['pg']['password'],
cursor_factory=psycopg2.extras.RealDictCursor,
minsize=int(self.config['engines']['pg']['minsize']),
maxsize=int(self.config['engines']['pg']['maxsize'])))
await asyncio.wait([self.engines['pg']], return_when=asyncio.ALL_COMPLETED)
LOG.info('All engines ready !')
host = os.environ.get("MF_METADATA_DB_HOST", "localhost")
port = os.environ.get("MF_METADATA_DB_PORT", 5432)
user = os.environ.get("MF_METADATA_DB_USER", "postgres")
password = os.environ.get("MF_METADATA_DB_PSWD", "postgres")
database_name = os.environ.get("MF_METADATA_DB_NAME", "postgres")
dsn = "dbname={0} user={1} password={2} host={3} port={4}".format(
database_name, user, password, host, port
)
# todo make poolsize min and max configurable as well as timeout
# todo add retry and better error message
retries = 3
for i in range(retries):
while True:
try:
self.pool = await aiopg.create_pool(dsn)
for table in self.tables:
await table._init()
except Exception as e:
if retries - i < 1:
raise e
time.sleep(1)
continue
break
async def go():
pool = await aiopg.create_pool(dsn)
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 1")
ret = []
async def get_pool():
return await aiopg.create_pool(connection)
async def main():
pool = await aiopg.create_pool(dsn)
async with pool.cursor() as cur:
await transaction(cur, IsolationLevel.repeatable_read)
await transaction(cur, IsolationLevel.read_committed)
await transaction(cur, IsolationLevel.serializable)
cur.execute('select * from tbl')
def start(self):
yield from super().start()
LOG.info('Starting engines...')
print('Starting engines...')
self.engines['pg'] = self.loop.create_task(aiopg.create_pool(host='tfb-database',
port=int(self.config['engines']['pg']['port']),
sslmode='disable',
dbname=self.config['engines']['pg']['dbname'],
user=self.config['engines']['pg']['user'],
password=self.config['engines']['pg']['password'],
cursor_factory=psycopg2.extras.RealDictCursor,
minsize=int(self.config['engines']['pg']['minsize']),
maxsize=int(self.config['engines']['pg']['maxsize']),
loop=self.loop))
self.engines['mysql'] = self.loop.create_task(aiomysql.create_pool(
host=self.config['engines']['mysql']['host'],
port=self.config['engines']['mysql']['port'],
user=self.config['engines']['mysql']['user'],
password=self.config['engines']['mysql']['pwd'],
db=self.config['engines']['mysql']['db'],
minsize=int(self.config['engines']['mysql']['minsize']),
async def _create_engine(dsn=None, *, minsize=1, maxsize=10, loop=None,
dialect=_dialect, timeout=TIMEOUT, pool_recycle=-1,
**kwargs):
if loop is None:
loop = asyncio.get_event_loop()
pool = await aiopg.create_pool(dsn, minsize=minsize, maxsize=maxsize,
loop=loop, timeout=timeout,
pool_recycle=pool_recycle, **kwargs)
conn = await pool.acquire()
try:
real_dsn = conn.dsn
return Engine(dialect, pool, real_dsn)
finally:
await pool.release(conn)
"""
# install jinja2 templates
loader = jinja2.DictLoader({'index.html': template})
aiohttp_jinja2.setup(app, loader=loader)
# init routes for index page, and page with error
app.router.add_route('GET', '/', basic_handler, name='index')
app.router.add_route('GET', '/exc', exception_handler, name='exc_example')
if 'aiopg' in sys.modules:
# create connection to the database
dsn = 'host={host} dbname={db} user={user} password={passw} '.format(
db='postgres', user='developer', passw='1', host='localhost')
app['db'] = yield from aiopg.create_pool(
dsn, loop=loop, minsize=1, maxsize=2)
# Correct PostgreSQL shutdown
app.on_cleanup.append(close_pg)
if 'aioredis' in sys.modules:
# create redis pool
app['redis'] = yield from create_pool(('127.0.0.1', '6379'))
# Correct Redis shutdown
app.on_cleanup.append(close_redis)
handler = app.make_handler()
srv = yield from loop.create_server(handler, '127.0.0.1', 9000)
print("Server started at http://127.0.0.1:9000")
return srv, handler
async def connect(self):
"""建立连接.
Returns:
(aiopg.pool.Pool): è¿”å›žæ± å¯¹è±¡
"""
if self.password:
dsn = f"dbname={self.database} user={self.username} password={self.password} host={self.host} port={self.port}"
else:
dsn = f"dbname={self.database} user={self.username} host={self.host} port={self.port}"
self._pool = await aiopg.create_pool(
dsn=dsn,
loop=self.loop,
**self.kwargs
)
return self._pool