Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def update_log(cls, user):
"""
Update the current UserSessionLog for a particular user.
"""
if user and isinstance(user, FacilityUser):
try:
user_session_log = cls.objects.filter(user=user).latest(
"last_interaction_timestamp"
)
except ObjectDoesNotExist:
user_session_log = None
if not user_session_log or timezone.now() - user_session_log.last_interaction_timestamp > timedelta(
minutes=5
):
user_session_log = cls(user=user)
user_session_log.last_interaction_timestamp = local_now()
user_session_log.save()
def users_for_facilities(self, request):
if "member_of" not in request.GET:
raise MissingRequiredParamsException(
"member_of is required for this endpoint"
)
facility_ids = request.GET["member_of"].split(",")
users = (
FacilityUser.objects.filter(facility__in=facility_ids)
.annotate(is_learner=Exists(Role.objects.filter(user=OuterRef("id"))))
.values("id", "username", "facility_id", "is_learner", "password")
)
def sanitize_users(user):
password = user.pop("password")
user["needs_password"] = password == "NOT_SPECIFIED" or not password
return user
return Response(list(map(sanitize_users, users)))
def get_all_learners(self):
"""
Get all Learners that are somehow assigned to this Lesson
"""
assignments = self.lesson_assignments.all()
learners = FacilityUser.objects.none()
for a in assignments:
learners = learners.union(a.collection.get_members())
return learners
def handle(self, *args, **options):
try:
if options["facility"]:
user = FacilityUser.objects.get(
username=options["username"], facility_id=options["facility"]
)
else:
user = FacilityUser.objects.get(username=options["username"])
except FacilityUser.DoesNotExist:
raise CommandError(
"User with username `{username}` does not exist.".format(
username=options["username"]
)
)
except FacilityUser.MultipleObjectsReturned:
raise CommandError(
(
"There is more than one user on this device with the username `{username}`. "
"Please specify the facility ID for this user.".format(
username=options["username"]
# Prompt user to pick a superuser if one does not currently exist
while not DevicePermissions.objects.filter(is_superuser=True).exists():
# specify username of account that will become a superuser
if not username:
username = input(
"Please enter username of account that will become the superuser on this device: "
)
if not FacilityUser.objects.filter(username=username).exists():
self.stderr.write(
"User with username {} does not exist".format(username)
)
username = None
continue
# make the user with the given credentials, a superuser for this device
user = FacilityUser.objects.get(username=username, dataset_id=dataset_id)
# create permissions for the authorized user
DevicePermissions.objects.update_or_create(
user=user, defaults={"is_superuser": True, "can_manage_content": True}
)
# if device has not been provisioned, set it up
if not device_provisioned():
provision_device()
def handle(self, *args, **options):
try:
user = FacilityUser.objects.get(username=options["username"])
except FacilityUser.DoesNotExist:
raise CommandError(
"User with username `{username}` does not exist.".format(
username=options["username"]
)
)
# create username directory to hold associated files
cwd = os.getcwd()
directory_location = os.path.join(cwd, user.username)
if not os.path.isdir(directory_location):
os.makedirs(directory_location)
# write basic user data to file
file_name = "{user}.txt".format(user=user.username)
file_location = os.path.join(directory_location, file_name)
data = FacilityUserSerializer(user).data
def csv_file_generator(facility, filepath, overwrite=True, demographic=False):
if not overwrite and os.path.exists(filepath):
raise ValueError("{} already exists".format(filepath))
queryset = FacilityUser.objects.filter(facility=facility)
header_labels = tuple(
label
for field, label in labels.items()
if demographic or field not in DEMO_FIELDS
)
columns = tuple(
column for column in db_columns if demographic or column not in DEMO_FIELDS
)
if sys.version_info[0] < 3:
csv_file = io.open(filepath, "wb")
else:
csv_file = io.open(filepath, "w", newline="")
)
class NestedExamAssignmentSerializer(serializers.ModelSerializer):
collection = NestedCollectionSerializer(read_only=True)
class Meta:
model = ExamAssignment
fields = (
'id', 'exam', 'collection',
)
class ExamAssignmentCreationSerializer(serializers.ModelSerializer):
assigned_by = serializers.PrimaryKeyRelatedField(read_only=False, queryset=FacilityUser.objects.all())
collection = serializers.PrimaryKeyRelatedField(read_only=False, queryset=Collection.objects.all())
class Meta:
model = ExamAssignment
fields = (
'id', 'exam', 'collection', 'assigned_by',
)
read_only_fields = ('assigned_by',)
def to_internal_value(self, data):
# Make a new OrderedDict from the input, which could be an immutable QueryDict
data = OrderedDict(data)
data['assigned_by'] = self.context['request'].user.id
return super(ExamAssignmentCreationSerializer, self).to_internal_value(data)