Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Round-trips a DAL definition through as_dict()/as_json() and rebuilds DAL
# objects from plain dictionaries.
# NOTE(review): this snippet has lost its indentation and appears truncated:
# the `try:` below has no visible `except`/`finally`, and `db4`/`db5` are used
# without being defined anywhere in the visible code.
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('person', Field('name', default="Michael"),Field('uuid'))
db.define_table('pet',Field('friend',db.person),Field('name'))
# Serialize the database definition; the resulting dict is read below through
# its "uri" and "tables" keys.
dbdict = db.as_dict(flat=True, sanitize=False)
assert isinstance(dbdict, dict)
uri = dbdict["uri"]
# NOTE(review): `basestring` is not a Python 3 builtin; presumably supplied by
# a compatibility import outside this snippet -- confirm.
assert isinstance(uri, basestring) and uri
assert len(dbdict["tables"]) == 2
# Three fields are expected on the first table although only two are declared
# explicitly (presumably an implicit id field is included -- confirm).
assert len(dbdict["tables"][0]["fields"]) == 3
assert dbdict["tables"][0]["fields"][1]["type"] == db.person.name.type
assert dbdict["tables"][0]["fields"][1]["default"] == db.person.name.default
# Rebuild a second DAL from the dict and check it exposes the same tables.
db2 = DAL(**dbdict)
assert len(db.tables) == len(db2.tables)
assert hasattr(db2, "pet") and isinstance(db2.pet, Table)
assert hasattr(db2.pet, "friend") and isinstance(db2.pet.friend, Field)
drop(db.pet)
db.commit()
db2.commit()
have_serializers = True
# NOTE(review): the handler for a failed `import serializers` is not visible.
try:
import serializers
dbjson = db.as_json(sanitize=False)
assert isinstance(dbjson, basestring) and len(dbjson) > 0
unicode_keys = True
# NOTE(review): version check done by string comparison; the original body of
# this branch seems to be missing from the snippet.
if sys.version < "2.6.5":
assert not (str(db5) in ("", None))
# Minimal dict definition: one table with no declared fields and one table
# with two declared fields.
dbdict6 = {
"uri": DEFAULT_URI,
"tables": [
{"tablename": "staff"},
{
"tablename": "tvshow",
"fields": [
{"fieldname": "name"},
{"fieldname": "rating", "type": "double"},
],
},
],
}
db6 = DAL(**dbdict6)
# "staff" declares no fields yet one is expected (the `.id` read below
# suggests it is the id field).
assert len(db6["staff"].fields) == 1
assert "name" in db6["tvshow"].fields
assert db6.staff.insert() is not None
assert db6(db6.staff).select().first().id == 1
db6.staff.drop()
db6.tvshow.drop()
db6.commit()
# Close every DAL handle used by the test.
db.close()
db2.close()
db4.close()
db5.close()
db6.close()
def testRun(self):
    """Exercise connection teardown and pooled-connection reuse.

    First checks that committing on a connection taken from a DAL raises
    once that DAL has been closed. Then opens and closes ten DALs with
    ``pool_size=5`` and expects every one of them to have exposed the same
    connection object.
    """
    # check connection is no longer active after close
    first_db = DAL(DEFAULT_URI, check_reserved=["all"])
    stale_connection = first_db._adapter.connection
    first_db.close()
    self.assertRaises(Exception, stale_connection.commit)

    # check connection are reused with pool_size
    seen = set()
    for _ in range(10):
        pooled_db = DAL(DEFAULT_URI, check_reserved=["all"], pool_size=5)
        seen.add(pooled_db._adapter.connection)
        pooled_db.close()
    self.assertEqual(len(seen), 1)

    # Commit and close the single connection that was collected.
    survivor = seen.pop()
    survivor.commit()
    survivor.close()
