Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
items = [item] if item else []
else:
items = tableobj.query(filters, default_options=qo)
if count_only:
items = [len(items) if isinstance(items, list) else items.count()]
elif not isinstance(items, list):
if args_get("left", None):
raise SyntaxError("Set: no left join in appengine")
if args_get("groupby", None):
raise SyntaxError("Set: no groupby in appengine")
orderby = args_get("orderby", False)
if orderby:
if isinstance(orderby, (list, tuple)):
orderby = xorify(orderby)
if isinstance(orderby, Expression):
orderby = self.expand(orderby)
orders = orderby.split(", ")
tbl = tableobj
for order in orders:
order = str(order)
desc = order.startswith("-")
name = order[1 if desc else 0 :].split(".")[-1]
if name == "id":
o = -tbl._key if desc else tbl._key
else:
o = -getattr(tbl, name) if desc else getattr(tbl, name)
items = items.order(o)
if args_get("limitby", None):
(lmin, lmax) = attributes["limitby"]
limit = lmax - lmin
"STARTSWITH",
"ENDSWITH",
"ADD",
"SUB",
"MUL",
"DIV",
"MOD",
"AS",
"ON",
"COMMA",
"NOT_NULL",
"COALESCE",
"CONTAINS",
"BELONGS",
):
built = Expression(self.db, opm, left, right)
# expression as string
elif not (left or right):
built = Expression(self.db, op)
else:
raise SyntaxError("Operator not supported: %s" % op)
return built
def len(self):
    """Return an Expression applying the dialect's ``length`` operator to self.

    The new Expression is built from this object's ``db``, with ``self`` as
    the first operand, ``None`` as the second operand, and ``"integer"`` as
    the trailing (result type) argument.
    """
    db = self.db
    length_op = self._dialect.length
    return Expression(db, length_op, self, None, "integer")
def __sub__(self, other):
    """Build the Expression representing ``self - other``.

    The result type is derived from ``self.type``:

    * ``"integer"`` / ``"bigint"``                  -> ``"integer"``
    * ``"date"``, ``"time"``, ``"datetime"``,
      ``"double"``, ``"float"``                     -> ``"double"``
    * anything starting with ``"decimal("``        -> the same decimal type

    Args:
        other: the right-hand operand, passed through unchanged.

    Returns:
        An Expression built from ``self.db``, the dialect's ``sub`` operator,
        ``self``, ``other`` and the derived result type.

    Raises:
        SyntaxError: if ``self.type`` is none of the types listed above,
            including when it is ``None``.
    """
    operand_type = self.type
    if operand_type in ("integer", "bigint"):
        result_type = "integer"
    elif operand_type in ["date", "time", "datetime", "double", "float"]:
        result_type = "double"
    # Guard against an unset type: without the ``is not None`` check an
    # untyped operand would fail with an unrelated AttributeError instead of
    # the "not supported" SyntaxError below.
    elif operand_type is not None and operand_type.startswith("decimal("):
        result_type = operand_type
    else:
        raise SyntaxError("subtraction operation not supported for type")
    return Expression(self.db, self._dialect.sub, self, other, result_type)
def extract_decade(self, expr):
    """Return an Expression extracting the ``'decade'`` part of ``expr``.

    The Expression is bound to ``expr.db`` (not to ``self``), uses
    ``self.extract`` as its operator, ``expr`` and ``'decade'`` as operands,
    and ``'integer'`` as the trailing (result type) argument.
    """
    extract_op = self.extract
    return Expression(expr.db, extract_op, expr, 'decade', 'integer')
def min(self):
    """Return an Expression applying the dialect's ``aggregate`` operator with ``"MIN"``.

    ``self`` is the first operand, ``"MIN"`` the second, and the new
    Expression is given this object's own ``type``.
    """
    db = self.db
    aggregate_op = self._dialect.aggregate
    return Expression(db, aggregate_op, self, "MIN", self.type)
