Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testSelect(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table(
'a_table',
Field('b_field'),
Field('a_field'),
)
db.a_table.insert(a_field="aa1", b_field="bb1")
rtn = db(db.a_table).select(db.a_table.id, db.a_table.b_field, db.a_table.a_field).as_list()
self.assertEqual(rtn[0]['b_field'], 'bb1')
keys = rtn[0].keys()
self.assertEqual(len(keys), 3)
self.assertEqual(("id" in keys, "b_field" in keys, "a_field" in keys), (True, True, True))
drop(db.a_table)
db.close()
def testJoin(self):
db = self.connect()
rname = "this_is_field_aa"
rname2 = "this_is_field_b"
db.define_table("t1", Field("aa", rname=rname))
db.define_table("t2", Field("aa", rname=rname), Field("b", db.t1, rname=rname2))
i1 = db.t1.insert(aa="1")
i2 = db.t1.insert(aa="2")
i3 = db.t1.insert(aa="3")
db.t2.insert(aa="4", b=i1)
db.t2.insert(aa="5", b=i2)
db.t2.insert(aa="6", b=i2)
self.assertEqual(
len(db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)), 3
)
self.assertEqual(
db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t1.aa, "2"
)
self.assertEqual(
db(db.t1.id == db.t2.b).select(orderby=db.t1.aa | db.t2.aa)[2].t2.aa, "6"
)
self.assertEqual(
def testListReference(self):
db = self.connect()
db.define_table("t0", Field("aa", "string"))
db.define_table("tt", Field("t0_id", "list:reference t0"))
id_a1 = db.t0.insert(aa="test1")
id_a2 = db.t0.insert(aa="test2")
ref1 = [id_a1]
ref2 = [id_a2]
ref3 = [id_a1, id_a2]
db.tt.insert(t0_id=ref1)
self.assertEqual(db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref1)
db.tt.insert(t0_id=ref2)
self.assertEqual(db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref2)
db.tt.insert(t0_id=ref3)
self.assertEqual(db(db.tt).select(db.tt.t0_id).last()[db.tt.t0_id], ref3)
self.assertEqual(db(db.tt.t0_id == ref3).count(), 1)
def testRowExtra(self):
db = self.connect(check_reserved=None, lazy_tables=True)
tt = db.define_table("tt", Field("value", "integer"))
db.tt.insert(value=1)
row = db(db.tt).select("value").first()
self.assertEqual(row.value, 1)
def testRun(self):
db = DAL(None)
db.define_table("no_table", Field("aa"))
self.assertIsInstance(db.no_table.aa, Field)
self.assertIsInstance(db.no_table["aa"], Field)
db.close()
rtn = db(db.easy_name.a_field == "c").count()
self.assertEqual(rtn, 1)
rtn = db(db.easy_name.a_field != "c").count()
self.assertEqual(rtn, 1)
avg = db.easy_name.id.avg()
rtn = db(db.easy_name.id > 0).select(avg)
self.assertEqual(rtn[0][avg], 3)
avg = db.easy_name.rating.avg()
rtn = db(db.easy_name.id > 0).select(avg)
self.assertEqual(rtn[0][avg], 2)
rname = "this_is_the_person_name"
db.define_table(
"person",
Field("id", type="id", rname="fooid"),
Field("name", default="Michael", rname=rname),
Field("uuid"),
)
rname = "this_is_the_pet_name"
db.define_table(
"pet", Field("friend", "reference person"), Field("name", rname=rname)
)
michael = db.person.insert() # default insert
john = db.person.insert(name="John")
luke = db.person.insert(name="Luke")
# michael owns Phippo
phippo = db.pet.insert(friend=michael, name="Phippo")
# john owns Dunstin and Gertie
dunstin = db.pet.insert(friend=john, name="Dunstin")
gertie = db.pet.insert(friend=john, name="Gertie")
def define_tables(self, db, migrate):
"""Define Scheduler tables structure."""
from pydal.base import DEFAULT
logger.debug('defining tables (migrate=%s)', migrate)
now = self.now
db.define_table(
'scheduler_task',
Field('application_name', requires=IS_NOT_EMPTY(),
default=None, writable=False),
Field('task_name', default=None),
Field('group_name', default='main'),
Field('status', requires=IS_IN_SET(TASK_STATUS),
default=QUEUED, writable=False),
Field('broadcast', 'boolean', default=False),
Field('function_name',
requires=IS_IN_SET(sorted(self.tasks.keys()))
if self.tasks else DEFAULT),
Field('uuid', length=255,
requires=IS_NOT_IN_DB(db, 'scheduler_task.uuid'),
unique=True, default=uuid4),
Field('args', 'text', default='[]', requires=TYPE(list)),
Field('vars', 'text', default='{}', requires=TYPE(dict)),
Field('enabled', 'boolean', default=True),
Field('start_time', 'datetime', default=now,
requires=IS_DATETIME()),
Field('next_run_time', 'datetime', default=now),
Field('stop_time', 'datetime'),
Field('repeats', 'integer', default=1, comment="0=unlimited",
requires=IS_INT_IN_RANGE(0, None)),
Field('retry_failed', 'integer', default=0, comment="-1=unlimited",
requires=IS_INT_IN_RANGE(0, None)),
Field('times_run', 'integer', default=0, writable=False),
Field('times_failed', 'integer', default=0, writable=False),
Field('last_run_time', 'datetime', writable=False, readable=False),
Field('assigned_worker_name', default='', writable=False),
on_define=self.set_requirements,
migrate=self.__get_migrate('scheduler_task', migrate),
format='(%(id)s) %(task_name)s')
db.define_table(
'scheduler_run',
Field('task_id', 'reference scheduler_task'),
Field('status', requires=IS_IN_SET(RUN_STATUS)),
Field('start_time', 'datetime'),
Field('stop_time', 'datetime'),
Field('run_output', 'text'),
Field('run_result', 'text'),
Field('traceback', 'text'),
Field('worker_name', default=self.worker_name),
migrate=self.__get_migrate('scheduler_run', migrate)
)
db.define_table(
'scheduler_worker',
Field('worker_name', length=255, unique=True),
Field('first_heartbeat', 'datetime'),
Field('last_heartbeat', 'datetime'),
Field('status', requires=IS_IN_SET(WORKER_STATUS)),
Field('is_ticker', 'boolean', default=False, writable=False),
Field('group_names', 'list:string', default=self.group_names),
Field('worker_stats', 'json'),
migrate=self.__get_migrate('scheduler_worker', migrate)
Field('times_failed', 'integer', default=0, writable=False),
Field('last_run_time', 'datetime', writable=False, readable=False),
Field('assigned_worker_name', default='', writable=False),
on_define=self.set_requirements,
migrate=self.__get_migrate('scheduler_task', migrate),
format='(%(id)s) %(task_name)s')
db.define_table(
'scheduler_run',
Field('task_id', 'reference scheduler_task'),
Field('status', requires=IS_IN_SET(RUN_STATUS)),
Field('start_time', 'datetime'),
Field('stop_time', 'datetime'),
Field('run_output', 'text'),
Field('run_result', 'text'),
Field('traceback', 'text'),
Field('worker_name', default=self.worker_name),
migrate=self.__get_migrate('scheduler_run', migrate)
)
db.define_table(
'scheduler_worker',
Field('worker_name', length=255, unique=True),
Field('first_heartbeat', 'datetime'),
Field('last_heartbeat', 'datetime'),
Field('status', requires=IS_IN_SET(WORKER_STATUS)),
Field('is_ticker', 'boolean', default=False, writable=False),
Field('group_names', 'list:string', default=self.group_names),
Field('worker_stats', 'json'),
migrate=self.__get_migrate('scheduler_worker', migrate)
)
requires=IS_INT_IN_RANGE(0, None)),
Field('retry_failed', 'integer', default=0, comment="-1=unlimited",
requires=IS_INT_IN_RANGE(-1, None)),
Field('period', 'integer', default=60, comment='seconds',
requires=IS_INT_IN_RANGE(0, None)),
Field('prevent_drift', 'boolean', default=False,
comment='Exact start_times between runs'),
Field('cronline', default=None,
comment='Discard "period", use this cron expr instead',
requires=IS_EMPTY_OR(IS_CRONLINE())),
Field('timeout', 'integer', default=60, comment='seconds',
requires=IS_INT_IN_RANGE(1, None)),
Field('sync_output', 'integer', default=0,
comment="update output every n sec: 0=never",
requires=IS_INT_IN_RANGE(0, None)),
Field('times_run', 'integer', default=0, writable=False),
Field('times_failed', 'integer', default=0, writable=False),
Field('last_run_time', 'datetime', writable=False, readable=False),
Field('assigned_worker_name', default='', writable=False),
on_define=self.set_requirements,
migrate=self.__get_migrate('scheduler_task', migrate),
format='(%(id)s) %(task_name)s')
db.define_table(
'scheduler_run',
Field('task_id', 'reference scheduler_task'),
Field('status', requires=IS_IN_SET(RUN_STATUS)),
Field('start_time', 'datetime'),
Field('stop_time', 'datetime'),
Field('run_output', 'text'),
Field('run_result', 'text'),
Field('traceback', 'text'),