Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def count(self, query, distinct=None, snapshot=True):
if not isinstance(query, Query):
raise SyntaxError("Type '%s' not supported in count" % type(query))
distinct_fields = []
if distinct is True:
distinct_fields = [x for x in query.first.table if x.name != 'id']
elif distinct:
if isinstance(distinct, Field):
distinct_fields = [distinct]
else:
while (isinstance(distinct, Expression) and
isinstance(distinct.second, Field)):
distinct_fields += [distinct.second]
distinct = distinct.first
if isinstance(distinct, Field):
distinct_fields += [distinct]
distinct = True
expanded = Expansion(
self, 'count', query, fields=distinct_fields, distinct=distinct)
ctable = expanded.get_collection()
if not expanded.pipeline:
return ctable.count(filter=expanded.query_dict)
for record in ctable.aggregate(expanded.pipeline):
return record['count']
Field('email', length=512, default='',
label=self.messages.label_email,
requires=is_unique_email),
Field('username', length=128, default='',
label=self.messages.label_username,
requires=is_unique_username),
Field(passfield, 'password', length=512,
readable=False, label=self.messages.label_password,
requires=[hasLength(minsize=settings.password_min_length), is_crypted]),
Field('registration_key', length=512,
writable=False, readable=False, default='',
label=self.messages.label_registration_key),
Field('reset_password_key', length=512,
writable=False, readable=False, default='',
label=self.messages.label_reset_password_key),
Field('registration_id', length=512,
writable=False, readable=False, default='',
label=self.messages.label_registration_id),
*extra_fields,
**dict(
migrate=self.__get_migrate(settings.table_user_name,
migrate),
fake_migrate=fake_migrate,
format='%(username)s'))
else:
db.define_table(
settings.table_user_name,
Field('first_name', length=128, default='',
label=self.messages.label_first_name,
requires=is_not_empty),
Field('last_name', length=128, default='',
label=self.messages.label_last_name,
self.get_mailboxes()
names = self.connection.mailbox_names.keys()
for name in names:
self.db.define_table("%s" % name,
Field("uid", writable=False),
Field("created", "datetime", writable=False),
Field("content", "text", writable=False),
Field("to", writable=False),
Field("cc", writable=False),
Field("bcc", writable=False),
Field("sender", writable=False),
Field("size", "integer", writable=False),
Field("subject", writable=False),
Field("mime", writable=False),
Field("email", "text", writable=False, readable=False),
Field("attachments", "text", writable=False, readable=False),
Field("encoding", writable=False),
Field("answered", "boolean"),
Field("deleted", "boolean"),
Field("draft", "boolean"),
Field("flagged", "boolean"),
Field("recent", "boolean", writable=False),
Field("seen", "boolean")
)
# Set a special _mailbox attribute for storing
# native mailbox names
self.db[name].mailbox = \
self.connection.mailbox_names[name]
def many_query(ref, rid):
if ref.cast and isinstance(rid, _Field):
rid = rid.cast(ref.cast)
return ref.model_instance.table[ref.field] == rid
def __call__(self, query=None, ignore_common_filters=None):
if isinstance(query, Table):
query = self._adapter.id_query(query)
elif isinstance(query, Field):
query = query!=None
elif isinstance(query, dict):
icf = query.get("ignore_common_filters")
if icf: ignore_common_filters = icf
return Set(self, query, ignore_common_filters=ignore_common_filters)
label=self.messages.label_description),
*extra_fields,
**dict(
migrate=self.__get_migrate(
settings.table_group_name, migrate),
fake_migrate=fake_migrate,
format='%(role)s (%(id)s)'))
reference_table_group = 'reference %s' % settings.table_group_name
if not settings.table_membership_name in db.tables:
extra_fields = settings.extra_fields.get(
settings.table_membership_name, []) + signature_list
db.define_table(
settings.table_membership_name,
Field('user_id', reference_table_user,
label=self.messages.label_user_id),
Field('group_id', reference_table_group,
label=self.messages.label_group_id),
*extra_fields,
**dict(
migrate=self.__get_migrate(
settings.table_membership_name, migrate),
fake_migrate=fake_migrate))
if not settings.table_permission_name in db.tables:
extra_fields = settings.extra_fields.get(
settings.table_permission_name, []) + signature_list
db.define_table(
settings.table_permission_name,
Field('group_id', reference_table_group,
label=self.messages.label_group_id),
Field('name', default='default', length=512,
label=self.messages.label_name,
requires=is_not_empty),
else:
self.static_names = None
if not isinstance(self.connection.mailbox_names, dict):
self.get_mailboxes()
names = self.connection.mailbox_names.keys()
for name in names:
self.db.define_table("%s" % name,
Field("uid", writable=False),
Field("created", "datetime", writable=False),
Field("content", "text", writable=False),
Field("to", writable=False),
Field("cc", writable=False),
Field("bcc", writable=False),
Field("sender", writable=False),
Field("size", "integer", writable=False),
Field("subject", writable=False),
Field("mime", writable=False),
Field("email", "text", writable=False, readable=False),
Field("attachments", "text", writable=False, readable=False),
Field("encoding", writable=False),
Field("answered", "boolean"),
Field("deleted", "boolean"),
Field("draft", "boolean"),
Field("flagged", "boolean"),
Field("recent", "boolean", writable=False),
Field("seen", "boolean")
)
# Set a special _mailbox attribute for storing
# native mailbox names
self.signature = Table(
self.db, 'auth_signature',
Field('is_active', 'boolean',
default=True,
readable=False, writable=False,
label='Is Active'),
Field('created_on', 'datetime',
default=lambda: datetime.now(),
writable=False, readable=False,
label='Created On'),
Field('created_by',
reference_user,
default=lazy_user, represent=represent,
writable=False, readable=False,
label='Created By', ondelete=ondelete),
Field('modified_on', 'datetime',
update=lambda: datetime.now(),
default=lambda: datetime.now(),
writable=False, readable=False,
label='Modified On'),
Field('modified_by',
reference_user, represent=represent,
default=lazy_user, update=lazy_user,
writable=False, readable=False,
label='Modified By', ondelete=ondelete))
for name in names:
self.db.define_table("%s" % name,
Field("uid", writable=False),
Field("created", "datetime", writable=False),
Field("content", "text", writable=False),
Field("to", writable=False),
Field("cc", writable=False),
Field("bcc", writable=False),
Field("sender", writable=False),
Field("size", "integer", writable=False),
Field("subject", writable=False),
Field("mime", writable=False),
Field("email", "text", writable=False, readable=False),
Field("attachments", "text", writable=False, readable=False),
Field("encoding", writable=False),
Field("answered", "boolean"),
Field("deleted", "boolean"),
Field("draft", "boolean"),
Field("flagged", "boolean"),
Field("recent", "boolean", writable=False),
Field("seen", "boolean")
)
# Set a special _mailbox attribute for storing
# native mailbox names
self.db[name].mailbox = \
self.connection.mailbox_names[name]
# decode quoted printable
self.db[name].to.represent = self.db[name].cc.represent = \
self.db[name].bcc.represent = self.db[name].sender.represent = \
self.db[name].subject.represent = self.header_represent
settings.table_membership_name,
Field('user_id', reference_table_user,
label=self.messages.label_user_id),
Field('group_id', reference_table_group,
label=self.messages.label_group_id),
*extra_fields,
**dict(
migrate=self._get_migrate(
settings.table_membership_name, migrate),
fake_migrate=fake_migrate))
if settings.table_permission_name not in db.tables:
extra_fields = settings.extra_fields.get(
settings.table_permission_name, []) + signature_list
db.define_table(
settings.table_permission_name,
Field('group_id', reference_table_group,
label=self.messages.label_group_id),
Field('name', default='default', length=512,
label=self.messages.label_name,
requires=is_not_empty),
Field('table_name', length=512,
label=self.messages.label_table_name),
Field('record_id', 'integer', default=0,
label=self.messages.label_record_id,
requires=IS_INT_IN_RANGE(0, 10 ** 9)),
*extra_fields,
**dict(
migrate=self._get_migrate(
settings.table_permission_name, migrate),
fake_migrate=fake_migrate))
if settings.table_event_name not in db.tables:
db.define_table(