Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_select__column__single__field(self):
    # Selecting a single column is expected to render as a plain
    # ``SELECT <column> FROM <table>`` statement.
    table = Table('abc')
    rendered = str(Query.from_(table).select(table.foo))

    self.assertEqual('SELECT foo FROM abc', rendered)
# NOTE(review): headless fragment -- the enclosing `def` line is not visible
# here, and `q1` (asserted below) is never assigned in the visible lines;
# presumably it is built in the part of the test that was cut off.
q2 = Q.from_(self.t).select(fn.Cast(self.t.foo, 'DATE'))
self.assertEqual("SELECT DATE(foo) FROM abc", str(q1))
self.assertEqual("SELECT CAST(foo AS DATE) FROM abc", str(q2))
def test__cast__timestamp(self):
    # Two spellings are checked: the TIMESTAMP() function wrapper and an
    # explicit CAST(... AS TIMESTAMP).
    via_function = Q.from_(self.t).select(fn.Timestamp(self.t.foo))
    via_cast = Q.from_(self.t).select(fn.Cast(self.t.foo, 'TIMESTAMP'))

    expectations = (
        ("SELECT TIMESTAMP(foo) FROM abc", via_function),
        ("SELECT CAST(foo AS TIMESTAMP) FROM abc", via_cast),
    )
    for expected_sql, query in expectations:
        self.assertEqual(expected_sql, str(query))
class DateFunctionsTests(unittest.TestCase):
    # Checks the string form of ``dt + Interval(...)`` for the microsecond,
    # second and minute units.
    dt = F('dt')
    t = T('abc')

    def _render_plus_interval(self, **units):
        # Add an Interval built from ``units`` to the shared field and
        # return the resulting expression as a string.
        return str(self.dt + Interval(**units))

    def test_add_microsecond(self):
        self.assertEqual(
            "dt+INTERVAL 1 MICROSECOND",
            self._render_plus_interval(microseconds=1),
        )

    def test_add_second(self):
        self.assertEqual(
            "dt+INTERVAL 1 SECOND",
            self._render_plus_interval(seconds=1),
        )

    def test_add_minute(self):
        self.assertEqual(
            "dt+INTERVAL 1 MINUTE",
            self._render_plus_interval(minutes=1),
        )
# NOTE(review): these lines look like the tail of a separate criterion test
# whose `def` line is not visible. The expected "t0.foo" prefix presumably
# relies on `self.t` being a table aliased 't0' -- confirm against the
# enclosing test class.
c1 = Field('foo') >= datetime(2000, 1, 1, 12, 30, 55)
c2 = Field('foo', table=self.t).gte(datetime(2000, 1, 1, 12, 30, 55))
self.assertEqual("foo>='2000-01-01T12:30:55'", str(c1))
self.assertEqual("t0.foo>='2000-01-01T12:30:55'", str(c2))
def test__criterion_gte_right(self):
    # The literal sits on the left of ``<=``; the rendered criterion is
    # expected to come out field-first, as ``field>=literal``.
    unqualified = 1 <= Field('foo')
    qualified = -1 <= Field('foo', table=self.t)

    checks = (
        ('foo>=1', unqualified),
        ('t0.foo>=-1', qualified),
    )
    for expected, criterion in checks:
        self.assertEqual(expected, str(criterion))
class BetweenTests(unittest.TestCase):
t = Table('abc')
t.alias = 't0'
def test__between_number(self):
    # Three ways of building the same range criterion: ``between()`` on a
    # bare field, ``between()`` on a table-qualified field, and slicing.
    from_method = Field('foo').between(0, 1)
    from_table_field = Field('foo', table=self.t).between(0, 1)
    from_slice = Field('foo')[0:1]

    checks = (
        ('foo BETWEEN 0 AND 1', from_method),
        ('t0.foo BETWEEN 0 AND 1', from_table_field),
        ('foo BETWEEN 0 AND 1', from_slice),
    )
    for expected, criterion in checks:
        self.assertEqual(expected, str(criterion))
