How to use the sqlalchemy.text function in SQLAlchemy

To help you get started, we’ve selected a few SQLAlchemy examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github sqlalchemy / sqlalchemy / test / dialect / oracle / test_types.py View on Github external
AS nidata,
            (SELECT CAST((SELECT fdata FROM foo) AS FLOAT) FROM DUAL) AS fdata
        FROM dual
        """
        row = testing.db.execute(stmt).fetchall()[0]
        eq_(
            [type(x) for x in row],
            [int, decimal.Decimal, int, int, decimal.Decimal],
        )
        eq_(
            row,
            (5, decimal.Decimal("45.6"), 45, 53, decimal.Decimal("45.68392")),
        )

        row = testing.db.execute(
            text(stmt).columns(
                idata=Integer(),
                ndata=Numeric(20, 2),
                ndata2=Numeric(20, 2),
                nidata=Numeric(5, 0),
                fdata=Float(),
            )
        ).fetchall()[0]
        eq_(
            [type(x) for x in row],
            [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float],
        )
        eq_(
            row,
            (
                5,
                decimal.Decimal("45.6"),
github HaliteChallenge / Halite-III / apiserver / alembic / versions / 201807052103_5aaeafd07224_.py View on Github external
sa.Column('player_level', sa.Enum('High School', 'University', 'Professional', name='user_player_level'), server_default=sa.text("'Professional'"), nullable=False),
                    sa.Column('organization_id', sa.Integer(), autoincrement=False, nullable=True),
                    sa.Column('country_code', sa.String(length=3), nullable=True),
                    sa.Column('country_subdivision_code', sa.String(length=10), nullable=True),
                    sa.Column('creation_time', sa.TIMESTAMP(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=True),
                    sa.Column('update_time', sa.TIMESTAMP(timezone=True), nullable=True),
                    sa.Column('api_key_hash', sa.String(length=255), nullable=True),
                    sa.Column('is_admin', sa.Boolean(), server_default=sa.text("'0'"), autoincrement=False, nullable=True),
                    sa.Column('is_gpu_enabled', sa.Boolean(), server_default=sa.text("'0'"), autoincrement=False, nullable=False),
                    sa.ForeignKeyConstraint(['organization_id'], ['organization.id'], name='user_ibfk_1'),
                    sa.PrimaryKeyConstraint('id'),
    )

    op.create_table('challenge',
                    sa.Column('id', sa.Integer(), nullable=False),
                    sa.Column('created', sa.TIMESTAMP(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
                    sa.Column('finished', sa.TIMESTAMP(timezone=True), nullable=True),
                    sa.Column('num_games', sa.Integer(), autoincrement=False, nullable=False),
                    sa.Column('status', sa.Enum('created', 'playing_game', 'finished', name='challenge_status'), server_default=sa.text("'created'"), nullable=False),
                    sa.Column('most_recent_game_task', sa.TIMESTAMP(timezone=True), nullable=True),
                    sa.Column('issuer', sa.Integer(), autoincrement=False, nullable=False),
                    sa.Column('winner', sa.Integer(), autoincrement=False, nullable=True),
                    sa.ForeignKeyConstraint(['issuer'], ['user.id'], name='challenge_issuer_fk', ondelete='CASCADE'),
                    sa.ForeignKeyConstraint(['winner'], ['user.id'], name='challenge_winner_fk', ondelete='CASCADE'),
                    sa.PrimaryKeyConstraint('id'),
    )
    op.create_table('challenge_participant',
                    sa.Column('challenge_id', sa.Integer(), autoincrement=False, nullable=False),
                    sa.Column('user_id', sa.Integer(), autoincrement=False, nullable=False),
                    sa.Column('points', sa.Integer(), autoincrement=False, nullable=False),
                    sa.ForeignKeyConstraint(['challenge_id'], ['challenge.id'], name='challenge_participant_fk', ondelete='CASCADE'),
                    sa.ForeignKeyConstraint(['user_id'], ['user.id'], name='challenge_participant_ibfk_2', ondelete='CASCADE'),
github hotosm / tasking-manager / migrations / versions / a7c617755721_.py View on Github external
op.create_table(
        "users_licenses",
        sa.Column("user", sa.BigInteger(), nullable=True),
        sa.Column("license", sa.Integer(), nullable=True),
        sa.ForeignKeyConstraint(["license"], ["licenses.id"]),
        sa.ForeignKeyConstraint(["user"], ["users.id"]),
    )
    op.drop_index("idx_areas_of_interest_centroid", table_name="areas_of_interest")
    op.drop_index("idx_areas_of_interest_geometry", table_name="areas_of_interest")
    op.add_column("projects", sa.Column("license_id", sa.Integer(), nullable=True))
    op.create_foreign_key("fk_licenses", "projects", "licenses", ["license_id"], ["id"])
    op.drop_index("idx_tasks_geometry", table_name="tasks")
    # ### end Alembic commands ###

    # Custom index, not created with reflection
    op.create_index("idx_username_lower", "users", [text("lower(username)")])
github metabrainz / acousticbrainz-server / db / import_mb_data.py View on Github external
filters.append("release_group.artist_credit in :credit")
        filter_data["credit"] = tuple(artist_credit_from_recording)

    if MB_release_group_gid_redirect_data:
        filters.append("release_group.id in :redirect_data")
        filter_data["redirect_data"] = tuple(MB_release_group_gid_redirect_fk_release_group)

    if MB_release_data:
        filters.append("release_group.id in :release_data")
        filter_data["release_data"] = tuple(MB_release_fk_release_group)

    filterstr = " OR ".join(filters)
    if filterstr:
        filterstr = " WHERE " + filterstr

    release_group_query = text("""
        SELECT DISTINCT release_group.id,
               release_group.gid,
               release_group.name,
               release_group.artist_credit,
               release_group.type,
               release_group.comment,
               release_group.edits_pending,
               release_group.last_updated
          FROM release_group
            {filterstr}
    """.format(filterstr=filterstr)
    )

    result = connection.execute(release_group_query, filter_data)
    MB_release_group_data = result.fetchall()
github nesfit / fitcrack / webadmin / fitcrackAPI / src / src / database / models.py View on Github external
xml_doc_in = Column(LargeBinary)
    xml_doc_out = Column(LargeBinary)
    stderr_out = Column(LargeBinary)
    batch = Column(Integer, nullable=False)
    file_delete_state = Column(Integer, nullable=False, index=True)
    validate_state = Column(Integer, nullable=False)
    claimed_credit = Column(Float(asdecimal=True), nullable=False)
    granted_credit = Column(Float(asdecimal=True), nullable=False)
    opaque = Column(Float(asdecimal=True), nullable=False)
    random = Column(Integer, nullable=False)
    app_version_num = Column(Integer, nullable=False)
    appid = Column(Integer, nullable=False)
    exit_status = Column(Integer, nullable=False)
    teamid = Column(Integer, nullable=False)
    priority = Column(Integer, nullable=False)
    mod_time = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
    elapsed_time = Column(Float(asdecimal=True), nullable=False)
    flops_estimate = Column(Float(asdecimal=True), nullable=False)
    app_version_id = Column(Integer, nullable=False)
    runtime_outlier = Column(Integer, nullable=False)
    size_class = Column(SmallInteger, nullable=False, server_default=text("'-1'"))
    peak_working_set_size = Column(Float(asdecimal=True), nullable=False)
    peak_swap_size = Column(Float(asdecimal=True), nullable=False)
    peak_disk_usage = Column(Float(asdecimal=True), nullable=False)

    @hybrid_property
    def stderr_out_text(self):
        """Return ``stderr_out`` decoded as UTF-8, trimmed by ``getStringBetween``.

        The raw bytes are decoded once. The decoded text is then passed to
        ``getStringBetween``; if that helper raises, the full decoded text is
        returned instead.

        Returns:
            The value returned by ``getStringBetween``, or the whole decoded
            text when the helper fails.

        Raises:
            Whatever ``self.stderr_out.decode("utf-8")`` raises (for example
            ``UnicodeDecodeError``); decoding errors are not suppressed.
        """
        decoded = self.stderr_out.decode("utf-8")
        try:
            # NOTE(review): both delimiters are empty strings; they look like
            # markup that was lost in extraction - confirm the intended values.
            return getStringBetween(decoded, '', '')
        except Exception:
            # Best-effort fallback: hand back the complete text. Catching
            # Exception (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate.
            return decoded
github clld / glottolog3 / migrations / versions / 7a787c138e_sync_source_pk_with_id.py View on Github external
def drop_add_fk(names):
        """Yield the DROP and then the ADD statement for one foreign key.

        ``names`` must be a mapping providing the keys ``stab``, ``const``,
        ``scol``, ``ttab`` and ``tcol``; they are interpolated into the SQL
        templates with ``%``-formatting. Each statement is formatted lazily,
        only when the consumer asks for it.
        """
        drop_template = 'ALTER TABLE %(stab)s DROP CONSTRAINT %(const)s'
        add_template = (
            'ALTER TABLE %(stab)s ADD CONSTRAINT %(const)s '
            'FOREIGN KEY (%(scol)s) REFERENCES %(ttab)s (%(tcol)s)'
        )
        for template in (drop_template, add_template):
            yield template % names
    
    update_source = sa.text('UPDATE source SET pk = :after WHERE pk = :before')
    update_other = 'UPDATE %(stab)s SET %(scol)s = :after WHERE %(scol)s = :before'

    fks = conn.execute(select_fks).fetchall()
    drop, add = zip(*map(drop_add_fk, fks))
    
    conn.execute(';\n'.join(drop))
    conn.execute(update_source, before_after)
    for names in fks:
        conn.execute(sa.text(update_other % names), before_after)
    conn.execute(';\n'.join(add))
github agdsn / pycroft / legacy / userman_model.py View on Github external
"""Bankkontobewegungen"""
    __tablename__ = u'bank_konto'

    bkid = Column(Integer, primary_key=True, server_default=text("nextval(('\"bank_konto_bkid_seq\"'::text)::regclass)"))
    valid_on = Column(u'datum', Date, nullable=False)
    wert = Column(Money, nullable=False, server_default=text("0"))
    bes = Column(Text)


class BkBuchung(BankKonto):
    """Posted (booked) bank account transactions.

    Subclass of ``BankKonto`` mapped to the ``bk_buchung`` table; its primary
    key doubles as a foreign key to ``BankKonto.bkid``.
    """
    __tablename__ = u'bk_buchung'

    # Primary key that is also a foreign key to the parent BankKonto row.
    bkid = Column(ForeignKey(BankKonto.bkid), primary_key=True)
    # Stored in the database column named 'datum'; required, with a
    # server-side default of now().
    posted_at = Column(u'datum', DateTime(True), nullable=False, server_default=text("now()"))
    # Required; the server-side default is the result of "current_user"().
    # NOTE(review): "bearbeiter" presumably means the processing user - confirm.
    bearbeiter = Column(String, nullable=False, server_default=text("\"current_user\"()"))
    # Optional integer; the name suggests an invoice number - TODO confirm.
    rechnungs_nr = Column(Integer)
    # Required foreign key to FinanzKonten.id.
    konto_id = Column(ForeignKey(FinanzKonten.id), nullable=False)
    # Relationship to FinanzKonten, with a "bankbuchungen" backref declared
    # for the reverse direction.
    konto = relationship(FinanzKonten, backref="bankbuchungen")
    # Optional integer with no foreign key declared here.
    uid = Column(Integer)


class Buchungen(Base):
    __tablename__ = u'buchungen'

    oid = Column(OID, primary_key=True)
    bkid = Column(Integer)
    fbid = Column(Integer)
    datum = Column(Date)
    bearbeiter = Column(String)
    rechnungs_nr = Column(Integer)
    soll = Column(ForeignKey(FinanzKonten.id))
github feisuzhu / thbattle / src / services / member.py View on Github external
def get_user_info(self, uid):
        """Look up one ``pre_common_member`` row by ``uid`` and convert it.

        ``uid`` is coerced with ``int()`` and bound as the ``:uid`` query
        parameter. When no row is found an empty dict is returned; otherwise
        the row is handed to ``self._get_user_info`` and its result returned.
        """
        params = {'uid': int(uid)}
        query = text('''
            SELECT * FROM pre_common_member
            WHERE uid=:uid
        ''')
        row = session.execute(query, params).fetchone()
        return self._get_user_info(row) if row else {}
github metabrainz / acousticbrainz-server / db / import_mb_data.py View on Github external
def load_artist_type(connection):
    """Read the whole ``artist_type`` table, ordered by ``id``.

    The table is fetched in full rather than filtered by recording MBIDs;
    the original author notes that it holds comparatively few rows in the
    MusicBrainz database.

    Args:
        connection: database connection used to execute the query.
    Returns:
        Every ``artist_type`` row, as produced by ``fetchall()``.
    """
    query = text("""
        SELECT *
          FROM artist_type
      ORDER BY id
    """)
    return connection.execute(query).fetchall()
github qlands / FormShare / alembic / versions / fc1b66b49bd4_add_form_type.py View on Github external
def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column(
        "odkform",
        sa.Column(
            "form_type", sa.INTEGER(), server_default=sa.text("'1'"), nullable=True
        ),