def expand_all(self, fields, tabledict):
    """Normalise a list of requested fields into a flat list.

    Each entry of ``fields`` is handled by kind:

    * an ``SQLALL`` instance contributes every item of its ``_table``;
    * a string matching ``REGEX_TABLE_DOT_FIELD`` is looked up as
      ``self.db[tablename][fieldname]``;
    * any other string is wrapped in an Expression whose operator is a
      callable returning that string;
    * everything else is kept as-is.

    If the resulting list is empty, every field of every table in
    ``tabledict`` is returned instead.

    Args:
        fields: iterable of requested fields.
        tabledict: mapping whose values are iterables of fields.

    Returns:
        A new list containing the expanded fields.
    """
    expanded = []
    for entry in fields:
        if isinstance(entry, SQLALL):
            expanded.extend(entry._table)
            continue
        if not isinstance(entry, str):
            expanded.append(entry)
            continue
        matched = REGEX_TABLE_DOT_FIELD.match(entry)
        if matched:
            tablename, fieldname = matched.groups()
            expanded.append(self.db[tablename][fieldname])
        else:
            # Bind the current string as a default so the callable keeps
            # returning it after the loop variable moves on.
            expanded.append(Expression(self.db, lambda item=entry: item))
    # Nothing was requested explicitly: take every field of every table.
    if not expanded:
        expanded.extend(
            column for table in tabledict.values() for column in table
        )
    return expanded
def seconds(self):
    """Return an Expression extracting the ``"second"`` part of self.

    Uses the dialect's ``extract`` operator with ``self`` and ``"second"`` as
    operands and ``"integer"`` as the trailing (result type) argument.
    """
    db = self.db
    extract_op = self._dialect.extract
    return Expression(db, extract_op, self, "second", "integer")
elif op == "LT":
built = left < right
elif op == "LE":
built = left <= right
elif op in ("JOIN", "LEFT_JOIN", "RANDOM", "ALLOW_NULL"):
built = Expression(self.db, opm)
elif op in (
"LOWER",
"UPPER",
"EPOCH",
"PRIMARY_KEY",
"COALESCE_ZERO",
"RAW",
"INVERT",
):
built = Expression(self.db, opm, left)
elif op in (
"COUNT",
"EXTRACT",
"AGGREGATE",
"SUBSTRING",
"REGEXP",
"LIKE",
"ILIKE",
"STARTSWITH",
"ENDSWITH",
"ADD",
"SUB",
"MUL",
"DIV",
"MOD",
"AS",
def ilike(self, first, second, escape=None, query_env={}):
    """Render a case-insensitive LIKE comparison as an SQL string.

    ``first`` is wrapped with ``self.lower``. When ``second`` is a plain
    value, its expanded form is lower-cased; when it is an Expression, it is
    expanded as-is.

    Args:
        first: left-hand side, rendered through ``self.lower``.
        second: the pattern, either an Expression or a plain value.
        escape: escape character for the LIKE pattern. When ``None`` a
            backslash is used, and backslashes already present in a
            plain-value pattern are doubled so they match literally.
        query_env: forwarded unchanged to ``self.expand`` and ``self.lower``.
            It is only passed along here, never modified.

    Returns:
        A string of the form ``"(<lowered first> LIKE <pattern> ESCAPE '<escape>')"``.
    """
    pattern_is_expression = isinstance(second, Expression)
    use_default_escape = escape is None
    if use_default_escape:
        # Resolve the default for every kind of pattern. Otherwise an
        # Expression pattern with no explicit escape would render the
        # literal text "ESCAPE 'None'" in the SQL.
        escape = '\\'
    second = self.expand(second, 'string', query_env=query_env)
    if not pattern_is_expression:
        second = second.lower()
        if use_default_escape:
            # Only caller-supplied literal text needs its escape characters
            # doubled so they match literally.
            second = second.replace(escape, escape * 2)
    if second.startswith("n'"):
        # Lower-casing turned an N'...' prefix into n'...'; restore it.
        second = "N'" + second[2:]
    return "(%s LIKE %s ESCAPE '%s')" % (
        self.lower(first, query_env),
        second,
        escape,
    )