AS nidata,
(SELECT CAST((SELECT fdata FROM foo) AS FLOAT) FROM DUAL) AS fdata
FROM dual
"""
row = testing.db.execute(stmt).fetchall()[0]
eq_(
[type(x) for x in row],
[int, decimal.Decimal, int, int, decimal.Decimal],
)
eq_(
row,
(5, decimal.Decimal("45.6"), 45, 53, decimal.Decimal("45.68392")),
)
row = testing.db.execute(
text(stmt).columns(
idata=Integer(),
ndata=Numeric(20, 2),
ndata2=Numeric(20, 2),
nidata=Numeric(5, 0),
fdata=Float(),
)
).fetchall()[0]
eq_(
[type(x) for x in row],
[int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float],
)
eq_(
    row,
    (
        5,
        decimal.Decimal("45.6"),
        decimal.Decimal("45"),
        decimal.Decimal("53"),
        45.68392,
    ),
)
sa.Column('player_level', sa.Enum('High School', 'University', 'Professional', name='user_player_level'), server_default=sa.text("'Professional'"), nullable=False),
sa.Column('organization_id', sa.Integer(), autoincrement=False, nullable=True),
sa.Column('country_code', sa.String(length=3), nullable=True),
sa.Column('country_subdivision_code', sa.String(length=10), nullable=True),
sa.Column('creation_time', sa.TIMESTAMP(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=True),
sa.Column('update_time', sa.TIMESTAMP(timezone=True), nullable=True),
sa.Column('api_key_hash', sa.String(length=255), nullable=True),
sa.Column('is_admin', sa.Boolean(), server_default=sa.text("'0'"), autoincrement=False, nullable=True),
sa.Column('is_gpu_enabled', sa.Boolean(), server_default=sa.text("'0'"), autoincrement=False, nullable=False),
sa.ForeignKeyConstraint(['organization_id'], ['organization.id'], name='user_ibfk_1'),
sa.PrimaryKeyConstraint('id'),
)
op.create_table('challenge',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('created', sa.TIMESTAMP(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
sa.Column('finished', sa.TIMESTAMP(timezone=True), nullable=True),
sa.Column('num_games', sa.Integer(), autoincrement=False, nullable=False),
sa.Column('status', sa.Enum('created', 'playing_game', 'finished', name='challenge_status'), server_default=sa.text("'created'"), nullable=False),
sa.Column('most_recent_game_task', sa.TIMESTAMP(timezone=True), nullable=True),
sa.Column('issuer', sa.Integer(), autoincrement=False, nullable=False),
sa.Column('winner', sa.Integer(), autoincrement=False, nullable=True),
sa.ForeignKeyConstraint(['issuer'], ['user.id'], name='challenge_issuer_fk', ondelete='CASCADE'),
sa.ForeignKeyConstraint(['winner'], ['user.id'], name='challenge_winner_fk', ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id'),
)
op.create_table('challenge_participant',
sa.Column('challenge_id', sa.Integer(), autoincrement=False, nullable=False),
sa.Column('user_id', sa.Integer(), autoincrement=False, nullable=False),
sa.Column('points', sa.Integer(), autoincrement=False, nullable=False),
sa.ForeignKeyConstraint(['challenge_id'], ['challenge.id'], name='challenge_participant_fk', ondelete='CASCADE'),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], name='challenge_participant_ibfk_2', ondelete='CASCADE'),
)
op.create_table(
"users_licenses",
sa.Column("user", sa.BigInteger(), nullable=True),
sa.Column("license", sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(["license"], ["licenses.id"]),
sa.ForeignKeyConstraint(["user"], ["users.id"]),
)
op.drop_index("idx_areas_of_interest_centroid", table_name="areas_of_interest")
op.drop_index("idx_areas_of_interest_geometry", table_name="areas_of_interest")
op.add_column("projects", sa.Column("license_id", sa.Integer(), nullable=True))
op.create_foreign_key("fk_licenses", "projects", "licenses", ["license_id"], ["id"])
op.drop_index("idx_tasks_geometry", table_name="tasks")
# ### end Alembic commands ###
# Custom index, not created with reflection
op.create_index("idx_username_lower", "users", [text("lower(username)")])
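# The functional index above will not show up in Alembic's autogenerated
# diffs, so the matching downgrade() has to drop it by hand.  A minimal
# sketch, assuming only the index and table names used above:
def downgrade():
    op.drop_index("idx_username_lower", table_name="users")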
filters.append("release_group.artist_credit in :credit")
filter_data["credit"] = tuple(artist_credit_from_recording)
if MB_release_group_gid_redirect_data:
filters.append("release_group.id in :redirect_data")
filter_data["redirect_data"] = tuple(MB_release_group_gid_redirect_fk_release_group)
if MB_release_data:
filters.append("release_group.id in :release_data")
filter_data["release_data"] = tuple(MB_release_fk_release_group)
filterstr = " OR ".join(filters)
if filterstr:
filterstr = " WHERE " + filterstr
release_group_query = text("""
SELECT DISTINCT release_group.id,
release_group.gid,
release_group.name,
release_group.artist_credit,
release_group.type,
release_group.comment,
release_group.edits_pending,
release_group.last_updated
FROM release_group
{filterstr}
""".format(filterstr=filterstr)
)
result = connection.execute(release_group_query, filter_data)
MB_release_group_data = result.fetchall()
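# An alternative sketch for the IN-filters above, using SQLAlchemy's
# "expanding" bind parameters (SQLAlchemy 1.2+) instead of relying on the
# driver adapting a Python tuple.  Table and column names mirror the query
# above; the single-filter query itself is illustrative only.
from sqlalchemy import bindparam, text

alt_query = text("""
    SELECT DISTINCT release_group.id, release_group.gid, release_group.name
    FROM release_group
    WHERE release_group.id IN :release_data
""").bindparams(bindparam("release_data", expanding=True))
alt_result = connection.execute(alt_query, {"release_data": list(MB_release_fk_release_group)})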
xml_doc_in = Column(LargeBinary)
xml_doc_out = Column(LargeBinary)
stderr_out = Column(LargeBinary)
batch = Column(Integer, nullable=False)
file_delete_state = Column(Integer, nullable=False, index=True)
validate_state = Column(Integer, nullable=False)
claimed_credit = Column(Float(asdecimal=True), nullable=False)
granted_credit = Column(Float(asdecimal=True), nullable=False)
opaque = Column(Float(asdecimal=True), nullable=False)
random = Column(Integer, nullable=False)
app_version_num = Column(Integer, nullable=False)
appid = Column(Integer, nullable=False)
exit_status = Column(Integer, nullable=False)
teamid = Column(Integer, nullable=False)
priority = Column(Integer, nullable=False)
mod_time = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
elapsed_time = Column(Float(asdecimal=True), nullable=False)
flops_estimate = Column(Float(asdecimal=True), nullable=False)
app_version_id = Column(Integer, nullable=False)
runtime_outlier = Column(Integer, nullable=False)
size_class = Column(SmallInteger, nullable=False, server_default=text("'-1'"))
peak_working_set_size = Column(Float(asdecimal=True), nullable=False)
peak_swap_size = Column(Float(asdecimal=True), nullable=False)
peak_disk_usage = Column(Float(asdecimal=True), nullable=False)
@hybrid_property
def stderr_out_text(self):
try:
    # Extract the text between BOINC's <stderr_txt> wrapper tags
    # (the delimiter strings here are an assumption; BOINC wraps a task's
    # standard error output in <stderr_txt>...</stderr_txt>).
    return getStringBetween(self.stderr_out.decode("utf-8"), '<stderr_txt>', '</stderr_txt>')
except Exception:
    return self.stderr_out.decode("utf-8")
def drop_add_fk(names):
yield 'ALTER TABLE %(stab)s DROP CONSTRAINT %(const)s' % names
yield ('ALTER TABLE %(stab)s ADD CONSTRAINT %(const)s '
'FOREIGN KEY (%(scol)s) REFERENCES %(ttab)s (%(tcol)s)' % names)
update_source = sa.text('UPDATE source SET pk = :after WHERE pk = :before')
update_other = 'UPDATE %(stab)s SET %(scol)s = :after WHERE %(scol)s = :before'
fks = conn.execute(select_fks).fetchall()
drop, add = zip(*map(drop_add_fk, fks))
conn.execute(';\n'.join(drop))
conn.execute(update_source, before_after)
for names in fks:
conn.execute(sa.text(update_other % names), before_after)
conn.execute(';\n'.join(add))
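# The routine above assumes a `select_fks` query yielding one row per
# referencing foreign key, keyed by stab (source table), const (constraint
# name), scol (source column), ttab (target table) and tcol (target column),
# so the %(...)s templates in drop_add_fk() can be filled from each row.
# A hypothetical PostgreSQL version based on information_schema (a sketch,
# not the project's actual query):
select_fks = sa.text("""
    SELECT tc.table_name      AS stab,
           tc.constraint_name AS const,
           kcu.column_name    AS scol,
           ccu.table_name     AS ttab,
           ccu.column_name    AS tcol
    FROM information_schema.table_constraints tc
    JOIN information_schema.key_column_usage kcu
      ON kcu.constraint_name = tc.constraint_name
    JOIN information_schema.constraint_column_usage ccu
      ON ccu.constraint_name = tc.constraint_name
    WHERE tc.constraint_type = 'FOREIGN KEY'
      AND ccu.table_name = 'source'
""")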
"""Bankkontobewegungen"""
__tablename__ = u'bank_konto'
bkid = Column(Integer, primary_key=True, server_default=text("nextval(('\"bank_konto_bkid_seq\"'::text)::regclass)"))
valid_on = Column(u'datum', Date, nullable=False)
wert = Column(Money, nullable=False, server_default=text("0"))
bes = Column(Text)
class BkBuchung(BankKonto):
"""Verbuchte Bankkontobewegungen"""
__tablename__ = u'bk_buchung'
bkid = Column(ForeignKey(BankKonto.bkid), primary_key=True)
posted_at = Column(u'datum', DateTime(True), nullable=False, server_default=text("now()"))
bearbeiter = Column(String, nullable=False, server_default=text("\"current_user\"()"))
rechnungs_nr = Column(Integer)
konto_id = Column(ForeignKey(FinanzKonten.id), nullable=False)
konto = relationship(FinanzKonten, backref="bankbuchungen")
uid = Column(Integer)
class Buchungen(Base):
__tablename__ = u'buchungen'
oid = Column(OID, primary_key=True)
bkid = Column(Integer)
fbid = Column(Integer)
datum = Column(Date)
bearbeiter = Column(String)
rechnungs_nr = Column(Integer)
soll = Column(ForeignKey(FinanzKonten.id))
def get_user_info(self, uid):
uid = int(uid)
member = session.execute(text('''
SELECT * FROM pre_common_member
WHERE uid=:uid
'''), {'uid': int(uid)}).fetchone()
if not member:
return {}
return self._get_user_info(member)
def load_artist_type(connection):
"""Fetch artist_type table data from MusicBrainz database for the recording MBIDs in
AcousticBrainz database. Retrieving complete data because the rows in MusicBrainz database
for this table are much less in number.
Args:
connection: database connection to execute the query.
Returns:
artist_type data fetched from MusicBrainz database.
"""
artist_type_query = text("""
SELECT *
FROM artist_type
ORDER BY id
""")
result = connection.execute(artist_type_query)
MB_artist_type_data = result.fetchall()
return MB_artist_type_data
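# A minimal usage sketch for load_artist_type; the engine URL below is a
# hypothetical placeholder rather than a value taken from this project.
from sqlalchemy import create_engine

engine = create_engine("postgresql://musicbrainz@localhost/musicbrainz_db")
with engine.connect() as connection:
    MB_artist_type_data = load_artist_type(connection)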
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"odkform",
sa.Column(
"form_type", sa.INTEGER(), server_default=sa.text("'1'"), nullable=True
),