Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_db_is_locked_locked():
    """Check that _db_is_locked is truthy for both DB_LOCKED_* fixtures."""
    import osxphotos

    # Bind the helper once; each fixture is then checked in the same order.
    is_locked = osxphotos.utils._db_is_locked
    assert is_locked(DB_LOCKED_10_12)
    assert is_locked(DB_LOCKED_10_15)
def test_db_is_locked_locked():
    """Assert _db_is_locked returns a truthy value for DB_LOCKED_10_12 and DB_LOCKED_10_15."""
    import osxphotos

    utils = osxphotos.utils
    locked_10_12 = utils._db_is_locked(DB_LOCKED_10_12)
    assert locked_10_12
    locked_10_15 = utils._db_is_locked(DB_LOCKED_10_15)
    assert locked_10_15
def test_db_is_locked_unlocked():
    """Check that _db_is_locked is falsy for the DB_UNLOCKED_10_15 fixture."""
    import osxphotos

    locked = osxphotos.utils._db_is_locked(DB_UNLOCKED_10_15)
    assert not locked
# In those cases, make a temp copy of the file for sqlite3 to read
if _db_is_locked(self._dbfile):
self._tmp_db = self._copy_db_file(self._dbfile)
self._db_version = self._get_db_version(self._tmp_db)
# If Photos >= 5, actual data isn't in photos.db but in Photos.sqlite
if int(self._db_version) > int(_PHOTOS_4_VERSION):
dbpath = pathlib.Path(self._dbfile).parent
dbfile = dbpath / "Photos.sqlite"
if not _check_file_exists(dbfile):
raise FileNotFoundError(f"dbfile {dbfile} does not exist", dbfile)
else:
self._dbfile_actual = self._tmp_db = dbfile
# if database is exclusively locked, make a copy of it and use the copy
if _db_is_locked(self._dbfile_actual):
self._tmp_db = self._copy_db_file(self._dbfile_actual)
if _debug():
logging.debug(
f"_dbfile = {self._dbfile}, _dbfile_actual = {self._dbfile_actual}"
)
library_path = os.path.dirname(os.path.abspath(dbfile))
(library_path, _) = os.path.split(library_path) # drop /database from path
self._library_path = library_path
if int(self._db_version) <= int(_PHOTOS_4_VERSION):
masters_path = os.path.join(library_path, "Masters")
self._masters_path = masters_path
else:
masters_path = os.path.join(library_path, "originals")
self._masters_path = masters_path
# this serves as a reverse index from label to photos containing the label
# _db_searchinfo_labels_normalized is the same but with normalized (lower case) version of the label
self._db_searchinfo_labels = _db_searchinfo_labels = {}
self._db_searchinfo_labels_normalized = _db_searchinfo_labels_normalized = {}
if self._db_version <= _PHOTOS_4_VERSION:
raise NotImplementedError(
f"search info not implemented for this database version"
)
search_db_path = pathlib.Path(self._dbfile).parent / "search" / "psi.sqlite"
if not search_db_path.exists():
logging.warning(f"could not find search db: {search_db_path}")
return None
if _db_is_locked(search_db_path):
search_db = self._copy_db_file(search_db_path)
else:
search_db = search_db_path
(conn, c) = _open_sql_file(search_db)
result = c.execute(
"""
select
ga.rowid,
assets.uuid_0,
assets.uuid_1,
groups.rowid as groupid,
groups.category,
groups.owning_groupid,
groups.content_string,
# init database names
# _tmp_db is the file that will be processed by _process_database4/5
# assume _tmp_db will be _dbfile or _dbfile_actual based on Photos version
# unless DB is locked, in which case _tmp_db will point to a temporary copy
# if Photos <=4, _dbfile = _dbfile_actual = photos.db
# if Photos >= 5, _dbfile = photos.db, from which we get DB version but the actual
# photos data is in Photos.sqlite
# In either case, a temporary copy will be made if the DB is locked by Photos
# or photoanalysisd
self._dbfile = self._dbfile_actual = self._tmp_db = os.path.abspath(dbfile)
# if database is exclusively locked, make a copy of it and use the copy
# Photos maintains an exclusive lock on the database file while Photos is open
# photoanalysisd sometimes maintains this lock even after Photos is closed
# In those cases, make a temp copy of the file for sqlite3 to read
if _db_is_locked(self._dbfile):
self._tmp_db = self._copy_db_file(self._dbfile)
self._db_version = self._get_db_version(self._tmp_db)
# If Photos >= 5, actual data isn't in photos.db but in Photos.sqlite
if int(self._db_version) > int(_PHOTOS_4_VERSION):
dbpath = pathlib.Path(self._dbfile).parent
dbfile = dbpath / "Photos.sqlite"
if not _check_file_exists(dbfile):
raise FileNotFoundError(f"dbfile {dbfile} does not exist", dbfile)
else:
self._dbfile_actual = self._tmp_db = dbfile
# if database is exclusively locked, make a copy of it and use the copy
if _db_is_locked(self._dbfile_actual):
self._tmp_db = self._copy_db_file(self._dbfile_actual)