# Builds BETWEEN criteria over a date range three ways: between() on a bare
# field, between() on a table-qualified field, and slice syntax.
# NOTE(review): no assertions follow in the visible lines -- the test body
# appears to be cut off here; confirm against the full file.
def test__between_date(self):
c1 = Field('foo').between(date(2000, 1, 1), date(2000, 12, 31))
c2 = Field('foo', table=self.t).between(date(2000, 1, 1), date(2000, 12, 31))
c3 = Field('foo')[date(2000, 1, 1):date(2000, 12, 31)]
def get_column_definitions(self, schema, table, connection=None):
    """
    Query ``INFORMATION_SCHEMA.columns`` for the columns of one table.

    Selects the distinct ``column_name`` / ``data_type`` pairs whose
    ``table_schema`` equals ``schema`` and whose ``table_name`` equals
    ``table``, ordered by ``column_name``.

    :param schema: Value compared against ``table_schema``.
    :param table: Value compared against ``table_name``.
    :param connection: Passed through unchanged to ``self.fetch``.
    :return: Whatever ``self.fetch`` returns for the rendered SQL.
    """
    info_columns = Table('columns', schema='INFORMATION_SCHEMA')

    # Build the statement one clause at a time.
    query = PostgreSQLQuery.from_(info_columns)
    query = query.select(info_columns.column_name, info_columns.data_type)
    query = query.where(info_columns.table_schema == schema)
    # The original looks this column up via ``field('table_name')`` rather
    # than attribute access; kept as-is.
    query = query.where(info_columns.field('table_name') == table)
    query = query.distinct()
    query = query.orderby(info_columns.column_name)

    return self.fetch(str(query), connection=connection)
def __init__(self, model: Type[MODEL]) -> None:
    """
    Initialise the per-query state for ``model``.

    :param model: The model class this object is bound to; its
        ``_meta.db.capabilities`` is read during construction.
    """
    # No joins recorded yet.
    self._joined_tables: List[Table] = []
    self.model: "Type[Model]" = model
    # Starts out as the module-level QUERY object.
    self.query: QueryBuilder = QUERY
    # No client is bound at construction time.
    self._db: BaseDBAsyncClient = None  # type: ignore
    db_client = model._meta.db
    self.capabilities: Capabilities = db_client.capabilities
    # Starts empty.
    self._annotations: Dict[str, Function] = {}
def _get_joins_for_related_field(
table: Table, related_field: RelationalField, related_field_name: str
) -> List[Tuple[Table, Criterion]]:
required_joins = []
related_table: Table = related_field.related_model._meta.basetable
if isinstance(related_field, ManyToManyFieldInstance):
through_table = Table(related_field.through)
required_joins.append(
(
through_table,
table[related_field.model._meta.db_pk_column]
== through_table[related_field.backward_key],
)
)
required_joins.append(
(
related_table,
through_table[related_field.forward_key]
== related_table[related_field.related_model._meta.db_pk_column],
)
)
elif isinstance(related_field, BackwardFKRelation):
to_field_source_field = (
async def _prefetch_m2m_relation(
self, instance_list: "List[Model]", field: str, related_query: "QuerySet"
) -> list:
instance_id_set: set = {
self._field_to_db(instance._meta.pk, instance.pk, instance)
for instance in instance_list
}
field_object: ManyToManyFieldInstance = self.model._meta.fields_map[ # type: ignore
field
]
through_table = Table(field_object.through)
subquery = (
self.db.query_class.from_(through_table)
.select(
through_table[field_object.backward_key].as_("_backward_relation_key"),
through_table[field_object.forward_key].as_("_forward_relation_key"),
)
.where(through_table[field_object.backward_key].isin(instance_id_set))
)
related_query_table = related_query.model._meta.basetable
related_pk_field = related_query.model._meta.db_pk_field
related_query.resolve_ordering(related_query.model, related_query_table, [], {})
query = (
related_query.query.join(subquery)
.on(subquery._forward_relation_key == related_query_table[related_pk_field])
def _build_initial_querysets(cls) -> None:
    """
    Finalise every registered model and set its base table and queries.

    For each model found in ``cls.apps`` this calls
    ``_meta.finalise_model()`` and then sets ``basetable``, ``basequery``
    and ``basequery_all_fields`` on the model's ``_meta``.
    """
    # Lazily walk every model of every app, in registration order.
    all_models = (
        model for app in cls.apps.values() for model in app.values()
    )
    for model in all_models:
        meta = model._meta
        meta.finalise_model()
        meta.basetable = Table(meta.db_table)
        meta.basequery = meta.db.query_class.from_(meta.db_table)
        # Same base query, with every entry of db_fields selected.
        meta.basequery_all_fields = meta.basequery.select(*meta.db_fields)