Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def connect(self):
    """
    Connects to the database server and switches the connection to autocommit.

    The first attempt uses ``self.conn_info`` as-is.  If the driver raises
    ``client.err.InternalError`` the connection is retried once; when the
    caller gave no explicit SSL preference (``ssl_input`` is ``None``) the
    ``ssl`` entry is dropped from ``self.conn_info`` before that retry.

    Any exception from the final connection attempt propagates to the caller.
    """
    # 'ssl_input' is taken out so it is not forwarded to client.connect()
    # through **self.conn_info; it is put back in the ``finally`` below.
    ssl_input = self.conn_info.pop('ssl_input')

    def open_connection():
        # Reads self.conn_info at call time, so the retry sees the 'ssl' removal.
        return client.connect(
            init_command=self.init_fun,
            sql_mode="NO_ZERO_DATE,NO_ZERO_IN_DATE,ERROR_FOR_DIVISION_BY_ZERO,"
                     "STRICT_ALL_TABLES,NO_ENGINE_SUBSTITUTION",
            charset=config['connection.charset'],
            **self.conn_info)

    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', '.*deprecated.*')
            try:
                self._conn = open_connection()
            except client.err.InternalError:
                if ssl_input is None:
                    # A default avoids a KeyError here hiding the real
                    # connection error when no 'ssl' entry exists.
                    self.conn_info.pop('ssl', None)
                self._conn = open_connection()
    finally:
        # Restore the bookkeeping key even when connecting failed; otherwise
        # the next connect() call would die on the pop() above.
        self.conn_info['ssl_input'] = ssl_input
    self._conn.autocommit(True)
def user_register(db: pymysql.connections.Connection, userName, userPwd, userSUPER, userEmail, userLikes):
    """Insert a new row into ``t_users`` unless the user name is already taken.

    Args:
        db: Open PyMySQL connection. It is committed on success and rolled
            back on any handled failure.
        userName: Value for ``user_name``; must not already exist in the table.
        userPwd: Value for ``user_pwd``, stored exactly as passed in.
        userSUPER: Value for ``user_SUPER``.
        userEmail: Value for ``user_email``.
        userLikes: Iterable of strings, stored comma-joined in ``user_like``.

    Returns:
        True if the row was inserted and committed, False otherwise.
        Failures are logged through ``StdError`` rather than raised.
    """
    cursor = db.cursor()
    ret = False
    # Context appended to every log line.  Each value goes through str() so a
    # list-valued userLikes (or any non-string argument) cannot raise TypeError
    # inside an except handler.  The password is masked: credentials must not
    # end up in log files.
    detail = (
        "\tuser_name=" + str(userName)
        + "\tuser_SUPER=" + str(userSUPER)
        + "\tuser_pwd=***"
        + "\tuser_email=" + str(userEmail)
        + "\tuser_likes=" + str(userLikes)
    )
    try:
        # Values are passed as query parameters so the driver escapes them;
        # formatting them into the SQL text allowed SQL injection.
        if cursor.execute('select user_id from t_users where user_name=%s;', (userName,)) != 0:
            raise UserManagerError('用户注册错误,已存在相同的用户名')
        sql = ('insert into t_users (user_name, user_SUPER, user_like, user_pwd, user_email) '
               'values (%s, %s, %s, %s, %s);')
        # Only the statement template is logged; the bound values (which
        # include the password) are not.
        StdError.info("注册用户" + sql + detail)
        cursor.execute(sql, (userName, userSUPER, ','.join(userLikes), userPwd, userEmail))
        db.commit()
        ret = True
    except UserManagerError as e:
        db.rollback()
        # Fall back to str(e) when the exception class defines no ``message``.
        StdError.warn(str(getattr(e, 'message', e)) + detail)
    except pymysql.err.IntegrityError as e:
        db.rollback()
        StdError.error(str(e) + detail)
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit and turned them into a ``False`` return.
        db.rollback()
        StdError.error("用户注册出现未知错误" + detail)
    finally:
        cursor.close()
    return ret
hw.raw_data_json,
getattr(hw, "creation_date", None),
getattr(hw, "device_type", ""),
getattr(hw, "formula_id", None),
getattr(hw, "secret", ""),
getattr(hw, "ip", None),
str(getattr(hw, "segmentation", "")),
getattr(hw, "internal_id", ""),
getattr(hw, "description", ""),
)
cursor.execute(sql, data)
connection.commit()
for symbol_id, strokes in zip(hw.symbol_stream, hw.segmentation):
insert_symbol_mapping(cursor.lastrowid, symbol_id, hw.user_id, strokes)
logging.info("Insert raw data.")
except pymysql.err.IntegrityError as e:
print(f"Error: {e} (can probably be ignored)")
def _get_global_stats(self, conn):
    """Read ``stats.stats_mysql_global`` and return it as a dict.

    Rows are indexed by column name, so the cursor must yield mapping-like
    rows.  The result maps each ``Variable_Name`` to its ``Variable_Value``.

    Returns ``None`` (after emitting a warning) when the query yields no rows
    or when the driver raises ``InternalError`` / ``OperationalError``.
    """
    query = 'SELECT * FROM stats.stats_mysql_global'
    try:
        with closing(conn.cursor()) as cur:
            cur.execute(query)
            if cur.rowcount >= 1:
                stats = {}
                for record in cur.fetchall():
                    stats[record['Variable_Name']] = record['Variable_Value']
                return stats
            # Nothing came back from the stats table.
            self.warning("Failed to fetch records from the stats schema 'stats_mysql_global' table.")
            return None
    except (pymysql.err.InternalError, pymysql.err.OperationalError) as err:
        self.warning("ProxySQL global stats unavailable at this time: %s" % str(err))
        return None
