Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def connect_db(instance, cursorclass=pymysql.cursors.Cursor):
    """Yield a cursor on a fresh connection to the given database instance.

    The connection is opened with the credentials found on the module-level
    ``args`` object (``master_user_name`` / ``master_user_password``) and the
    host/port stored under ``instance['Endpoint']``. Autocommit is enabled.

    Both the cursor and the connection are closed once the generator is
    resumed after the ``yield`` (or is closed/thrown into).

    NOTE(review): this is a plain generator here; it is presumably wrapped
    with ``contextlib.contextmanager`` outside the visible code -- confirm.

    :param instance: mapping with ``['Endpoint']['Address']`` and
        ``['Endpoint']['Port']`` entries.
    :param cursorclass: cursor class handed to ``pymysql.connect``.
    """
    endpoint_settings = {
        'user': args.master_user_name,
        'password': args.master_user_password,
        'host': instance['Endpoint']['Address'],
        'port': instance['Endpoint']['Port'],
        'autocommit': True,
        'cursorclass': cursorclass,
    }
    connection = pymysql.connect(**endpoint_settings)
    # closing(connection) is entered before the cursor is created, so a
    # failure in connection.cursor() still closes the connection.
    with closing(connection), closing(connection.cursor()) as db_cursor:
        yield db_cursor
@setdocstring(pymysql.cursors.Cursor.execute)
def execute(self, query, args=None):
    # NOTE(review): the docstring is presumably supplied by ``setdocstring``
    # from ``pymysql.cursors.Cursor.execute`` (confirm against that
    # decorator), so this override is documented with comments only.
    #
    # Runs ``query`` through ``self._query`` and returns whatever that call
    # returns; the query text is remembered on ``self._executed``.

    # Normalise ``args``: None becomes an empty tuple, and a single bare
    # value is wrapped in a 1-tuple. Tuples, lists and dicts pass through.
    if args is None:
        args = ()
    elif not isinstance(args, (tuple, list, dict)):
        args = (args,)

    # The original pre-initialised ``result = 0`` and immediately overwrote
    # it; that dead store has been dropped.
    result = self._query(query, args)
    self._executed = query
    return result
