# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_as_scalar(self):
    """The legacy ``SelectBase.as_scalar()`` warns about deprecation and
    produces the same construct as ``scalar_subquery()``."""
    warning_pattern = (
        r"The SelectBase.as_scalar\(\) method is deprecated and "
        "will be removed in a future release."
    )
    with testing.expect_deprecated(warning_pattern):
        deprecated_stmt = select([self.table1.c.myid]).as_scalar()

    modern_stmt = select([self.table1.c.myid]).scalar_subquery()
    is_true(deprecated_stmt.compare(modern_stmt))
def _test_criterion(self, engine):
    """Exercise JSON member filtering on ``data_table``, both with the
    plain ``astext`` accessor and with an explicit CAST to String."""
    data_table = self.tables.data_table
    expected = ({"k1": "r3v1", "k2": "r3v2"},)

    # Filter on the text form of the "k1" member.
    plain_query = select([data_table.c.data]).where(
        data_table.c.data["k1"].astext == "r3v1"
    )
    eq_(engine.execute(plain_query).first(), expected)

    # Same predicate, but routed through an explicit CAST.
    cast_query = select([data_table.c.data]).where(
        data_table.c.data["k1"].astext.cast(String) == "r3v1"
    )
    eq_(engine.execute(cast_query).first(), expected)