if errno == 1617 and msg == "There is no master connection '{0}'".format(replication_channel):
# MariaDB complains when you try to get slave status with a
# connection name on the master, without connection name it
# responds an empty string as expected.
# Mysql behaves the same with or without connection name.
pass
else:
self.warning("Privileges error getting replication status (must grant REPLICATION CLIENT): %s", e)
try:
with closing(db.cursor(pymysql.cursors.DictCursor)) as cursor:
cursor.execute("SHOW MASTER STATUS;")
binlog_results = cursor.fetchone()
if binlog_results:
replica_results.update({'Binlog_enabled': True})
except (pymysql.err.InternalError, pymysql.err.OperationalError) as e:
self.warning("Privileges error getting binlog information (must grant REPLICATION CLIENT): %s", e)
return replica_results
return errors.LostConnectionError(disconnect_codes[client_error.args[0]], *client_error.args[1:])
# Access errors
if isinstance(client_error, client.err.OperationalError) and client_error.args[0] in (1044, 1142):
return errors.AccessError('Insufficient privileges.', client_error.args[1], query)
# Integrity errors
if isinstance(client_error, client.err.IntegrityError) and client_error.args[0] == 1062:
return errors.DuplicateError(*client_error.args[1:])
if isinstance(client_error, client.err.IntegrityError) and client_error.args[0] == 1452:
return errors.IntegrityError(*client_error.args[1:])
# Syntax errors
if isinstance(client_error, client.err.ProgrammingError) and client_error.args[0] == 1064:
return errors.QuerySyntaxError(client_error.args[1], query)
# Existence errors
if isinstance(client_error, client.err.ProgrammingError) and client_error.args[0] == 1146:
return errors.MissingTableError(client_error.args[1], query)
if isinstance(client_error, client.err.InternalError) and client_error.args[0] == 1364:
return errors.MissingAttributeError(*client_error.args[1:])
if isinstance(client_error, client.err.InternalError) and client_error.args[0] == 1054:
return errors.UnknownAttributeError(*client_error.args[1:])
# all the other errors are re-raised in original form
return client_error
def _enable_performance_schema_consumers(self, db):
    """Turn on the ``events_statements_history_long`` consumer.

    Runs an UPDATE against ``performance_schema.setup_consumers``.  An
    ``OperationalError`` with code 1142 is logged as an error and swallowed;
    any other ``OperationalError`` is re-raised.  Success is logged at info
    level.
    """
    query = """UPDATE performance_schema.setup_consumers SET enabled = 'YES' WHERE name = 'events_statements_history_long'"""
    with closing(db.cursor()) as cur:
        try:
            cur.execute(query)
        except pymysql.err.OperationalError as exc:
            error_code = exc.args[0]
            if error_code != 1142:
                raise
            # 1142: the server refused the UPDATE on this table.
            self.log.error('Unable to create performance_schema consumers: %s', exc.args[1])
            return
        self.log.info('Successfully enabled events_statements_history_long consumers')
def _get_stats_from_innodb_status(self, db):
# There are a number of important InnoDB metrics that are reported in
# InnoDB status but are not otherwise present as part of the STATUS
# variables in MySQL. Majority of these metrics are reported though
# as a part of STATUS variables in Percona Server and MariaDB.
# Requires querying user to have PROCESS privileges.
try:
with closing(db.cursor()) as cursor:
cursor.execute("SHOW /*!50000 ENGINE*/ INNODB STATUS")
except (pymysql.err.InternalError, pymysql.err.OperationalError, pymysql.err.NotSupportedError) as e:
self.warning("Privilege error or engine unavailable accessing the INNODB status \
tables (must grant PROCESS): %s" % str(e))
return {}
if cursor.rowcount < 1:
# No data from SHOW ENGINE STATUS, even though the engine is enabled.
# EG: This could be an Aurora Read Instance
self.warning("""'SHOW ENGINE INNODB STATUS' returned no data.
If you are running an Aurora Read Instace, this is expected and you should disable the innodb metrics collection""")
return {}
innodb_status = cursor.fetchone()
innodb_status_text = innodb_status[2]
results = defaultdict(int)
# Here we now parse InnoDB STATUS one line at a time
if len(vc) == 3:
for n in vc:
vn *= 1000
vn += int(n)
is_old_version = vn < 5007006
if is_old_version:
sql = f'set password for \'{user}\'@\'{user_host}\' = password(\'{pymysql.escape_string(newpassword)}\')'
else:
sql = f'alter user \'{user}\'@\'{user_host}\' identified by \'{pymysql.escape_string(newpassword)}\''
cursor.execute(sql)
record.password = newpassword
return True
except pymysql.err.OperationalError as e:
logging.error("MySQL Plugin Error: Unable to establish connection: %s", e)
except pymysql.err.ProgrammingError as e:
logging.error("MySQL Plugin Syntax Error: %s", e)
except Exception as e:
logging.error("MySQL password rotation error: %s", e)
return False