Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_pool_init_run_until_complete(self):
    """Check that running the pool initialiser on the loop yields a Pool.

    The object returned by ``self.create_pool`` is handed to
    ``self.loop.run_until_complete``; whatever the loop produces must be
    an instance of ``asyncpg.pool.Pool``.
    """
    pending = self.create_pool(database='postgres')
    ready = self.loop.run_until_complete(pending)
    self.assertIsInstance(ready, asyncpg.pool.Pool)
def create_pool(self, pool_class=pg_pool.Pool,
                connection_class=pg_connection.Connection, **kwargs):
    """Build a pool for this test case and register it in ``self._pools``.

    :param pool_class: Pool class forwarded to the pool factory.
    :param connection_class: Connection class forwarded to the pool factory.
    :param kwargs: Connection options; they are passed through
        ``self.get_connection_spec`` before being given to the factory.
    :return: The object returned by the pool factory.
    """
    spec = self.get_connection_spec(kwargs)
    # NOTE(review): the bare ``create_pool`` name is expected to resolve to a
    # module-level factory, which holds only while this def sits inside a
    # class body - confirm against the full file.
    new_pool = create_pool(
        loop=self.loop,
        pool_class=pool_class,
        connection_class=connection_class,
        **spec
    )
    self._pools.append(new_pool)
    return new_pool
**connect_kwargs):
class SAConnection(connection_class):
def __init__(self, *args, dialect=dialect, **kwargs):
super().__init__(*args, dialect=dialect, **kwargs)
connection_class = SAConnection
# dict is fine on the pool object as there is usually only one of them
# asyncpg.pool.Pool.__slots__ += ('__dict__',)
# monkey patch pool to have some extra methods
def transaction(self, **kwargs):
return ConnectionTransactionContextManager(self, **kwargs)
asyncpg.pool.Pool.transaction = transaction
asyncpg.pool.Pool.begin = transaction
pool = asyncpg.create_pool(*args, connection_class=connection_class,
**connect_kwargs)
return pool
connection_class=_SAConnection,
**connect_kwargs):
class SAConnection(connection_class):
def __init__(self, *args, dialect=dialect, **kwargs):
super().__init__(*args, dialect=dialect, **kwargs)
connection_class = SAConnection
# dict is fine on the pool object as there is usually only one of them
# asyncpg.pool.Pool.__slots__ += ('__dict__',)
# monkey patch pool to have some extra methods
def transaction(self, **kwargs):
return ConnectionTransactionContextManager(self, **kwargs)
asyncpg.pool.Pool.transaction = transaction
asyncpg.pool.Pool.begin = transaction
pool = asyncpg.create_pool(*args, connection_class=connection_class,
**connect_kwargs)
return pool
'cogs.ahk.ahk',
'cogs.ahk.help',
'cogs.ahk.internal.logger',
'cogs.ahk.internal.security',
'cogs.dwitter',
'cogs.linus',
'cogs.owner',
)
class AceBot(commands.Bot):
    """Bot subclass that carries a database handle and a guild config table."""

    support_link = 'https://discord.gg/X7abzRe'

    # Bare annotations: they declare attribute types without creating the
    # attributes. Only ``db`` and ``config`` are assigned in ``__init__``
    # below; the others are presumably set elsewhere - confirm.
    ready: asyncio.Event
    aiohttp: aiohttp.ClientSession
    # Annotated with the Pool class; ``asyncpg.pool`` on its own is a module.
    db: asyncpg.pool.Pool
    config: ConfigTable
    startup_time: datetime

    def __init__(self, db, **kwargs):
        """Configure the underlying ``commands.Bot`` and attach storage.

        :param db: Database handle stored on the instance as ``self.db``.
        :param kwargs: Extra keyword arguments forwarded unchanged to
            ``commands.Bot.__init__``.
        """
        super().__init__(
            # NOTE(review): ``prefix_resolver`` is not defined in the visible
            # part of the class; it must exist on the class or a base.
            command_prefix=self.prefix_resolver,
            owner_id=OWNER_ID,
            description=DESCRIPTION,
            help_command=EditedMinimalHelpCommand(),
            max_messages=20000,
            activity=BOT_ACTIVITY,
            **kwargs
        )

        self.db = db
        # Config table named 'config', keyed on 'guild_id', with rows
        # represented by GuildConfigRecord.
        self.config = ConfigTable(self, table='config', primary='guild_id', record_class=GuildConfigRecord)
self.user = user
self.password = password
self.database = database
self.host = host
self.port = int(port) # make sure port is int type
self.extra = kwargs.copy()
self.schema = self.extra.pop("schema", None)
self.extra.pop("connection_name", None)
self.extra.pop("fetch_inserted", None)
self.extra.pop("loop", None)
self.extra.pop("connection_class", None)
self.pool_minsize = int(self.extra.pop("minsize", 1))
self.pool_maxsize = int(self.extra.pop("maxsize", 5))
self._template: dict = {}
self._pool: Optional[asyncpg.pool] = None
self._connection = None
def __init__(self):
    """Initialise the base bot and set every runtime attribute to its empty state."""
    super().__init__(
        command_prefix=_prefix_callable,
        case_insensitive=True,
        fetch_offline_members=True,
        description="Discord bot with functions for the MMORPG Tibia.",
    )
    # The stock help command is removed so a custom one can be registered.
    self.remove_command("help")

    self.users_servers = defaultdict(list)

    # Both start as None here and are not filled in by this method.
    self.config: config.Config = None
    self.pool: asyncpg.pool.Pool = None

    self.start_time = dt.datetime.utcnow()
    self.session = aiohttp.ClientSession(loop=self.loop)

    # Worlds tracked by nabbot as a server_id -> world mapping. The mapping
    # is populated from the database, and the list is derived from it.
    self.tracked_worlds = {}
    self.tracked_worlds_list = []

    # NOTE(review): a defaultdict built without a factory raises KeyError on
    # missing keys, exactly like a plain dict.
    self.prefixes = defaultdict()

    self.__version__ = "2.4.0"
import sqlite3
from typing import Any, List, Optional, Union, TypeVar, Tuple
import asyncpg
import tibiapy
# Path of the SQLite database file opened just below.
WIKIDB = "data/tibiawiki.db"
# Open database in read only mode.
# The "file:" URI form is used with uri=True so the "?mode=ro" query is honoured.
# NOTE: this connection is created when the module is imported.
wiki_db = sqlite3.connect(f"file:{WIKIDB}?mode=ro", uri=True)
# Rows are returned as sqlite3.Row objects instead of plain tuples.
wiki_db.row_factory = sqlite3.Row
# Pattern to match the number of affected rows:
# it captures the run of digits that ends the searched text.
result_patt = re.compile(r"(\d+)$")
PoolConn = Union[asyncpg.pool.Pool, asyncpg.Connection]
"""A type alias for an union of Pool and Connection."""
# Generic type variable; its users are outside the visible part of the file.
T = TypeVar('T')
def get_affected_count(result: str) -> int:
    """Extract the trailing row count from a command status string.

    :param result: The status text to inspect; surrounding whitespace is ignored.
    :return: The integer formed by the digits that end ``result``, or 0 when
        the text does not end in digits.
    """
    found = result_patt.search(result.strip())
    return int(found.group(1)) if found else 0
async def get_prefixes(pool: PoolConn, guild_id: int):
"""Gets the list of prefixes for a given server.
:param pool: An asyncpg Pool or Connection.