Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Firestore batch
"""
field_list = {}
# if mutable instance is none this mean user is creating document directly from manager
# For example User.collection.create(name="Azeem") in this case mutable instance will be None
# If document is creating by directly using manager then check if there is any NestedModel
# If there is any nested model then get get value from nested model
if mutable_instance is None:
for k, v in kwargs.items():
try:
# if this is an id field then save it in field list and pass it
f = self.model_cls._meta.get_field(k)
except FieldNotFound:
field_list[k] = v
continue
if isinstance(f, NestedModel):
model_instance = v
if f.valid_model(model_instance):
field_list[f.name] = model_instance._get_fields()
else:
field_list[k] = v
# Create instance for nested model
for f in self.model_cls._meta.field_list.values():
if isinstance(f, NestedModel):
field_list[f.name] = f.nested_model()._get_fields()
else:
field_list = kwargs
return self.queryset.create(mutable_instance, transaction, batch, **field_list)
def _nested_field_list(self, f, fl, *name):
"""Get Nested Fields"""
required_nested_model = f.raw_attributes.get('required')
if required_nested_model is None or required_nested_model is False:
ignore_required = True
else:
ignore_required = False
nested_field_list = {}
for n_f in f.nested_model._meta.field_list.values():
if isinstance(n_f, NestedModel):
n = (*name, n_f.name)
self._nested_field_list(n_f, nested_field_list, *n)
else:
nv = None
if utils.get_nested(self.query, *name) is not None:
nv = utils.get_nested(self.query, *name).get(n_f.name)
nested_field_list[n_f.db_column_name] = n_f.get_value(
nv,
ignore_required
)
fl[f.db_column_name] = nested_field_list
user = User()
user.name = "Azeem"
user.age = 25
# if you call this method `_get_field()` it will return dict{name, val}
# in this case it will be
{name: "Azeem", age: 25}
Returns
-------
dict:
name value dict of model
"""
field_list = {}
for f in self._meta.field_list.values():
if isinstance(f, fields.NestedModel):
model_instance = getattr(self, f.name)
if f.valid_model(model_instance):
field_list[f.name] = model_instance._get_fields()
else:
field_list[f.name] = getattr(self, f.name)
return field_list
.. code-block:: python
class User(Model):
name = TextField(column_name="full_name")
age = NumberField()
User.collection.create(name="Azeem", age=25)
This method return dict of field names and values
in this case it will be like this
`{full_name: "Azeem", age=25}`
"""
field_dict = {}
for f in self.model._meta.field_list.values():
if f.name in self.query:
# Check if it is nested model
if isinstance(f, NestedModel):
# Get nested model field
self._nested_field_list(f, field_dict, f.name)
else:
v = f.get_value(self.query.get(f.name), ignore_required=True, ignore_default=True)
if v is not None or type(v) is bool:
field_dict[f.db_column_name] = v
if v is None and isinstance(f, DateTime):
field_dict[f.db_column_name] = v
return field_dict
return None
# instance values is changed according to firestore
# so mark it modified this will help later for figuring
# out the updated fields when need to update this document
setattr(model, '_instance_modified', True)
for k, v in doc_dict.items():
field = model._meta.get_field_by_column_name(k)
# if missing field setting is set to "ignore" then
# get_field_by_column_name return None So, just skip this field
if field is None:
continue
# Check if it is Reference field
if isinstance(field, ReferenceField):
val = ReferenceFieldWrapper.from_doc_ref(model, field, field.field_value(v))
elif isinstance(field, NestedModel):
nested_doc_val = field.field_value(v)
if nested_doc_val:
val = NestedModelWrapper.from_model_dict(field, nested_doc_val)
else:
val = None
else:
val = field.field_value(v)
setattr(model, field.name, val)
# If parent key is None but here is parent key from doc then set the parent for this model
# This is case when you filter the documents parent key not auto set just set it
if not model.parent and parent_key is not None:
model.parent = parent_key
# If it is not nested model then set the id for this model
if not nested_doc:
def _nested_field_list(self, f, fl, *name):
"""Get Nested Fields"""
nested_field_list = {}
for n_f in f.nested_model._meta.field_list.values():
if isinstance(n_f, NestedModel):
n = (*name, n_f.name)
self._nested_field_list(n_f, nested_field_list, *n)
else:
nested_field_list[n_f.db_column_name] = n_f.get_value(
utils.get_nested(self.query, *name).get(n_f.name),
ignore_required=True
)
fl[f.db_column_name] = nested_field_list
for k, v in kwargs.items():
try:
# if this is an id field then save it in field list and pass it
f = self.model_cls._meta.get_field(k)
except FieldNotFound:
field_list[k] = v
continue
if isinstance(f, NestedModel):
model_instance = v
if f.valid_model(model_instance):
field_list[f.name] = model_instance._get_fields()
else:
field_list[k] = v
# Create instance for nested model
for f in self.model_cls._meta.field_list.values():
if isinstance(f, NestedModel):
field_list[f.name] = f.nested_model()._get_fields()
else:
field_list = kwargs
return self.queryset.create(mutable_instance, transaction, batch, **field_list)
Examples
-------
.. code-block:: python
class User(Model):
name = TextField(column_name="full_name")
age = NumberField()
User.collection.create(name="Azeem", age=25)
This method return dict of field names and values
in this case it will be like this
`{full_name: "Azeem", age=25}`
"""
field_list = {}
for f in self.model._meta.field_list.values():
if isinstance(f, NestedModel):
self._nested_field_list(f, field_list, f.name)
else:
field_list[f.db_column_name] = f.get_value(self.query.get(f.name))
return field_list