Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class Values(Term):
    """Term that renders a field reference wrapped in ``VALUES(...)``."""

    def __init__(self, field):
        """Store ``field``, passing it through ``Field()`` unless it already is a ``Field``."""
        super().__init__(None)
        if isinstance(field, Field):
            self.field = field
        else:
            self.field = Field(field)

    def get_sql(self, quote_char=None, **kwargs):
        """Return ``VALUES(<sql of the wrapped field>)``.

        ``quote_char`` and any extra keyword arguments are forwarded to the
        wrapped field's ``get_sql``.
        """
        inner_sql = self.field.get_sql(quote_char=quote_char, **kwargs)
        return "VALUES({value})".format(value=inner_sql)
class NullValue(Term):
    """Term that renders the SQL ``NULL`` literal."""

    def fields(self):
        """Return an empty list."""
        return []

    def get_sql(self, **kwargs):
        """Return ``NULL`` passed through ``format_alias_sql`` with this term's alias."""
        return format_alias_sql("NULL", self.alias, **kwargs)
class Criterion(Term):
def __and__(self, other):
    """Return a ``ComplexCriterion`` joining ``self`` and ``other`` with ``Boolean.and_``."""
    operator = Boolean.and_
    return ComplexCriterion(operator, self, other)
def __or__(self, other):
    """Return a ``ComplexCriterion`` joining ``self`` and ``other`` with ``Boolean.or_``."""
    operator = Boolean.or_
    return ComplexCriterion(operator, self, other)
