Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
self.old_file = norm_file(
os.path.join(os.path.dirname(__file__),
'db', 'migrate.sqlite'))
self.new_file = norm_file(
os.path.join(os.path.dirname(__file__),
'db', 'migrate_new.sqlite'))
self.table_name = 'unnamed'
MAKE_TABLE = 'CREATE TABLE IF NOT EXISTS %s (key TEXT PRIMARY KEY, value BLOB)' % self.table_name
self.conn_old = sqlitedict.SqliteMultithread(self.old_file, autocommit=True, journal_mode="DELETE")
self.conn_old.execute(MAKE_TABLE)
self.conn_old.commit()
self.conn_new = sqlitedict.SqliteMultithread(self.new_file, autocommit=True, journal_mode="DELETE")
self.conn_new.execute(MAKE_TABLE)
self.conn_new.commit()
def setUp(self):
self.old_file = norm_file(
os.path.join(os.path.dirname(__file__),
'db', 'migrate.sqlite'))
self.new_file = norm_file(
os.path.join(os.path.dirname(__file__),
'db', 'migrate_new.sqlite'))
self.table_name = 'unnamed'
MAKE_TABLE = 'CREATE TABLE IF NOT EXISTS %s (key TEXT PRIMARY KEY, value BLOB)' % self.table_name
self.conn_old = sqlitedict.SqliteMultithread(self.old_file, autocommit=True, journal_mode="DELETE")
self.conn_old.execute(MAKE_TABLE)
self.conn_old.commit()
self.conn_new = sqlitedict.SqliteMultithread(self.new_file, autocommit=True, journal_mode="DELETE")
self.conn_new.execute(MAKE_TABLE)
self.conn_new.commit()
program = os.path.basename(sys.argv[0])
if len(sys.argv) < 3:
print(globals()['__doc__'] % locals())
sys.exit(1)
path_old = sys.argv[1]
path_new = sys.argv[2]
if len(sys.argv) == 4:
table_name = sys.argv[3]
else:
table_name = 'unnamed'
MAKE_TABLE = 'CREATE TABLE IF NOT EXISTS %s (key TEXT PRIMARY KEY, value BLOB)' % table_name
conn_old = SqliteMultithread(path_old, autocommit=True, journal_mode="DELETE")
conn_old.execute(MAKE_TABLE)
conn_old.commit()
conn_new = SqliteMultithread(path_new, autocommit=True, journal_mode="DELETE")
conn_new.execute(MAKE_TABLE)
conn_new.commit()
migrate(table_name, conn_old, conn_new)
conn_old.close()
conn_new.close()
logger.info("finished running %s", program)
if flag == 'n':
if os.path.exists(filename):
os.remove(filename)
dirname = os.path.dirname(filename)
if dirname:
if not os.path.exists(dirname):
raise RuntimeError('Error! The directory does not exist, %s' % dirname)
self.filename = filename
self.tablename = tablename
logger.info("opening Sqlite table %r in %s" % (tablename, filename))
MAKE_TABLE = 'CREATE TABLE IF NOT EXISTS %s (key TEXT PRIMARY KEY, value BLOB)' % self.tablename
self.conn = SqliteMultithread(filename, autocommit=autocommit, journal_mode=journal_mode)
self.conn.execute(MAKE_TABLE)
self.conn.commit()
if flag == 'w':
self.clear()
sys.exit(1)
path_old = sys.argv[1]
path_new = sys.argv[2]
if len(sys.argv) == 4:
table_name = sys.argv[3]
else:
table_name = 'unnamed'
MAKE_TABLE = 'CREATE TABLE IF NOT EXISTS %s (key TEXT PRIMARY KEY, value BLOB)' % table_name
conn_old = SqliteMultithread(path_old, autocommit=True, journal_mode="DELETE")
conn_old.execute(MAKE_TABLE)
conn_old.commit()
conn_new = SqliteMultithread(path_new, autocommit=True, journal_mode="DELETE")
conn_new.execute(MAKE_TABLE)
conn_new.commit()
migrate(table_name, conn_old, conn_new)
conn_old.close()
conn_new.close()
logger.info("finished running %s", program)
def __init__(self, filename, autocommit, journal_mode):
super(SqliteMultithread, self).__init__()
self.filename = filename
self.autocommit = autocommit
self.journal_mode = journal_mode
# use request queue of unlimited size
self.reqs = Queue()
self.setDaemon(True) # python2.5-compatible
self.exception = None
self.log = logging.getLogger('sqlitedict.SqliteMultithread')
self.start()
def __init__(self, filename, autocommit, journal_mode):
super(SqliteMultithread, self).__init__()
self.filename = filename
self.autocommit = autocommit
self.journal_mode = journal_mode
# use request queue of unlimited size
self.reqs = Queue()
self.setDaemon(True) # python2.5-compatible
self.exception = None
self.log = logging.getLogger('sqlitedict.SqliteMultithread')
self.start()
def _new_conn(self):
return SqliteMultithread(self.filename, autocommit=self.autocommit, journal_mode=self.journal_mode)