Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(
self,
channel_id,
channel_version=None,
cancel_check=None,
source=None,
destination=None,
):
self.channel_id = channel_id
self.channel_version = channel_version
self.cancel_check = cancel_check
self.source_db_path = source or get_content_database_file_path(self.channel_id)
self.source = Bridge(sqlite_file_path=self.source_db_path)
# Explicitly set the destination schema version to our latest published schema version
# Not the current schema of the DB, as we do our mapping to the published versions.
if destination is None:
# If no destination is set then we are targeting the default database
self.destination = Bridge(
schema_version=CONTENT_SCHEMA_VERSION, app_name=CONTENT_APP_NAME
)
else:
# If a destination is set then pass that explicitly. At the moment, this only supports
# importing to an arbitrary SQLite file path.
self.destination = Bridge(
sqlite_file_path=destination,
schema_version=CONTENT_SCHEMA_VERSION,
def _transfer(self, method, channel_id, baseurl=None, path=None):
dest = paths.get_content_database_file_path(channel_id)
# determine where we're downloading/copying from, and create appropriate transfer object
if method == DOWNLOAD_METHOD:
url = paths.get_content_database_file_url(channel_id, baseurl=baseurl)
logger.debug("URL to fetch: {}".format(url))
filetransfer = transfer.FileDownload(url, dest)
elif method == COPY_METHOD:
srcpath = paths.get_content_database_file_path(channel_id, datafolder=path)
filetransfer = transfer.FileCopy(srcpath, dest)
logger.debug("Destination: {}".format(dest))
finished = False
while not finished:
finished = self._start_file_transfer(filetransfer, channel_id, dest)
if self.is_cancelled():
def initialize_import_manager(channel_id, cancel_check=None):
channel_metadata = read_channel_metadata_from_db_file(get_content_database_file_path(channel_id))
# For old versions of content databases, we can only infer the schema version
min_version = getattr(channel_metadata, 'min_schema_version', getattr(channel_metadata, 'inferred_schema_version'))
try:
ImportClass = mappings.get(min_version)
except KeyError:
try:
version_number = int(min_version)
if version_number > int(CONTENT_SCHEMA_VERSION):
raise FutureSchemaError('Tried to import schema version, {version}, which is not supported by this version of Kolibri.'.format(
version=min_version,
))
elif version_number < int(CONTENT_SCHEMA_VERSION):
# If it's a valid integer, but there is no schema for it, then we have stopped supporting this version
raise InvalidSchemaVersionError('Tried to import unsupported schema version {version}'.format(
version=min_version,
def _transfer(self, method, channel_id, baseurl=None, path=None, no_upgrade=False):
new_channel_dest = paths.get_upgrade_content_database_file_path(channel_id)
dest = (
new_channel_dest
if no_upgrade
else paths.get_content_database_file_path(channel_id)
)
# if new channel version db has previously been downloaded, just copy it over
if os.path.exists(new_channel_dest) and not no_upgrade:
method = COPY_METHOD
# determine where we're downloading/copying from, and create appropriate transfer object
if method == DOWNLOAD_METHOD:
url = paths.get_content_database_file_url(channel_id, baseurl=baseurl)
logger.debug("URL to fetch: {}".format(url))
filetransfer = transfer.FileDownload(url, dest)
elif method == COPY_METHOD:
# if there is a new channel version db, set that as source path
srcpath = (
new_channel_dest
if os.path.exists(new_channel_dest)
else paths.get_content_database_file_path(channel_id, datafolder=path)
def handle_async(self, *args, **options):
channel_id = options["channel_id"]
data_dir = os.path.realpath(options["destination"])
logger.info(
"Exporting channel database for channel id {} to {}".format(
channel_id, data_dir
)
)
src = paths.get_content_database_file_path(channel_id)
dest = paths.get_content_database_file_path(channel_id, datafolder=data_dir)
logger.debug("Source file: {}".format(src))
logger.debug("Destination file: {}".format(dest))
with transfer.FileCopy(src, dest, cancel_check=self.is_cancelled) as copy:
with self.start_progress(total=copy.total_size) as progress_update:
try:
for block in copy:
progress_update(len(block))
except transfer.TransferCanceled:
pass
if self.is_cancelled():
try:
def _transfer(self, method, channel_id, baseurl=None, path=None):
    """Transfer a channel's content database to its default local path.

    The destination is whatever ``paths.get_content_database_file_path``
    returns for ``channel_id``. Depending on ``method`` the database is
    either downloaded from a URL or copied from another data folder. The
    transfer is re-attempted until ``self._start_file_transfer`` returns a
    truthy value, or until the task is found to be cancelled.

    :param method: ``DOWNLOAD_METHOD`` to download, ``COPY_METHOD`` to copy.
    :param channel_id: id of the channel whose database is transferred.
    :param baseurl: forwarded to the URL builder; only used when downloading.
    :param path: data folder holding the source database; only used when
        copying.
    :raises ValueError: if ``method`` is not a supported transfer method.
    """
    dest = paths.get_content_database_file_path(channel_id)

    # determine where we're downloading/copying from, and create appropriate transfer object
    if method == DOWNLOAD_METHOD:
        url = paths.get_content_database_file_url(channel_id, baseurl=baseurl)
        logger.debug("URL to fetch: {}".format(url))
        filetransfer = transfer.FileDownload(url, dest)
    elif method == COPY_METHOD:
        srcpath = paths.get_content_database_file_path(channel_id, datafolder=path)
        filetransfer = transfer.FileCopy(srcpath, dest)
    else:
        # Without this branch ``filetransfer`` would stay unbound and the
        # loop below would fail with an opaque UnboundLocalError.
        raise ValueError("Unsupported transfer method: {!r}".format(method))

    logger.debug("Destination: {}".format(dest))

    finished = False
    while not finished:
        finished = self._start_file_transfer(filetransfer, channel_id, dest)
        # Check for cancellation after every attempt so an unfinished
        # transfer does not keep retrying once the task has been cancelled.
        if self.is_cancelled():
            self.cancel()
            break
def handle_async(self, *args, **options):
channel_id = options["channel_id"]
data_dir = os.path.realpath(options["destination"])
logger.info(
"Exporting channel database for channel id {} to {}".format(
channel_id, data_dir
)
)
src = paths.get_content_database_file_path(channel_id)
dest = paths.get_content_database_file_path(channel_id, datafolder=data_dir)
logger.debug("Source file: {}".format(src))
logger.debug("Destination file: {}".format(dest))
with transfer.FileCopy(src, dest, cancel_check=self.is_cancelled) as copy:
with self.start_progress(total=copy.total_size) as progress_update:
try:
for block in copy:
progress_update(len(block))
except transfer.TransferCanceled:
pass
if self.is_cancelled():
)
# if new channel version db has previously been downloaded, just copy it over
if os.path.exists(new_channel_dest) and not no_upgrade:
method = COPY_METHOD
# determine where we're downloading/copying from, and create appropriate transfer object
if method == DOWNLOAD_METHOD:
url = paths.get_content_database_file_url(channel_id, baseurl=baseurl)
logger.debug("URL to fetch: {}".format(url))
filetransfer = transfer.FileDownload(url, dest)
elif method == COPY_METHOD:
# if there is a new channel version db, set that as source path
srcpath = (
new_channel_dest
if os.path.exists(new_channel_dest)
else paths.get_content_database_file_path(channel_id, datafolder=path)
)
filetransfer = transfer.FileCopy(srcpath, dest)
logger.debug("Destination: {}".format(dest))
self._start_file_transfer(filetransfer, channel_id, dest, no_upgrade=no_upgrade)
if self.is_cancelled():
self.cancel()
# if we are trying to upgrade, remove new channel db
if os.path.exists(new_channel_dest) and not no_upgrade:
os.remove(new_channel_dest)