Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_connection_await(self):
db = await aiosqlite.connect(TEST_DB)
self.assertIsInstance(db, aiosqlite.Connection)
async with db.execute("select 1, 2") as cursor:
rows = await cursor.fetchall()
self.assertEqual(rows, [(1, 2)])
await db.close()
async def test_connection_context(self):
async with aiosqlite.connect(TEST_DB) as db:
self.assertIsInstance(db, aiosqlite.Connection)
async with db.execute("select 1, 2") as cursor:
rows = await cursor.fetchall()
self.assertEqual(rows, [(1, 2)])
async def acquire(self) -> aiosqlite.Connection:
connection = aiosqlite.connect(
database=self._url.database, isolation_level=None, **self._options
)
await connection.__aenter__()
return connection
def __init__(self, path="history.db", loop=None):
self.logger = logging.getLogger(__name__)
self._datachanges_period = {}
self._db_file = path
self._event_fields = {}
self._db: aiosqlite.Connection = None
self._loop = loop or get_event_loop()