Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_summary_logs(content_ids, user):
    """Return the ContentSummaryLog queryset for *user* restricted to *content_ids*."""
    from kolibri.core.logger.models import ContentSummaryLog

    # Short-circuit on an empty id collection: return an empty queryset
    # without issuing an IN () query against the database.
    if not content_ids:
        return ContentSummaryLog.objects.none()
    return ContentSummaryLog.objects.filter(
        user=user,
        content_id__in=content_ids,
    )
def get_summary_logs(content_ids, user):
    """Fetch all summary logs belonging to *user* whose content_id is in *content_ids*."""
    from kolibri.core.logger.models import ContentSummaryLog

    manager = ContentSummaryLog.objects
    # An empty id collection can match nothing, so skip the query entirely.
    return (
        manager.filter(user=user, content_id__in=content_ids)
        if content_ids
        else manager.none()
    )
def _resource_progress(self, resource, learners):
    """Count how many of *learners* have fully completed *resource*.

    Args:
        resource: mapping with at least ``contentnode_id`` and ``content_id``.
        learners: iterable of users to restrict the summary-log query to.

    Returns:
        dict with ``contentnode_id`` and ``num_learners_completed`` (0 when
        no learner has a summary log at progress == 1.0 for this content).
    """
    response = {
        'contentnode_id': resource['contentnode_id'],
        'num_learners_completed': 0,
    }
    completed_content_logs = (
        ContentSummaryLog.objects
        .filter(
            content_id=resource['content_id'],
            user__in=learners,
            progress=1.0,
        )
        .values('content_id')
        .annotate(total=Count('pk'))
    )
    # BUG FIX: the original tested `count() is 0` — an *identity* comparison
    # on an int, which is implementation-dependent and emits SyntaxWarning on
    # modern CPython. Using first() is both correct and avoids running a
    # separate COUNT query before fetching the row.
    first_row = completed_content_logs.first()
    if first_row is not None:
        response['num_learners_completed'] = first_row['total']
    return response
# Collect the latest interaction values from the user-session and
# content-session aggregates, skipping aggregates that recorded none.
last_times = [d["last"] for d in [usersess_agg, contsess_agg] if d["last"]]
# since newly provisioned devices won't have logs, we don't know whether we have an available datetime object
# NOTE(review): getattr(..., "strftime", None) yields the *bound method*
# (or None), not a formatted string — presumably a downstream consumer
# calls it with a format string; confirm against the code that uses
# first_interaction_timestamp / last_interaction_timestamp.
first_interaction_timestamp = (
getattr(min(first_times), "strftime", None) if first_times else None
)
last_interaction_timestamp = (
getattr(max(last_times), "strftime", None) if last_times else None
)
# Build a histogram of content-session counts per content kind, then
# collapse the queryset into a plain {kind: count} dict.
sesslogs_by_kind = (
contsessions.order_by("kind").values("kind").annotate(count=Count("kind"))
)
sesslogs_by_kind = {log["kind"]: log["count"] for log in sesslogs_by_kind}
summarylogs = ContentSummaryLog.objects.filter(dataset_id=dataset_id)
# Partition content sessions into authenticated vs anonymous, and split the
# anonymous ones by whether a visitor_id was recorded.
contsessions_user = contsessions.exclude(user=None)
contsessions_anon = contsessions.filter(user=None)
contsessions_anon_no_visitor_id = contsessions_anon.filter(visitor_id=None)
contsessions_anon_with_visitor_id = contsessions_anon.exclude(visitor_id=None)
# Distinct counts of users / anonymous visitors that produced any session log.
users_with_logs = contsessions_user.values("user_id").distinct().count()
anon_visitors_with_logs = (
contsessions_anon_with_visitor_id.values("visitor_id").distinct().count()
)
# calculate learner stats
learner_demographics = calculate_demographic_stats(
dataset_id=dataset_id, learners=True
)
def get_progress(self, instance):
    """Summed progress across *instance*'s resources for the context user.

    Returns a dict with ``resource_progress`` (the SUM of this user's
    summary-log progress over the instance's content ids; None when there
    are no matching logs) and ``total_resources``.
    """
    content_ids = [resource["content_id"] for resource in instance.resources]
    user_logs = ContentSummaryLog.objects.filter(
        user=self.context["user"], content_id__in=content_ids
    )
    aggregated = user_logs.aggregate(Sum("progress"))
    return {
        "resource_progress": aggregated.get("progress__sum"),
        "total_resources": len(instance.resources),
    }
collection__in=(c["id"] for c in items),
)
.distinct()
.values(
"description", "id", "is_active", "title", "resources", "collection"
)
)
# Gather the union of content_ids referenced by any lesson's resources so a
# single summary-log query can cover every lesson at once.
lesson_content_ids = set()
for lesson in lessons:
lesson_content_ids |= set(
(resource["content_id"] for resource in lesson["resources"])
)
# Map content_id -> this user's progress, fetched in one query.
progress_map = {
l["content_id"]: l["progress"]
for l in ContentSummaryLog.objects.filter(
content_id__in=lesson_content_ids, user=self.request.user
).values("content_id", "progress")
}
# Annotate each lesson with its summed resource progress; resources with no
# summary log for this user simply contribute nothing to the sum.
for lesson in lessons:
lesson["progress"] = {
"resource_progress": sum(
(
progress_map[resource["content_id"]]
for resource in lesson["resources"]
if resource["content_id"] in progress_map
)
),
"total_resources": len(lesson["resources"]),
}
def _get_log_models(self, dataset_id):
"""Bundle every log-model queryset scoped to *dataset_id* into one GroupDeletion."""
# One shared Q object keeps all seven querysets filtered identically.
dataset_id_filter = Q(dataset_id=dataset_id)
return GroupDeletion(
"Log models",
querysets=[
ContentSessionLog.objects.filter(dataset_id_filter),
ContentSummaryLog.objects.filter(dataset_id_filter),
AttemptLog.objects.filter(dataset_id_filter),
ExamAttemptLog.objects.filter(dataset_id_filter),
ExamLog.objects.filter(dataset_id_filter),
MasteryLog.objects.filter(dataset_id_filter),
UserSessionLog.objects.filter(dataset_id_filter),
],