ws = wb.create_sheet()
ws.title = self.title
# Use the column names as the header row
self.conn.cursorclass = pymysql.cursors.DictCursor
with self.conn.cursor() as cursor:
cursor.execute(self.sql)
title = []
for column_name in cursor.fetchone():
title.append(column_name)
ws.append(title)
# Fetch the data and write it to the worksheet
if self.affected_row <= 100000:
# When the export is no larger than 100,000 rows, read everything into memory with fetchall
self.conn.cursorclass = pymysql.cursors.Cursor
with self.conn.cursor() as cursor:
msg = f'正在导出SQL:{self.sql}'
self.pull_msg(msg)
self.execute_log.append(msg)
cursor.execute(self.sql)
rows = cursor.fetchall()
msg = f'正在处理数据'
self.pull_msg(msg)
self.execute_log.append(msg)
for row in rows:
# Filter out illegal characters
filter_illegal_characters_row = list(
map(
def _conn(cursorclass=pymysql.cursors.Cursor):
    """Open and return a new PyMySQL connection built from ``conf``.

    Host, user, password and database name are read from the ``conf``
    mapping (``MYSQL_LOCAL_HOST``, ``MYSQL_USER``, ``MYSQL_PASSWORD``,
    ``MYSQL_DB_NAME``); the charset is fixed to ``utf8mb4``.

    :param cursorclass: cursor class handed to ``pymysql.connect``.
    :return: whatever ``pymysql.connect`` returns.
    """
    connection_settings = {
        'host': conf['MYSQL_LOCAL_HOST'],
        'user': conf['MYSQL_USER'],
        'password': conf['MYSQL_PASSWORD'],
        'db': conf['MYSQL_DB_NAME'],
        'charset': 'utf8mb4',
        'cursorclass': cursorclass,
    }
    return pymysql.connect(**connection_settings)
def get_conn(cursorclass=pymysql.cursors.Cursor):
    """Return the connection stored as ``g.db_conn``, creating it if absent.

    The lookup happens inside ``app.app_context()``. When ``g`` has no
    ``db_conn`` attribute, a connection is created with ``_conn(cursorclass)``
    and stored there first.

    ``cursorclass`` is only used when a connection is created; an already
    stored ``g.db_conn`` is returned unchanged.

    NOTE(review): if ``app``/``g`` are Flask's, every call pushes a new
    application context, so ``g`` may never already hold ``db_conn`` --
    confirm whether the caching is effective.

    :param cursorclass: cursor class forwarded to ``_conn``.
    :return: the object stored on ``g.db_conn``.
    """
    with app.app_context():
        if hasattr(g, 'db_conn'):
            return g.db_conn
        g.db_conn = _conn(cursorclass)
        return g.db_conn
def connect_db(self, database: str, return_type: str):
    """Connect to ``database`` and keep the connection on ``self.db``.

    Connection parameters come from ``self.mysql_host``, ``self.mysql_user``,
    ``self.mysql_pw`` and ``self.mysql_char``.

    * ``pymysql.OperationalError``: a hint is echoed and ``self.db`` is set
      to ``False``.
    * ``pymysql.InternalError``: ``self.create_db(database)`` is tried; when
      it returns a truthy value the connection attempt is repeated, otherwise
      ``self.db`` is set to ``False``.

    :param database: name of the database to connect to.
    :param return_type: ``'dict'`` selects ``DictCursor``; any other value
        selects the plain ``Cursor``.
    :return: ``None``; the outcome is stored on ``self.db``.
    """
    if return_type == 'dict':
        cursorclass = pymysql.cursors.DictCursor
    else:
        cursorclass = pymysql.cursors.Cursor

    connect_kwargs = {
        'host': self.mysql_host,
        'user': self.mysql_user,
        'password': self.mysql_pw,
        'db': database,
        'charset': self.mysql_char,
        'cursorclass': cursorclass,
    }

    try:
        self.db = pymysql.connect(**connect_kwargs)
    except pymysql.OperationalError:
        echo(0, 'Please change mysql info in util/db.ini!!!')
        self.db = False
    except pymysql.InternalError:
        echo(2, 'Try to create database in mysql.........')
        if not self.create_db(database):
            self.db = False
        else:
            # Database was created; retry the connection from scratch.
            self.connect_db(database, return_type)
:param max_pool_size: maximum connection pool size (max pool size can be changed dynamically)
:param enable_auto_resize: if set to True, the max_pool_size will be changed dynamically
:param pool_resize_boundary: !!this is related to the max connections of your mysql server!!
:param auto_resize_scale: `max_pool_size * auto_resize_scale` is the new max_pool_size.
The max_pool_size will be changed dynamically only if `enable_auto_resize` is True.
:param defer_connect_pool: don't connect to pool on construction, wait for explicit call. Default is False.
:param kwargs: other keyword arguments to be passed to `pymysql.Connection`
"""
# config for a database connection
self._host = host
self._user = user
self._password = password
self._database = database
self._port = port
self._charset = charset
self._cursor_class = DictCursor if use_dict_cursor else Cursor
self._other_kwargs = kwargs
# config for the connection pool
self._pool_name = pool_name
self._max_pool_size = max_pool_size if max_pool_size < pool_resize_boundary else pool_resize_boundary
# self._step_size = step_size
self._enable_auto_resize = enable_auto_resize
self._pool_resize_boundary = pool_resize_boundary
if auto_resize_scale < 1:
raise ValueError(
"Invalid scale {}, must be bigger than 1".format(auto_resize_scale))
self._auto_resize_scale = int(round(auto_resize_scale, 0))
# self.wait_timeout = wait_timeout
self._pool_container = PoolContainer(self._max_pool_size)
"""Connection and cursor wrappers around PyMySQL.
This module effectively backports PyMySQL functionality and error handling
so that mycli will support Debian's python-pymysql version (0.6.2).
"""
import pymysql
# Default to the stock PyMySQL cursor class and connect function.
Cursor = pymysql.cursors.Cursor
connect = pymysql.connect

# Replace ``Cursor`` with a context-manager-capable subclass on PyMySQL
# 0.6.0 - 0.6.4 only. The major version is checked as well: the previous
# test looked only at VERSION[1] and VERSION[2], so any "N.6.x" release with
# x < 5 would also have been given the shim.
if tuple(pymysql.VERSION[:2]) == (0, 6) and pymysql.VERSION[2] < 5:
    class Cursor(pymysql.cursors.Cursor):
        """Makes Cursor a context manager in PyMySQL < 0.6.5."""

        def __enter__(self):
            """Return the cursor itself for use in a ``with`` block."""
            return self

        def __exit__(self, *exc_info):
            """Close the cursor; exception details are ignored."""
            del exc_info
            self.close()
if pymysql.VERSION[1] == 6 and pymysql.VERSION[2] < 3:
def connect(self):
    """
    Create a MySQL connection from this object's settings.

    The connection class is obtained from ``self._get_connection_class()``
    and is called with ``self.host``, ``self.port``, ``self.database``,
    ``self.user``, ``self.password`` and ``self.charset``; the cursor class
    is always ``pymysql.cursors.Cursor``.

    :return: instance of the class returned by ``_get_connection_class()``
    """
    import pymysql

    make_connection = self._get_connection_class()
    settings = {
        'host': self.host,
        'port': self.port,
        'db': self.database,
        'user': self.user,
        'password': self.password,
        'charset': self.charset,
        'cursorclass': pymysql.cursors.Cursor,
    }
    return make_connection(**settings)
:type where: dict
:type insert: bool
:param insert: If insert==True, insert the input condition if there's no result and return the id of new row.
:type ifnone: string
:param ifnone: When ifnone is a non-empty string, raise an error if query returns empty result. insert parameter
would not work in this mode.
"""
select_result = self.select(table=table, columns=[column], join=join, where=where, limit=1)
if self.debug:
return select_result
result = select_result[0] if select_result else None
if result:
return result[0 if self.cursorclass is pymysql.cursors.Cursor else column]
if ifnone:
raise ValueError(ifnone)
if insert:
if any([isinstance(d, dict) for d in where.values()]):
raise ValueError("The where parameter in get() doesn't support nested condition with insert==True.")
return self.insert(table=table, value=where)
return None