def __xor__(self, other):
skip_to_python_if_native = False
def function_cast(self, term: Term) -> Term:
return functions.Cast(term, SqlTypes.NUMERIC)
Tortoise will then use the overridden attributes/functions for that dialect.
If you need a dynamic attribute, you can use a property.
"""
# field_type is a read-only property on the instance; it is set by _FieldMeta.
field_type: Type[Any] = None # type: ignore
indexable: bool = True
has_db_field: bool = True
skip_to_python_if_native: bool = False
allows_generated: bool = False
function_cast: Optional[Callable[[Term], Term]] = None
SQL_TYPE: str = None # type: ignore
GENERATED_SQL: str = None # type: ignore
# This method is just to make IDE/Linters happy
def __new__(cls, *args: Any, **kwargs: Any) -> "Field":
    # The positional and keyword arguments are accepted but not used here;
    # only ``cls`` is forwarded to the parent ``__new__``.
    return super().__new__(cls)
def __init__(
self,
source_field: Optional[str] = None,
generated: bool = False,
pk: bool = False,
null: bool = False,
default: Any = None,
unique: bool = False,
index: bool = False,
namespace=format_quotes(
self.table.alias or self.table._table_name, quote_char
),
name=field_sql,
)
field_alias = getattr(self, "alias", None)
if with_alias:
return format_alias_sql(
field_sql, field_alias, quote_char=quote_char, **kwargs
)
return field_sql
class Index(Term):
    """Term that carries an index name and renders it through ``format_quotes``."""

    def __init__(self, name, alias=None):
        """Store ``name`` and hand ``alias`` to the parent initializer."""
        super().__init__(alias)
        self.name = name

    def get_sql(self, quote_char=None, **kwargs):
        """Return the index name quoted with ``quote_char``; extra kwargs are ignored."""
        quoted_name = format_quotes(self.name, quote_char)
        return quoted_name
class Star(Field):
def __init__(self, table=None):
super(Star, self).__init__("*", table=table)
@property
def tables_(self):
if self.table is None:
return {}
def inner(inner_self, *args, **kwargs):
    """Call ``item_func`` and wrap a ``Term`` result in ``Not``; return anything else unchanged."""
    outcome = item_func(inner_self, *args, **kwargs)
    return Not(outcome) if isinstance(outcome, Term) else outcome
return BasicCriterion(JSONOperators.HAS_KEY, self, self.wrap_json(other))
def contains(self, other):
    """Return a ``BasicCriterion`` applying ``JSONOperators.CONTAINS`` to ``self`` and the JSON-wrapped ``other``."""
    wrapped = self.wrap_json(other)
    return BasicCriterion(JSONOperators.CONTAINS, self, wrapped)
def contained_by(self, other):
    """Return a ``BasicCriterion`` applying ``JSONOperators.CONTAINED_BY`` to ``self`` and the JSON-wrapped ``other``."""
    wrapped = self.wrap_json(other)
    return BasicCriterion(JSONOperators.CONTAINED_BY, self, wrapped)
def has_keys(self, other: Iterable):
    """Return a ``BasicCriterion`` applying ``JSONOperators.HAS_KEYS`` to ``self`` and an ``Array`` built from the items of ``other``."""
    key_array = Array(*other)
    return BasicCriterion(JSONOperators.HAS_KEYS, self, key_array)
def has_any_keys(self, other: Iterable):
    """Return a ``BasicCriterion`` applying ``JSONOperators.HAS_ANY_KEYS`` to ``self`` and an ``Array`` built from the items of ``other``."""
    key_array = Array(*other)
    return BasicCriterion(JSONOperators.HAS_ANY_KEYS, self, key_array)
class Values(Term):
    """Term whose SQL is ``VALUES(...)`` around a single field reference."""

    def __init__(self, field):
        """Keep ``field`` as given when it is a ``Field``; otherwise wrap it with ``Field()``."""
        super().__init__(None)
        wrapped = field if isinstance(field, Field) else Field(field)
        self.field = wrapped

    def get_sql(self, quote_char=None, **kwargs):
        """Render the stored field (forwarding ``quote_char`` and kwargs) inside ``VALUES(...)``."""
        field_sql = self.field.get_sql(quote_char=quote_char, **kwargs)
        return "VALUES({value})".format(value=field_sql)
class NullValue(Term):
    # NOTE(review): only ``fields`` is visible in this excerpt of the class.
    def fields(self):
        """Return an empty list."""
        return []
value = self.value.replace(quote_char, quote_char * 2)
return format_quotes(value, quote_char)
if isinstance(self.value, bool):
return str.lower(str(self.value))
if self.value is None:
return "null"
return str(self.value)
def get_sql(self, quote_char=None, secondary_quote_char="'", **kwargs):
    """Render the value via ``get_value_sql`` and apply this term's alias.

    Both quote characters and any extra kwargs go to ``get_value_sql``; the
    result, ``self.alias``, ``quote_char`` and the extra kwargs are then
    handed to ``format_alias_sql``.
    """
    value_sql = self.get_value_sql(
        quote_char=quote_char,
        secondary_quote_char=secondary_quote_char,
        **kwargs
    )
    aliased_sql = format_alias_sql(
        value_sql, self.alias, quote_char=quote_char, **kwargs
    )
    return aliased_sql
class JSON(Term):
table = None
def __init__(self, value, alias=None):
    """Forward ``alias`` to the parent initializer, then store ``value`` on the instance."""
    super().__init__(alias)
    self.value = value
def _recursive_get_sql(self, value, **kwargs):
if isinstance(value, dict):
return self._get_dict_sql(value, **kwargs)
if isinstance(value, list):
return self._get_list_sql(value, **kwargs)
if isinstance(value, str):
return self._get_str_sql(value, **kwargs)
return str(value)
def _get_dict_sql(self, value, **kwargs):
def __getitem__(self, item):
if not isinstance(item, slice):
raise TypeError("Field' object is not subscriptable")
return self.between(item.start, item.stop)
def __str__(self):
return self.get_sql(quote_char='"', secondary_quote_char="'")
def __hash__(self):
return hash(self.get_sql(with_alias=True))
def get_sql(self, **kwargs):
    """Always raise ``NotImplementedError``.

    NOTE(review): presumably concrete terms override this; confirm against
    the enclosing class, which is not visible here.
    """
    raise NotImplementedError
class Parameter(Term):
    """Term that renders a stored placeholder via ``str()``."""

    is_aggregate = None

    def __init__(self, placeholder):
        """Initialise the parent with no arguments and store ``placeholder``."""
        super().__init__()
        self.placeholder = placeholder

    def fields(self):
        """Return an empty list."""
        return []

    def get_sql(self, **kwargs):
        """Return ``str(self.placeholder)``; keyword arguments are ignored."""
        placeholder_sql = str(self.placeholder)
        return placeholder_sql
class Negative(Term):
def __init__(self, term):
super(Negative, self).__init__()
if not field_object:
raise FieldError(f"Unknown keyword argument {key} for model {self.model}")
if field_object.pk:
raise IntegrityError(f"Field {key} is PK and can not be updated")
if isinstance(field_object, (ForeignKeyFieldInstance, OneToOneFieldInstance)):
fk_field: str = field_object.source_field # type: ignore
db_field = self.model._meta.fields_map[fk_field].source_field
value = executor.column_map[fk_field](
getattr(value, field_object.to_field_instance.model_field_name), None
)
else:
try:
db_field = self.model._meta.fields_db_projection[key]
except KeyError:
raise FieldError(f"Field {key} is virtual and can not be updated")
if isinstance(value, Term):
value = F.resolver_arithmetic_expression(self.model, value)[0]
elif isinstance(value, Function):
value = value.resolve(self.model, table)["field"]
else:
value = executor.column_map[key](value, None)
self.query = self.query.set(db_field, value)
param = model._meta.get_filter(f"{key}__isnull")
value = True
else:
param = model._meta.get_filter(key)
pk_db_field = model._meta.db_pk_column
if param.get("table"):
join = (
param["table"],
table[pk_db_field] == param["table"][param["backward_key"]],
)
if param.get("value_encoder"):
value = param["value_encoder"](value, model)
criterion = param["operator"](param["table"][param["field"]], value)
else:
if isinstance(value, Term):
encoded_value = value
else:
field_object = model._meta.fields_map[param["field"]]
encoded_value = (
param["value_encoder"](value, model, field_object)
if param.get("value_encoder")
else model._meta.db.executor_class._field_to_db(field_object, value, model)
)
criterion = param["operator"](table[param["source_field"]], encoded_value)
return criterion, join
clauses.append(
"{term} {orient}".format(term=term, orient=directionality.value)
if directionality is not None
else term
)
return " ORDER BY {orderby}".format(orderby=",".join(clauses))
def _offset_sql(self):
return " OFFSET {offset}".format(offset=self._offset)
def _limit_sql(self):
return " LIMIT {limit}".format(limit=self._limit)
class QueryBuilder(Selectable, Term):
"""
Query Builder is the main class in pypika which stores the state of a query and offers functions which allow the
state to be branched immutably.
"""
QUOTE_CHAR = '"'
SECONDARY_QUOTE_CHAR = "'"
ALIAS_QUOTE_CHAR = None
def __init__(self, dialect=None, wrap_union_queries=True, wrapper_cls=ValueWrapper):
super(QueryBuilder, self).__init__(None)
self._from = []
self._insert_table = None
self._update_table = None
self._delete_from = False