Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:param query: ``str`` sql statement
:param args: ``tuple`` or ``list`` of arguments for sql query
:returns: ``int``, number of rows that have been produced or affected
"""
conn = self._get_db()
while (yield from self.nextset()):
pass
if args is not None:
query = query % self._escape_args(args, conn)
yield from self._query(query)
self._executed = query
if self._echo:
logger.info(query)
logger.info("%r", args)
return self._rowcount
('John', '555-003')
]
stmt = "INSERT INTO employees (name, phone) VALUES (%s, %s)"
yield from cursor.executemany(stmt, data)
INSERT statements are optimized by batching the data, that is
using the MySQL multiple rows syntax.
:param query: ``str``, sql statement
:param args: ``tuple`` or ``list`` of arguments for sql query
"""
if not args:
return
if self._echo:
logger.info("CALL %s", query)
logger.info("%r", args)
m = RE_INSERT_VALUES.match(query)
if m:
q_prefix = m.group(1)
q_values = m.group(2).rstrip()
q_postfix = m.group(3) or ''
assert q_values[0] == '(' and q_values[-1] == ')'
return (yield from self._do_execute_many(
q_prefix, q_values, q_postfix, args, self.max_stmt_length,
self._get_db().encoding))
else:
rows = 0
for arg in args:
yield from self.execute(query, arg)
rows += self._rowcount
async def caching_sha2_password_auth(self, pkt):
# Runs the "caching sha2" authentication exchange shown below, starting
# from ``pkt`` (the packet handed in by the caller) and returning the
# last packet read once it has passed ``check_error()``.
# NOTE(review): indentation was lost in this excerpt, so the nesting of
# the statements below cannot be read off the text — confirm against the
# upstream file before relying on these notes.
# No password fast path
# With no password configured, reply with an empty packet and return the
# server's answer after checking it for an error.
if not self._password:
self.write_packet(b'')
pkt = await self._read_packet()
pkt.check_error()
return pkt
if pkt.is_auth_switch_request():
# Try from fast auth
# The whole remaining packet payload is stored as the salt and used to
# scramble the latin1-encoded password before sending it.
logger.debug("caching sha2: Trying fast path")
self.salt = pkt.read_all()
scrambled = _auth.scramble_caching_sha2(
self._password.encode('latin1'), self.salt
)
self.write_packet(scrambled)
pkt = await self._read_packet()
pkt.check_error()
# else: fast auth is tried in initial handshake
# Anything other than an "extra auth data" packet at this point is
# rejected; the first byte of the raw packet is included in the message.
if not pkt.is_extra_auth_data():
raise OperationalError(
"caching sha2: Unknown packet "
"for fast auth: {0}".format(pkt._data[:1])
)
# NOTE(review): this excerpt goes straight from the validation above to
# the final read; any interpretation of the extra auth data is not
# visible here — presumably elided, verify against the full source.
pkt = await self._read_packet()
pkt.check_error()
return pkt
if pkt.is_auth_switch_request():
self.salt = pkt.read_all()
if not self.server_public_key and self._password:
# Request server public key
logger.debug("sha256: Requesting server public key")
self.write_packet(b'\1')
pkt = await self._read_packet()
pkt.check_error()
if pkt.is_extra_auth_data():
self.server_public_key = pkt._data[1:]
logger.debug(
"Received public key:\n",
self.server_public_key.decode('ascii')
)
if self._password:
if not self.server_public_key:
raise OperationalError("Couldn't receive server's public key")
data = _auth.sha2_rsa_encrypt(
self._password.encode('latin1'), self.salt,
self.server_public_key
)
else:
data = b''
self.write_packet(data)
:param query: ``str`` sql statement
:param args: ``tuple`` or ``list`` of arguments for sql query
:returns: ``int``, number of rows that have been produced or affected
"""
conn = self._get_db()
while (await self.nextset()):
pass
if args is not None:
query = query % self._escape_args(args, conn)
await self._query(query)
self._executed = query
if self._echo:
logger.info(query)
logger.info("%r", args)
return self._rowcount
Compatibility warning: The act of calling a stored procedure
itself creates an empty result set. This appears after any
result sets generated by the procedure. This is non-standard
behavior with respect to the DB-API. Be sure to use nextset()
to advance through all result sets; otherwise you may get
disconnected.
:param procname: ``str``, name of procedure to execute on server
:param args: ``sequence`` of parameters to use with procedure
:returns: the original args.
"""
conn = self._get_db()
if self._echo:
logger.info("CALL %s", procname)
logger.info("%r", args)
for index, arg in enumerate(args):
q = "SET @_%s_%d=%s" % (procname, index, conn.escape(arg))
await self._query(q)
await self.nextset()
_args = ','.join('@_%s_%d' % (procname, i) for i in range(len(args)))
q = "CALL %s(%s)" % (procname, _args)
await self._query(q)
self._executed = q
return args