# check correct use of pool_size
# Exercises value validation on a 'reference' field, update restrictions, and
# the adapter's delete()/update() entry points called directly.
# NOTE(review): `query_dict`, `Expansion` and `server_version_major` suggest
# this targets a MongoDB adapter -- confirm against the enclosing test class.
# NOTE(review): indentation was lost in this snippet, so block nesting below
# is not recoverable with certainty.
# NOTE(review): `long` is not a Python 3 builtin; presumably supplied by a
# compatibility import outside this snippet -- confirm.
def testRun(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('tt', Field('aa', 'reference'))
# Values rejected for a 'reference' field: ValueError for the strings 'x' and
# '_', TypeError for a float.
with self.assertRaises(ValueError):
db.tt.insert(aa='x')
with self.assertRaises(ValueError):
db.tt.insert(aa='_')
with self.assertRaises(TypeError):
db.tt.insert(aa=3.1)
# Values accepted: empty string, decimal string, hexadecimal string.
self.assertEqual(isinstance(db.tt.insert(aa=''), long), True)
self.assertEqual(isinstance(db.tt.insert(aa='1'), long), True)
self.assertEqual(isinstance(db.tt.insert(aa='0x1'), long), True)
# A query built from an expression (aa + 1 == 1) is expected to be refused.
with self.assertRaises(RuntimeError):
db(db.tt.aa+1==1).update(aa=0)
drop(db.tt)
db.define_table('tt', Field('aa', 'date'))
self.assertEqual(isinstance(db.tt.insert(aa=None), long), True)
# Expand a 'count' Expansion by hand and inspect the resulting query dict.
expanded = Expansion(db._adapter, 'count',
Query(db, db._adapter.dialect.eq, db.tt.aa, 'x'), [])
self.assertEqual(db._adapter.expand(expanded).query_dict, {'aa': 'x'})
# Updating `id` must raise when the server version value is >= 2.6; on older
# servers the update is issued without expecting an error.
if db._adapter.server_version_major >= 2.6:
with self.assertRaises(RuntimeError):
db(db.tt).update(id=1)
else:
db(db.tt).update(id=1)
# NOTE(review): unclear whether the next assertion belongs to the `else`
# branch or runs unconditionally -- confirm against the upstream source.
self.assertNotEqual(db(db.tt.aa=='aa').select(db.tt.id).response[0][0], 1)
drop(db.tt)
db.close()
# Repeat the direct adapter delete/update checks for each `safe` value.
# NOTE(review): the DAL opened on each iteration is not closed in the visible
# code -- confirm whether a db.close() was lost in truncation.
for safe in [False, True, False]:
db = DAL(DEFAULT_URI, check_reserved=['all'])
db.define_table('tt', Field('aa'))
self.assertEqual(isinstance(db.tt.insert(aa='x'), long), True)
# Passing a plain string instead of a Query is expected to raise.
with self.assertRaises(RuntimeError):
db._adapter.delete(db['tt'], 'x', safe=safe)
# Deleting the single matching row reports 1 ...
self.assertEqual(db._adapter.delete(
db['tt'], Query(db, db._adapter.dialect.eq, db.tt.aa, 'x'), safe=safe), 1)
self.assertEqual(db(db.tt.aa=='x').count(), 0)
# ... and an update against the now-empty match set reports 0.
self.assertEqual(
db._adapter.update(
db['tt'],
Query(db, db._adapter.dialect.eq, db.tt.aa, 'x'),
db['tt']._fields_and_values_for_update(
{'aa':'x'}).op_values(),
safe=safe
), 0)
drop(db.tt)
# Prepares date/time reference values and a table of per-type sample values
# for an update test.
# NOTE(review): this snippet is truncated -- the `update_vals` tuple is never
# closed and the code that consumes it is not visible.
def testUpdate(self):
db = DAL(DEFAULT_URI, check_reserved=['all'])
# some db's only support seconds
datetime_datetime_today = datetime.datetime.today()
datetime_datetime_today = datetime_datetime_today.replace(
microsecond = 0)
# Offsets of one day and of one second (timedelta(days, seconds)).
one_day = datetime.timedelta(1)
one_sec = datetime.timedelta(0,1)
# Each entry starts with a field type name followed by two sample values
# (presumably the initial and the updated value -- confirm).
update_vals = (
('string', 'x', 'y'),
('text', 'x', 'y'),
('password', 'x', 'y'),
('integer', 1, 2),
('bigint', 1, 2),
('float', 1.0, 2.0),
('double', 1.0, 2.0),
def setUpModule():
    """Drop a fixed list of tables before the tests in this module run.

    Does nothing when ``IS_IMAP`` is truthy. Otherwise opens a DAL on
    ``DEFAULT_URI``, declares and then drops each listed table, and closes
    the DAL. Exceptions raised while declaring or dropping a table are
    swallowed.
    """
    if IS_IMAP:
        return

    stale_tables = (
        'tt', 't0', 't1', 't2', 't3', 't4',
        'easy_name', 'tt_archive', 'pet_farm', 'person',
    )

    def clean_table(db, tablename):
        # Declare the table first so that it can be looked up on the DAL.
        try:
            db.define_table(tablename)
        except Exception:
            pass
        # Then remove it; a failure here is ignored as well.
        try:
            drop(db[tablename])
        except Exception:
            pass

    cleanup_db = DAL(DEFAULT_URI, check_reserved=['all'])
    for stale_name in stale_tables:
        clean_table(cleanup_db, stale_name)
    cleanup_db.close()
def testRun(self):
    """Create and drop an index on ``tt.aa`` and check a second drop fails.

    Defines table ``tt`` with a single field ``aa``, creates the index
    ``idx_aa`` on it, drops the index, and then expects another drop of
    the same index to raise. The table is dropped and the connection is
    closed at the end.
    """
    db = DAL(DEFAULT_URI, check_reserved=["all"])
    db.define_table("tt", Field("aa"))

    # Both calls are expected to return a truthy value on success.
    self.assertTrue(db.tt.create_index("idx_aa", db.tt.aa))
    self.assertTrue(db.tt.drop_index("idx_aa"))

    # The index is gone now, so dropping it again must raise.
    with self.assertRaises(Exception):
        db.tt.drop_index("idx_aa")

    # Roll back after the failed statement before touching the table again.
    db.rollback()
    drop(db.tt)
    # Release the connection, as the other tests in this file do.
    db.close()
from pydal import DAL, Field
from datetime import datetime
# Script fragment: opens the SQLite database `download.db` and declares the
# `market` and `buy` tables.
# NOTE(review): this snippet is truncated -- the `buy` table definition below
# is never closed.
db = DAL('sqlite://download.db')
market = db.define_table(
'market',
Field('name'),
Field('ask', type='double'),
# Callable default: `datetime.now` is passed uncalled.
Field('timestamp', type='datetime', default=datetime.now)
)
# Raw SQL indexes on the timestamp and name columns; IF NOT EXISTS keeps the
# statements safe to re-run.
db.executesql('CREATE INDEX IF NOT EXISTS tidx ON market (timestamp);')
db.executesql('CREATE INDEX IF NOT EXISTS m_n_idx ON market (name);')
buy = db.define_table(
'buy',
# NOTE(review): declared without a type, not as a reference to the `market`
# table -- confirm this is intended.
Field('market'),
Field('purchase_price', type='double'),
Field('selling_price', type='double'),
Field('amount', type='double'),
def __new__(self,):
    """Build and return the DAL handle for the ``ipChecker`` database.

    Opens ``mongodb://mongodb/ipChecker``, defines the ``ips`` and
    ``statistics`` tables on it, and returns the ``DAL`` object itself
    (not an instance of the class this ``__new__`` belongs to).

    Returns:
        The ``DAL`` object with both tables defined.
    """
    dalString = 'mongodb://mongodb/ipChecker'
    db = DAL(dalString)
    db.define_table(
        'ips',
        Field('ip'),
        # Use a callable so the date string is produced when a row is
        # inserted; a pre-formatted string would be computed once, when the
        # table is defined, and reused for every later row.
        Field('created_at',
              default=lambda: datetime.now().strftime("%d/%m/%Y")),
        Field('cloudflare', 'boolean', default=False),
    )
    db.define_table(
        'statistics',
        Field('ip'),
        Field('status'),
        # Callable default, evaluated per insert.
        Field('created_on', 'datetime', default=datetime.now),
    )
    return db