Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.assertFalse(db.in_transaction)
self.assertEqual(db.total_changes, 1)
self.assertIsNone(db.row_factory)
self.assertEqual(db.text_factory, str)
async with db.cursor() as cursor:
await cursor.execute("select * from test_properties")
row = await cursor.fetchone()
self.assertIsInstance(row, tuple)
self.assertEqual(row, (1, 1, "hi"))
with self.assertRaises(TypeError):
_ = row["k"]
db.row_factory = aiosqlite.Row
db.text_factory = bytes
self.assertEqual(db.row_factory, aiosqlite.Row)
self.assertEqual(db.text_factory, bytes)
async with db.cursor() as cursor:
await cursor.execute("select * from test_properties")
row = await cursor.fetchone()
self.assertIsInstance(row, aiosqlite.Row)
self.assertEqual(row[1], 1)
self.assertEqual(row[2], b"hi")
self.assertEqual(row["k"], 1)
self.assertEqual(row["d"], b"hi")
async def init_db(app: web.Application) -> AsyncIterator[None]:
sqlite_db = app["DB_PATH"]
db = await aiosqlite.connect(sqlite_db)
db.row_factory = aiosqlite.Row
app["DB"] = db
yield
await db.close()
await cursor.execute("select * from test_properties")
row = await cursor.fetchone()
self.assertIsInstance(row, tuple)
self.assertEqual(row, (1, 1, "hi"))
with self.assertRaises(TypeError):
_ = row["k"]
db.row_factory = aiosqlite.Row
db.text_factory = bytes
self.assertEqual(db.row_factory, aiosqlite.Row)
self.assertEqual(db.text_factory, bytes)
async with db.cursor() as cursor:
await cursor.execute("select * from test_properties")
row = await cursor.fetchone()
self.assertIsInstance(row, aiosqlite.Row)
self.assertEqual(row[1], 1)
self.assertEqual(row[2], b"hi")
self.assertEqual(row["k"], 1)
self.assertEqual(row["d"], b"hi")
async def init_db(app: web.Application) -> AsyncIterator[None]:
sqlite_db = get_db_path()
db = await aiosqlite.connect(sqlite_db)
db.row_factory = aiosqlite.Row
app["DB"] = db
yield
await db.close()
def __init__(self, session):
loop = asyncio.get_event_loop()
self.dbw = loop.run_until_complete(sql.connect(DATABASE_FILENAME))
self.dbw.row_factory = sql.Row
self.db = loop.run_until_complete(self.dbw.cursor())
with open(os.path.join(os.path.dirname(__file__), 'sql',
'startup.sql')) as startup:
loop.run_until_complete(self.db.executescript(startup.read()))
self.session = session
async def init_db(app: web.Application) -> AsyncIterator[None]:
sqlite_db = get_db_path()
db = await aiosqlite.connect(sqlite_db)
db.row_factory = aiosqlite.Row
app["DB"] = db
yield
await db.close()
async def init_db(app: web.Application) -> AsyncIterator[None]:
sqlite_db = get_db_path()
db = await aiosqlite.connect(sqlite_db)
db.row_factory = aiosqlite.Row
app["DB"] = db
yield
await db.close()
async def init_db(app: web.Application) -> AsyncIterator[None]:
sqlite_db = get_db_path()
db = await aiosqlite.connect(sqlite_db)
db.row_factory = aiosqlite.Row
app["DB"] = db
yield
await db.close()
async def setup(self):
try:
self._db = await aiosqlite.connect(self._path)
self._db.row_factory = aiosqlite.Row
except:
return False
def adapt_datetime(ts):
return time.mktime(ts.timetuple())
sqlite3.register_adapter(datetime, adapt_datetime)
try:
await self.db.executescript(schemaScript)
except Exception:
log.debug('Error while executing schema script')
return False
async def __aenter__(self) -> "BaseSourceContext":
self.__db = aiosqlite.connect(self.config.filename)
self.db = await self.__db.__aenter__()
self.db.row_factory = aiosqlite.Row
# Create table for feature data
await self.db.execute(
"CREATE TABLE IF NOT EXISTS features ("
"src_url TEXT PRIMARY KEY NOT NULL, "
+ (" REAL, ".join(self.FEATURE_COLS))
+ " REAL"
")"
)
# Create table for predictions
await self.db.execute(
"CREATE TABLE IF NOT EXISTS prediction ("
"src_url TEXT PRIMARY KEY, " + "value TEXT, "
"confidence REAL"
")"
)
return self