Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _base_params(self, query, dp, compiled, is_update):
    """Build the processed bind parameters for one execution of *query*.

    A non-empty list/tuple ``dp`` is accepted only when ``is_update`` is
    true; its values are then paired, in order, with the columns of
    ``query.table``.  For any other statement ``exc.ArgumentError`` is
    raised.  The parameters produced by ``compiled.construct_params`` are
    passed through the bind processors of ``compiled`` (``noop`` for keys
    that have none) and finally through the dialect's
    ``execute_sequence_format``.
    """
    given_positionally = bool(dp) and isinstance(dp, (list, tuple))

    if given_positionally and not is_update:
        raise exc.ArgumentError(
            "Don't mix sqlalchemy SELECT "
            "clause with positional "
            "parameters"
        )

    if given_positionally:
        # Pair each positional value with the table column at that index.
        values_by_column = {}
        for column, value in zip(query.table.c, dp):
            values_by_column[column.key] = value
        dp = values_by_column

    bound = compiled.construct_params(dp)
    bind_processors = compiled._bind_processors

    processed = {}
    for name in bound:
        convert = bind_processors.get(name, noop)
        processed[name] = convert(bound[name])

    # The dialect formats a sequence of parameter sets; only one is
    # supplied here, so only the first formatted entry is returned.
    formatted = self._dialect.execute_sequence_format([processed])
    return formatted[0]
async def _executemany(self, query, dps, cursor):
    """Execute *query* on *cursor* once for every parameter set in *dps*.

    A plain string is handed to ``cursor.executemany`` together with
    *dps* unchanged.  A SQLAlchemy clause is compiled once for
    ``self._dialect``; each entry of *dps* is converted with
    ``self._base_params`` and the resulting batch is sent through a
    single ``cursor.executemany`` call.

    Returns the object produced by ``create_result_proxy``, which is also
    added to ``self._weak_results``.

    Raises ``exc.ArgumentError`` when *query* is a DDL clause (those are
    not executed with parameters) or is neither a string nor a clause
    element.
    """
    result_map = None
    if isinstance(query, str):
        await cursor.executemany(query, dps)
    elif isinstance(query, DDLElement):
        raise exc.ArgumentError(
            "Don't mix sqlalchemy DDL clause "
            "and execution with parameters"
        )
    elif isinstance(query, ClauseElement):
        compiled = query.compile(dialect=self._dialect)
        is_update = isinstance(query, UpdateBase)
        params = [
            self._base_params(query, dp, compiled, is_update)
            for dp in dps
        ]
        # Send the whole batch: executing a single parameter set through
        # ``cursor.execute`` would silently drop every other entry of
        # ``dps``.
        await cursor.executemany(str(compiled), params)
        result_map = compiled._result_columns
    else:
        raise exc.ArgumentError("sql statement should be str or "
                                "SQLAlchemy data "
                                "selection/modification clause")
    ret = await create_result_proxy(
        self, cursor, self._dialect, result_map
    )
    self._weak_results.add(ret)
    return ret
# ported from: https://github.com/aio-libs/aiopg/blob/master/aiopg/sa/exc.py
class Error(Exception):
    """Generic base error; the more specific errors derive from it."""
class ArgumentError(Error):
    """Signal an invalid or conflicting function argument.

    Generally corresponds to errors in construction-time state.
    """
class InvalidRequestError(ArgumentError):
    """Signal that aiomysql.sa was asked to do something it cannot do.

    Generally corresponds to errors in runtime state.
    """
class NoSuchColumnError(KeyError, InvalidRequestError):
    """Signal that a ``RowProxy`` was asked for a column that does not exist."""
class ResourceClosedError(InvalidRequestError):
    """Signal an operation on an object that is already closed.

    The object may be a connection, a cursor, or another object in a
    closed state.
    """
"""A coroutine for Engine creation.
Returns an Engine instance with an embedded connection pool.
The pool has *minsize* open connections to the MySQL server.
"""
# NOTE(review): the ``def`` line of this function is not part of this
# excerpt; ``minsize``, ``maxsize``, ``loop``, ``dialect``, ``pool_recycle``,
# ``compiled_cache`` and ``kwargs`` presumably come from its signature --
# confirm against the full file.
# Cursor classes the check below rejects, together with their subclasses.
deprecated_cursor_classes = [
DeserializationCursor, DictCursor, SSCursor, SSDictCursor,
]
# Fall back to ``Cursor`` when the caller passed no 'cursorclass' keyword.
cursorclass = kwargs.get('cursorclass', Cursor)
# Accept only classes that derive from ``Cursor`` and from none of the
# entries in ``deprecated_cursor_classes``.
if not issubclass(cursorclass, Cursor) or any(
issubclass(cursorclass, cursor_class)
for cursor_class in deprecated_cursor_classes
):
raise ArgumentError('SQLAlchemy engine does not support '
'this cursor class')
# ``_create_engine`` is called without ``await`` here; whatever it returns
# is handed to ``_EngineContextManager`` and that wrapper is the return value.
coro = _create_engine(minsize=minsize, maxsize=maxsize, loop=loop,
dialect=dialect, pool_recycle=pool_recycle,
compiled_cache=compiled_cache, **kwargs)
return _EngineContextManager(coro)