Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
results = Queue()
completed_queries = []
def __subclasscheck__(cls, subclass):
if subclass == MockConnection:
return True
return old_subclass_check(cls, subclass)
old_subclass_check = ConnectionMeta.__subclasscheck__
ConnectionMeta.__subclasscheck__ = __subclasscheck__
class MockConnection:
__slots__ = Connection.__slots__
@property
def completed_queries(self):
return completed_queries
@property
def results(self):
return results
@results.setter
def results(self, result):
global results
results = result
def set_database_results(self, *dbresults):
self.results = Queue()
from asyncpg.connection import Connection
from .dialects.asyncpg import GinoCursorFactory
class GinoConnection(Connection):
__slots__ = ('_metadata', '_execution_options')
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._metadata = None
self._execution_options = {}
def _set_metadata(self, metadata):
self._metadata = metadata
@property
def metadata(self):
return self._metadata
@property
def execution_options(self):
from asyncpg.pool import Pool
from asyncpg.connection import Connection
from asyncio.queues import Queue
from time import perf_counter
from sqlalchemy.dialects.postgresql import JSONB
import asyncpg
import yo.db
from yo.db import metadata
from yo.schema import TransportType
from yo.json import loads
logger = structlog.getLogger(__name__, source='YoDB')
PoolOrConn = TypeVar('PoolOrConn', Pool, Connection)
QItemId = int
NotificationId = int
POP_Q_BY_ID_STMT = '''
DELETE FROM queue
WHERE qid = (
SELECT qid
FROM queue
WHERE qid=$1
FOR UPDATE SKIP LOCKED
)
RETURNING qid, data;
'''
def construct_approzium_connection(authenticator):
# instantiate a fresh instance of the class on each connection
# this instance is used to pass information that is needed during
# connection process (e.g.: the authenticator)
class ApproziumConnection(asyncpg.connection.Connection):
pass
ApproziumConnection.authenticator = authenticator
return ApproziumConnection
.. versionadded:: 0.18.0
Added ability to specify multiple hosts in the *dsn*
and *host* arguments.
.. _SSLContext: https://docs.python.org/3/library/ssl.html#ssl.SSLContext
.. _create_default_context:
https://docs.python.org/3/library/ssl.html#ssl.create_default_context
.. _server settings:
https://www.postgresql.org/docs/current/static/runtime-config.html
.. _postgres envvars:
https://www.postgresql.org/docs/current/static/libpq-envars.html
.. _libpq connection URI format:
https://www.postgresql.org/docs/current/static/\
libpq-connect.html#LIBPQ-CONNSTRING
"""
if not issubclass(connection_class, Connection):
raise TypeError(
'connection_class is expected to be a subclass of '
'asyncpg.Connection, got {!r}'.format(connection_class))
if loop is None:
loop = asyncio.get_event_loop()
return await connect_utils._connect(
loop=loop, timeout=timeout, connection_class=connection_class,
dsn=dsn, host=host, port=port, user=user,
password=password, passfile=passfile,
ssl=ssl, database=database,
server_settings=server_settings,
command_timeout=command_timeout,
statement_cache_size=statement_cache_size,
max_cached_statement_lifetime=max_cached_statement_lifetime,