Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_async_methods(sqlite3_db_path, queries):
    """Run both user queries concurrently and verify the rows of each.

    ``queries.users.get_all`` and ``queries.users.get_all_sorted`` are
    awaited together through ``asyncio.gather`` on one connection whose
    ``row_factory`` is ``dict_factory``.
    """
    bob = {"userid": 1, "username": "bobsmith", "firstname": "Bob", "lastname": "Smith"}
    john = {"userid": 2, "username": "johndoe", "firstname": "John", "lastname": "Doe"}
    jane = {"userid": 3, "username": "janedoe", "firstname": "Jane", "lastname": "Doe"}

    async with aiosqlite.connect(sqlite3_db_path) as conn:
        conn.row_factory = dict_factory
        plain_lookup = queries.users.get_all(conn)
        sorted_lookup = queries.users.get_all_sorted(conn)
        users, sorted_users = await asyncio.gather(plain_lookup, sorted_lookup)

    assert users == [bob, john, jane]
    assert sorted_users == [bob, jane, john]
async def execute(query: str, *args: str):
    """Run a single statement against ``DB_FILE`` and commit it.

    A connection is opened for this call only and is released when the
    ``async with`` block exits.

    Args:
        query: SQL text passed to the connection's ``execute``.
        *args: Collected into a tuple and passed as the statement's
            parameters.
    """
    async with aiosqlite.connect(DB_FILE) as connection:
        await connection.execute(query, args)
        await connection.commit()
async def init_db(app: web.Application) -> AsyncIterator[None]:
    """Open the SQLite connection for *app* and close it after the yield.

    The connection is opened on the path returned by ``get_db_path()``,
    given ``aiosqlite.Row`` as its row factory and stored under
    ``app["DB"]``. The code following the ``yield`` closes it.
    """
    connection = await aiosqlite.connect(get_db_path())
    connection.row_factory = aiosqlite.Row
    app["DB"] = connection
    yield
    await connection.close()
async def create_connection(self, with_db: bool) -> None:
    """Open the SQLite connection once and apply the configured pragmas.

    Nothing happens when ``self._connection`` is already set. ``with_db``
    is accepted but not read by this method.
    """
    if self._connection:  # pragma: no branch
        return

    self._connection = aiosqlite.connect(self.filename, isolation_level=None)
    self._connection.start()
    await self._connection._connect()
    self._connection._conn.row_factory = sqlite3.Row

    # Issue one PRAGMA statement per configured entry, closing each cursor.
    for name, value in self.pragmas.items():
        statement = "PRAGMA {}={}".format(name, value)
        cursor = await self._connection.execute(statement)
        await cursor.close()

    pragma_summary = " ".join(
        "{}={}".format(name, value) for name, value in self.pragmas.items()
    )
    self.log.debug(
        "Created connection %s with params: filename=%s %s",
        self._connection,
        self.filename,
        pragma_summary,
    )
async def setup(self):
    """Open the database, register the datetime adapter and run the schema.

    Returns:
        ``False`` when opening the connection or executing
        ``schemaScript`` raises an ``Exception``. No other return value
        is produced by the code shown here.
    """
    try:
        self._db = await aiosqlite.connect(self._path)
        self._db.row_factory = aiosqlite.Row
    except Exception:
        # Not a bare ``except``: asyncio.CancelledError and
        # KeyboardInterrupt derive from BaseException and must propagate
        # instead of being reported as a failed setup.
        return False

    def adapt_datetime(ts):
        # time.mktime reads the time tuple as local time and returns a float.
        return time.mktime(ts.timetuple())

    sqlite3.register_adapter(datetime, adapt_datetime)

    try:
        # NOTE(review): this reads ``self.db`` while the connection above is
        # stored in ``self._db`` - presumably a property; confirm.
        await self.db.executescript(schemaScript)
    except Exception:
        log.debug('Error while executing schema script')
        return False