"""
table1 = table("table1", column("x"))
table2 = table("table2", column("y"))
a1 = table1.alias()
s = select([a1.c.x]).select_from(a1.join(table2, a1.c.x == table2.c.y))
assert_s = select([select([s.subquery()]).subquery()])
for fn in (
sql_util._deep_deannotate,
lambda s: sql_util._deep_annotate(s, {"foo": "bar"}),
lambda s: visitors.cloned_traverse(s, {}, {}),
lambda s: visitors.replacement_traverse(s, {}, lambda x: None),
):
sel = fn(select([fn(select([fn(s.subquery())]).subquery())]))
eq_(str(assert_s), str(sel))
def test_typedec_operator_adapt(self):
    """Concatenating a literal onto a TypeDecorator column keeps
    ``MyTypeDec`` on both operands and round-trips through the
    bind/result processors."""
    expr = test_table.c.bvalue + "hi"

    # Both the expression type and the coerced right-hand literal
    # should carry the decorated type.
    assert expr.type.__class__ is MyTypeDec
    assert expr.right.type.__class__ is MyTypeDec

    result = testing.db.execute(select([expr.label("foo")])).scalar()
    eq_(result, "BIND_INfooBIND_INhiBIND_OUT")
'builds.id',
'buildsets.id',
'changes.changeid',
'patches.id',
'sourcestampsets.id',
'sourcestamps.id',
'objects.id',
'users.uid',
]
# NOTE(review): the opening of this list (presumably ``to_fix = [``) is
# outside this view.  Each entry is "table.column" of an autoincrement PK.
# For each column, advance the backing sequence to the current MAX value so
# future inserts do not collide with migrated rows.
for col in to_fix:
tbl_name, col_name = col.split('.')
tbl = sautils.Table(tbl_name, metadata, autoload=True)
col = tbl.c[col_name]
res = migrate_engine.execute(sa.select([sa.func.max(col)]))
# NOTE(review): ``max`` shadows the builtin; also falsy when MAX is 0,
# in which case the sequence is deliberately (?) left untouched — confirm.
max = res.fetchall()[0][0]
if max:
# Sequence name follows the PostgreSQL default <table>_<col>_seq.
seq_name = "%s_%s_seq" % (tbl_name, col_name)
# NOTE(review): SQL built via %-interpolation; inputs come from the
# hard-coded list above, so no untrusted data reaches the statement.
r = migrate_engine.execute("SELECT setval('%s', %d)"
% (seq_name, max))
r.close()
def __init__(self, slavename):
# Look up a single slave row (joined to its optional custom template)
# by name and cache its identity fields on this instance.
self.slavename = slavename
# slave info, including template
q = sqlalchemy.select(
[model.slaves, model.tac_templates.c.template],
whereclause=(model.slaves.c.name == slavename),
from_obj = [
# LEFT OUTER JOIN: slaves without a custom template still match,
# with a NULL template column.
model.slaves.outerjoin(
model.tac_templates,
onclause=(
model.slaves.c.custom_tplid == model.tac_templates.c.tplid))])
slave_row = q.execute().fetchone()
if not slave_row:
# Unknown slave name: nothing to allocate.
raise exceptions.NoAllocationError
self.slaveid = slave_row.slaveid
self.enabled = slave_row.enabled
self.slave_basedir = slave_row.basedir
# bail out early if this slave is not enabled
# NOTE(review): the body continues past this view; the enabled-check it
# announces is not visible here.
"""return the new rewards which are given for the achievement level."""
this_level = DBSession.execute(select([t_rewards.c.id.label("reward_id"),
t_achievements_rewards.c.id,
t_rewards.c.name,
t_achievements_rewards.c.from_level,
t_achievements_rewards.c.value,
t_achievements_rewards.c.value_translation_id],
from_obj=t_rewards.join(t_achievements_rewards))\
.where(and_(or_(t_achievements_rewards.c.from_level <= level,
t_achievements_rewards.c.from_level == None),
t_achievements_rewards.c.achievement_id == achievement_id))\
.order_by(t_achievements_rewards.c.from_level))\
.fetchall()
prev_level = DBSession.execute(select([t_rewards.c.id.label("reward_id"),
t_achievements_rewards.c.id,
t_achievements_rewards.c.value,
t_achievements_rewards.c.value_translation_id],
from_obj=t_rewards.join(t_achievements_rewards))\
.where(and_(or_(t_achievements_rewards.c.from_level <= level-1,
t_achievements_rewards.c.from_level == None),
t_achievements_rewards.c.achievement_id == achievement_id))\
.order_by(t_achievements_rewards.c.from_level))\
.fetchall()
#now compute the diff :-/
build_hash = lambda x, l: hashlib.md5((str(x["id"])+str(evaluate_string(x["value"], {"level": l}))+str(Translation.trs(x["value_translation_id"], {"level": l}))).encode("UTF-8")).hexdigest()
prev_hashes = {build_hash(x, level-1) for x in prev_level}
#this_hashes = {build_hash(x,level) for x in this_level}
retlist = [x for x in this_level if not build_hash(x, level) in prev_hashes]
def insert_dataset(session, data, tbl):
    '''Batch insert data into the database using PostGIS specific functions.

    :param session: SQLAlchemy Session
    :type session: sqlalchemy.orm.session.Session
    :param data: DataFrame containing value, timestamp, longitude and latitude
    :type data: pandas.core.frame.DataFrame
    :param tbl: Base class representing the database table for the data
    :type tbl: sqlalchemy.ext.declarative.api.DeclarativeMeta
    '''
    # unnest() expands each column array server-side; ST_MakePoint builds
    # the geometry from the paired longitude/latitude elements.
    unnest = sqlalchemy.func.unnest
    point = sqlalchemy.func.ST_MakePoint(unnest(data.longitude),
                                         unnest(data.latitude))
    source = sqlalchemy.select([unnest(data.value),
                                unnest(data.timestamp),
                                point])
    # INSERT ... SELECT pushes the whole batch in a single round trip.
    session.execute(sqlalchemy.insert(tbl).from_select(tbl.columns, source))
def _answer_stat(survey_node: AnswerableSurveyNode,
                 allowable_types: set,
                 func: Function) -> object:
    """Compute the aggregate ``func`` over the answers to ``survey_node``.

    :param survey_node: the node whose answers are aggregated
    :param allowable_types: the type-constraint names this aggregate supports
    :param func: a SQLAlchemy generic function (e.g. ``sa.func.max``)
    :raises InvalidTypeForOperation: if the node's type constraint is not
        one of ``allowable_types``
    :return: the scalar aggregate value (``None`` when there are no answers)
    """
    type_constraint = survey_node.the_type_constraint
    if type_constraint not in allowable_types:
        # _FunctionGenerator__names[0] is the name the generic function was
        # created with (e.g. "max"); it identifies the operation in the error.
        raise InvalidTypeForOperation(
            (type_constraint, func._FunctionGenerator__names[0])
        )
    # Reuse the already-bound type_constraint instead of re-reading the
    # attribute (was: ANSWER_TYPES[survey_node.the_type_constraint]).
    answer_cls = ANSWER_TYPES[type_constraint]
    # Fix: the original block left the ``return (`` parenthesis unclosed,
    # which is a syntax error; the closing paren is restored below.
    return (
        object_session(survey_node)
        .scalar(
            sa.select([func(answer_cls.main_answer)])
            .select_from(Answer.__table__.join(
                answer_cls.__table__, Answer.id == answer_cls.id
            ))
            .where(Answer.survey_node_id == survey_node.id)
        )
    )
def downgrade(migrate_engine):
# Revert volumes.instance_uuid back to an integer instance_id FK:
# add the column, backfill it from instances by uuid, then restore the FK.
meta = MetaData()
meta.bind = migrate_engine
instances = Table('instances', meta, autoload=True)
volumes = Table('volumes', meta, autoload=True)
instance_id_column = Column('instance_id', Integer)
instance_id_column.create(volumes)
try:
# Backfill: correlate each volume with its instance via the uuid.
volumes.update().values(
instance_id=select(
[instances.c.id],
instances.c.uuid == volumes.c.instance_uuid)
).execute()
except Exception:
# Backfill failed: remove the column we just added so the schema is
# left unchanged.
# NOTE(review): the except body is cut off at this view's edge — a
# re-raise presumably follows; confirm in the full file.
instance_id_column.drop()
fkeys = list(volumes.c.instance_id.foreign_keys)
if fkeys:
try:
# Recreate the original FK under its previous constraint name.
fk_name = fkeys[0].constraint.name
ForeignKeyConstraint(
columns=[volumes.c.instance_id],
refcolumns=[instances.c.id],
name=fk_name).create()
except Exception:
# NOTE(review): handler body continues past this view.