Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def update_on_device_resources():
"""
Function to set on_device_resource on all topic trees to account for
those that were imported before annotations were performed
"""
bridge = Bridge(app_name=KolibriContentConfig.label)
ContentNodeClass = bridge.get_class(ContentNode)
ContentNodeTable = bridge.get_table(ContentNode)
connection = bridge.get_connection()
child = ContentNodeTable.alias()
logger.info("Updating on_device_resource on existing channels")
# start a transaction
trans = connection.begin()
# Update all leaf ContentNodes to have on_device_resource to 1 or 0
def set_leaf_nodes_invisible(channel_id, node_ids=None, exclude_node_ids=None):
"""
Set nodes in a channel as unavailable.
With no additional arguments, this will hide an entire channel.
With the additional nodes arguments, it will selectively flag nodes
as unavailable, based on the passed in ids, setting them as unavailable if
they are in node_ids, or descendants of those nodes, but not in
exclude_node_ids or descendants of those nodes.
"""
bridge = Bridge(app_name=CONTENT_APP_NAME)
connection = bridge.get_connection()
# Start a counter for the while loop
min_boundary = 1
# Calculate batch parameters
max_rght, dynamic_chunksize = _calculate_batch_params(
bridge, channel_id, node_ids, exclude_node_ids
)
logger.info(
"Removing availability of non-topic ContentNode objects in {} batches of {}".format(
int(ceil(max_rght / dynamic_chunksize)), dynamic_chunksize
)
)
def count_removed_resources(destination, channel_id):
"""
Queries the destination db to get the leaf node ids.
Subtract available leaf nodes count on default db by available leaf nodes based on destination db leaf node ids.
"""
bridge = Bridge(app_name=CONTENT_APP_NAME, sqlite_file_path=destination)
ContentNodeClass = bridge.get_class(ContentNode)
leaf_node_ids = [
i
for i, in bridge.session.query(ContentNodeClass.id)
.filter(
ContentNodeClass.channel_id == channel_id,
ContentNodeClass.kind != content_kinds.TOPIC,
)
.all()
]
return (
ContentNode.objects.filter(channel_id=channel_id, available=True)
.exclude(kind=content_kinds.TOPIC)
.count()
- ContentNode.objects.filter_by_uuids(leaf_node_ids, validate=False)
.filter(available=True, channel_id=channel_id)
def set_local_file_availability_from_disk(checksums=None, destination=None):
bridge = Bridge(app_name=CONTENT_APP_NAME, sqlite_file_path=destination)
LocalFileClass = bridge.get_class(LocalFile)
if checksums is None:
logger.info(
"Setting availability of LocalFile objects based on disk availability"
)
files = bridge.session.query(
LocalFileClass.id, LocalFileClass.available, LocalFileClass.extension
).all()
elif type(checksums) == list:
logger.info(
"Setting availability of {number} LocalFile objects based on disk availability".format(
number=len(checksums)
)
)
def recurse_annotation_up_tree(channel_id):
bridge = Bridge(app_name=CONTENT_APP_NAME)
ContentNodeClass = bridge.get_class(ContentNode)
ContentNodeTable = bridge.get_table(ContentNode)
connection = bridge.get_connection()
node_depth = (
bridge.session.query(func.max(ContentNodeClass.level))
.filter_by(channel_id=channel_id)
.scalar()
)
logger.info(
"Annotating ContentNode objects with children for {levels} levels".format(
levels=node_depth
def get_channel_annotation_stats(channel_id, checksums=None):
bridge = Bridge(app_name=CONTENT_APP_NAME)
ContentNodeTable = bridge.get_table(ContentNode)
FileTable = bridge.get_table(File)
LocalFileTable = bridge.get_table(LocalFile)
if checksums is not None:
file_table = FileTable.join(
LocalFileTable,
and_(
FileTable.c.local_file_id == LocalFileTable.c.id,
or_(
filter_by_checksums(LocalFileTable.c.id, checksums),
LocalFileTable.c.available == True, # noqa
),
),
)
else:
source_path = paths.get_upgrade_content_database_file_path(channel_id)
# annotated db to be used for calculating diff stats
destination_path = paths.get_annotated_content_database_file_path(channel_id)
try:
if method == "network":
call_command(
"importchannel", "network", channel_id, baseurl=baseurl, no_upgrade=True
)
elif method == "disk":
drive = get_mounted_drive_by_id(drive_id)
call_command(
"importchannel", "disk", channel_id, drive.datafolder, no_upgrade=True
)
# create all fields/tables at the annotated destination db, based on the current schema version
bridge = Bridge(
sqlite_file_path=destination_path, schema_version=CURRENT_SCHEMA_VERSION
)
bridge.Base.metadata.create_all(bridge.engine)
# initialize import manager based on annotated destination path, pulling from source db path
import_manager = channel_import.initialize_import_manager(
channel_id,
cancel_check=False,
source=source_path,
destination=destination_path,
)
# import channel data from source db path
import_manager.import_channel_data()
import_manager.end()
def read_channel_metadata_from_db_file(channeldbpath):
# import here to avoid circular imports whenever kolibri.core.content.models imports utils too
from kolibri.core.content.models import ChannelMetadata
source = Bridge(sqlite_file_path=channeldbpath)
ChannelMetadataClass = source.get_class(ChannelMetadata)
source_channel_metadata = source.session.query(ChannelMetadataClass).all()[0]
# Use the inferred version from the SQLAlchemy Bridge object, and set it as additional
# metadata on the channel data
source_channel_metadata.inferred_schema_version = source.schema_version
source.end()
# Adds an attribute `root_id` when `root_id` does not exist to match with
# the latest schema.
if not hasattr(source_channel_metadata, "root_id"):
setattr(