Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self):
try:
self.con = pymysql.connect(self.host,
self.user,
self.pwd,
self.db,
use_unicode=True, charset="utf8")
self.con.autocommit(True)
self.cursor = self.con.cursor()
except pymysql.Error as e :
print('错误')
conn = MySQLdb.connect ( host = Settings.get( 'MySql', 'ip') ,
user = Settings.get( 'MySql', 'user'),
passwd = Settings.get( 'MySql', 'pwd'),
db = db,
unix_socket=Settings.get( 'MySql', 'sock') )
cursor = conn.cursor ()
cursor.execute ( query )
rows = cursor.fetchall()
cursor.close ()
conn.commit ()
conn.close ()
ret = rows
except MySQLdb.Error, e:
print("[querySQL] %s" % str(e))
sys.exit(1)
return ret
if 'sql' and 'dbId' in kw.keys():
try:
db = DataBase_MySQL_Server_Config.objects.get(id=kw.get('dbId'))
except Exception as ex:
{"status":"failed","msg":str(ex)}
try:
conn = pymysql.connect(host=db.db_assets.server_assets.ip,user=db.db_user,
passwd=db.db_passwd,db=db.db_name,
port=int(db.db_port))
cur = conn.cursor()
ret = cur.execute(kw.get('sql'))
conn.commit()
cur.close()
conn.close()
return {"status":'success','msg':ret}
except pymysql.Error as e:
conn.rollback()
cur.close()
conn.close()
return {"status":'failed',"msg":"Mysql Error %d: %s" % (e.args[0], e.args[1])}
else:
return {"status":"failed","msg":"参数不对"}
def __connect_remote(self):
try:
return pymysql.connect(
host=self.dbServer["ip"],
user=self.dbServer["db_user"],
password=self.dbServer["db_passwd"],
port=self.dbServer["db_port"],
db=self.dbServer["db_name"],
max_allowed_packet=1024 * 1024 * 1024,
charset='utf8')
except pymysql.Error as ex:
logger.error("连接数据库失败: {ex}".format(ex=ex.__str__()))
raise pymysql.Error(ex.__str__())
sql='''SELECT tablename from $_$inception_backup_information$_$ where opid_time = '{sequence}';'''.format(sequence = sequence)
try:
conn = pymysql.connect(
host=self.incept.backup_host,user=self.incept.backup_user,
passwd=self.incept.backup_passwd,db=dbName,
port=int(self.incept.backup_port),
use_unicode=True,
charset='utf8'
)
cur = conn.cursor()
ret = cur.execute(sql)
result = cur.fetchone()
cur.close()
conn.close()
return {"status":'success','data':result}
except pymysql.Error as e:
logger.error(msg="Mysql Error %d: %s" % (e.args[0], e.args[1]))
return {"status":'error',"errinfo":"Mysql Error %d: %s" % (e.args[0], e.args[1])}
"""
remote_username = self.get_data_from_meta_data_service(app_id, WSO2DASStartupHandler.CONST_MYSQL_ROOT_USERNAME)
remote_password = self.get_data_from_meta_data_service(app_id, WSO2DASStartupHandler.CONST_MYSQL_ROOT_PASSWORD)
WSO2DASStartupHandler.log.info("mysql server conf [host]:%s [username]:%s [password]:%s", remote_host,
remote_username, remote_password)
con = None
try:
con = db.connect(host=remote_host, user=remote_username, passwd=remote_password)
cur = con.cursor()
cur.execute('CREATE DATABASE IF NOT EXISTS ' + databasename + ' character set latin1;')
cur.execute('USE ' + databasename + ';')
cur.execute(
"GRANT ALL ON " + databasename + ".* TO " + username + "@'%' IDENTIFIED BY '" + password + "';")
WSO2DASStartupHandler.log.info("Database %s created successfully" % databasename)
except db.Error, e:
WSO2DASStartupHandler.log.error("Error in creating database %d: %s" % (e.args[0], e.args[1]))
finally:
if con:
con.close()
def fetch_notification_method_types(self):
try:
if self._mysql is None:
self._connect_to_mysql()
cur = self._mysql.cursor()
cur.execute(self._find_all_notification_types_sql)
for row in cur:
yield (row[0])
except pymysql.Error as e:
self._mysql = None
log.exception("Couldn't fetch notification types %s", e)
raise exc.DatabaseException(e)
try:
# 连接到inception
conn = pymysql.connect(host=f"{self.inception_host}", user='root', password='', db='',
port=self.inception_port, use_unicode=True, charset="utf8")
cur = conn.cursor()
cur.execute(sql)
result = cur.fetchall()
if result:
field_names = [i[0] for i in cur.description]
incep_data = []
for row in result:
incep_data.append(dict(map(lambda x, y: [x, y], field_names, row)))
cur.close()
conn.close()
return incep_data
except pymysql.Error as err:
logger.error(err.args[1])