Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_progress_by_id(id):
    """
    Return get_progress() for the first ContentNode whose id matches ``id``.

    Falls back to 0.0 when no ContentNode matches.
    """
    matches = ContentNode.objects.filter(id=id)
    # Guard clause: nothing matched, so there is no node to compute progress for.
    if len(matches) == 0:
        return 0.0
    return get_progress(matches[0])
def get_resource_title(resource_id):
    """
    Return the title of the ContentNode whose primary key is ``resource_id``.

    Returns an empty string when no such ContentNode exists.
    """
    try:
        # Lookup and attribute access stay inside the same try, as before.
        return ContentNode.objects.get(pk=resource_id).title
    except ContentNode.DoesNotExist:
        return ''
continue
else:
exception = e
break
with db_task_write_lock:
annotation.set_content_visibility(
channel_id,
file_checksums_to_annotate,
node_ids=node_ids,
exclude_node_ids=exclude_node_ids,
public=public,
)
resources_after_transfer = (
ContentNode.objects.filter(channel_id=channel_id, available=True)
.exclude(kind=content_kinds.TOPIC)
.values("content_id")
.distinct()
.count()
)
if job:
job.extra_metadata["transferred_file_size"] = transferred_file_size
job.extra_metadata["transferred_resources"] = (
resources_after_transfer - resources_before_transfer
)
job.save_meta()
if number_of_skipped_files > 0:
logger.warning(
"{} files are skipped, because errors occurred during the import.".format(
for model in self.content_models
if model is not ContentNode and model not in merge_models
] + [ContentNode]
for model in models_to_delete:
# we do a few things differently if it's the ContentNode model, vs a model related to ContentNode
if model is ContentNode:
template = delete_contentnode_template
fields = ["id"]
else:
template = delete_related_template
fields = [
f.column
for f in model._meta.fields
if isinstance(f, ForeignKey) and f.target_field.model is ContentNode
]
# if the external database is attached and there are no incompatible schema mappings for a table,
# we can skip deleting records that will be REPLACED during import, which helps efficiency
if self._can_use_optimized_pre_deletion(model):
template += " AND NOT id IN (SELECT id FROM sourcedb.{table})"
# run a query for each field this model has that foreignkeys onto ContentNode
for field in fields:
# construct the actual query by filling in variables
query = template.format(
table=model._meta.db_table,
fk_field=field,
tree_id=old_tree_id,
cn_table=ContentNode._meta.db_table,
def get_descendant_content_ids(self):
    """
    Build a queryset of content_id values for the non-topic content nodes
    located under this node.

    Nodes are selected by their ``lft`` value falling within this node's
    ``lft``..``rght`` range (both bounds inclusive).
    """
    # NOTE(review): the range filter is not restricted by tree -- confirm that
    # callers only rely on this within a single tree.
    in_range = ContentNode.objects.filter(lft__gte=self.lft, lft__lte=self.rght)
    resources_only = in_range.exclude(kind=content_kinds.TOPIC)
    return resources_only.values_list("content_id", flat=True)
def count_removed_resources(destination, channel_id):
    """
    Count the channel's available resources whose ids are absent from the
    destination db.

    Queries the destination db to get the leaf (non-topic) node ids of the
    channel, then subtracts, from the number of available leaf nodes on the
    default db, the number of available leaf nodes on the default db whose ids
    are among the destination db leaf node ids.

    :param destination: path handed to ``Bridge`` as ``sqlite_file_path``
    :param channel_id: id of the channel whose non-topic nodes are compared
    :return: the difference between the two counts described above
    """
    bridge = Bridge(app_name=CONTENT_APP_NAME, sqlite_file_path=destination)
    ContentNodeClass = bridge.get_class(ContentNode)
    # Each row returned by the query is a 1-tuple; unpack it to the bare id.
    leaf_node_ids = [
        i
        for i, in bridge.session.query(ContentNodeClass.id)
        .filter(
            ContentNodeClass.channel_id == channel_id,
            ContentNodeClass.kind != content_kinds.TOPIC,
        )
        .all()
    ]
    return (
        ContentNode.objects.filter(channel_id=channel_id, available=True)
        .exclude(kind=content_kinds.TOPIC)
        .count()
        - ContentNode.objects.filter_by_uuids(leaf_node_ids, validate=False)
        .filter(available=True, channel_id=channel_id)
        .count()
    )
def calculate_total_resource_count(channel):
    """
    Set ``channel.total_resource_count`` and save the channel.

    The stored value is the count of the channel's available, non-topic
    content nodes after ``dedupe_by_content_id()`` has been applied.
    """
    available_resources = (
        ContentNode.objects.filter(channel_id=channel.id)
        .filter(available=True)
        .exclude(kind=content_kinds.TOPIC)
    )
    channel.total_resource_count = available_resources.dedupe_by_content_id().count()
    channel.save()
]
# if the external database is attached and there are no incompatible schema mappings for a table,
# we can skip deleting records that will be REPLACED during import, which helps efficiency
if self._can_use_optimized_pre_deletion(model):
template += " AND NOT id IN (SELECT id FROM sourcedb.{table})"
# run a query for each field this model has that foreignkeys onto ContentNode
for field in fields:
# construct the actual query by filling in variables
query = template.format(
table=model._meta.db_table,
fk_field=field,
tree_id=old_tree_id,
cn_table=ContentNode._meta.db_table,
)
# check that the import operation hasn't since been cancelled
self.check_cancelled()
# execute the actual query
self.destination.session.execute(text(query))
def get_all_destination_tree_ids(self):
ContentNodeRecord = self.destination.get_class(ContentNode)
return sorted(
map(
lambda x: x[0],
self.destination.session.query(ContentNodeRecord.tree_id)
.distinct()