async def init_db(app: web.Application) -> AsyncIterator[None]:
    """Open the SQLite connection for *app* and close it after the yield.

    The connection is opened on the path returned by ``get_db_path()``,
    given ``aiosqlite.Row`` as its row factory and stored under
    ``app["DB"]``. The code following the ``yield`` closes it.
    """
    connection = await aiosqlite.connect(get_db_path())
    connection.row_factory = aiosqlite.Row
    app["DB"] = connection
    yield
    await connection.close()
async def _new_conn(self):
    """Open a connection and run every query in ``self._init_queries`` on it.

    Returns:
        The opened connection, after all init queries have been executed.

    Raises:
        Any exception raised while obtaining the cursor or executing a
        query; the connection is closed before it is re-raised.
    """
    conn = await aiosqlite.connect(*self._conn_args, **self._conn_kwargs)
    try:
        async with conn.cursor() as cursor:
            for query in self._init_queries:
                await cursor.execute(query)
    except BaseException:
        # Do not hand back (or leak) a connection that failed to initialise.
        await conn.close()
        raise
    else:
        return conn
# Async context entry: opens the aiosqlite connection for
# self.config.filename and issues CREATE TABLE IF NOT EXISTS statements for
# the "features" and "prediction" tables.
# NOTE(review): the second execute(...) call below is not closed in the
# visible text, so the rest of this method (including whatever it returns)
# is not shown here.
async def __aenter__(self) -> "BaseSourceContext":
# Keep the connection object itself in self.__db and the value produced by
# entering it in self.db.
self.__db = aiosqlite.connect(self.config.filename)
self.db = await self.__db.__aenter__()
self.db.row_factory = aiosqlite.Row
# Create table for feature data
await self.db.execute(
"CREATE TABLE IF NOT EXISTS features ("
"src_url TEXT PRIMARY KEY NOT NULL, "
# Each name in self.FEATURE_COLS becomes a "<name> REAL" column: the join
# supplies " REAL, " between names and the next line adds the last " REAL".
+ (" REAL, ".join(self.FEATURE_COLS))
+ " REAL"
")"
)
# Create table for predictions
await self.db.execute(
"CREATE TABLE IF NOT EXISTS prediction ("
"src_url TEXT PRIMARY KEY, " + "value TEXT, "
"confidence REAL"
")"
def __init__(self, session):
    """Connect to the database synchronously and run the startup script.

    Each coroutine is driven to completion on the loop returned by
    ``asyncio.get_event_loop()``. The connection is stored in
    ``self.dbw``, a cursor on it in ``self.db``, and *session* in
    ``self.session``.
    """
    loop = asyncio.get_event_loop()
    run_sync = loop.run_until_complete

    self.dbw = run_sync(sql.connect(DATABASE_FILENAME))
    self.dbw.row_factory = sql.Row
    self.db = run_sync(self.dbw.cursor())

    # The startup script lives in the "sql" directory next to this module.
    module_dir = os.path.dirname(__file__)
    startup_path = os.path.join(module_dir, 'sql', 'startup.sql')
    with open(startup_path) as startup:
        run_sync(self.db.executescript(startup.read()))

    self.session = session
async def open(self):
    """Open the cache database, initialising it when the file is new.

    ``self.close()`` is awaited first. If no file existed at
    ``self.db_path`` before connecting, ``self.init_db()`` is awaited once
    the connection is stored in ``self.db``.
    """
    await self.close()
    path = Path(self.db_path)
    LOGGER.info("Ledger cache will be stored in %s", path)
    # Record this before connecting; SQLite creates a missing database file
    # when it is opened, so the check would be meaningless afterwards.
    is_new_db = not path.exists()
    # Await the connection object directly instead of calling its
    # ``__aenter__`` dunder by hand; both start and return the connection.
    self.db = await aiosqlite.connect(str(path))
    if is_new_db:
        await self.init_db()