Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_column_accessor_string_no_target_column(self):
    """Accessing ``ForeignKey.column`` raises NoReferencedColumnError when
    the referenced table exists but lacks the named column."""
    fk = ForeignKey("sometable.somecol")
    metadata = MetaData()
    Table('t', metadata, Column('x', fk))
    # The target table is present, but 'somecol' is not among its columns.
    Table("sometable", metadata, Column('notsomecol', Integer))
    assert_raises_message(
        exc.NoReferencedColumnError,
        "Could not initialize target column for ForeignKey "
        "'sometable.somecol' on table 't': "
        "table 'sometable' has no column named 'somecol'",
        getattr,
        fk,
        "column",
    )
# NOTE(review): this block is truncated -- the expect_warnings() call on the
# last line is cut off mid-argument; the remainder lies outside this view.
def test_functional_ix_one(self):
# Two separate MetaData collections each define the same table 'foo'
# carrying a functional (expression-based) unique index on lower(email).
m1 = MetaData()
m2 = MetaData()
t1 = Table(
"foo",
m1,
Column("id", Integer, primary_key=True),
Column("email", String(50)),
)
Index("email_idx", func.lower(t1.c.email), unique=True)
t2 = Table(
"foo",
m2,
Column("id", Integer, primary_key=True),
Column("email", String(50)),
)
Index("email_idx", func.lower(t2.c.email), unique=True)
# Presumably the test asserts a warning when comparing/copying these
# functional indexes -- TODO confirm against the full source.
with assertions.expect_warnings(
def test_non_column_clause(self):
    """A join ON clause may mix column comparisons with arbitrary boolean
    expressions; the joined primary key still reflects both tables."""
    metadata = MetaData()
    parent = Table(
        "a",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("x", Integer),
    )
    child = Table(
        "b",
        metadata,
        Column("id", Integer, ForeignKey("a.id"), primary_key=True),
        Column("x", Integer, primary_key=True),
    )
    joined = parent.join(child, and_(parent.c.id == child.c.id, child.c.x == 5))
    # The literal 5 is rendered as a bound parameter (:x_1).
    assert str(joined) == "a JOIN b ON a.id = b.id AND b.x = :x_1", str(joined)
    assert list(joined.primary_key) == [parent.c.id, child.c.x]
def __init__(self, meta, routes, segments, hierarchy, style_config, uptable):
    """Create the style table (id plus two linestring geometry columns) and
    store the collaborating route/segment/hierarchy objects."""
    self.config = style_config
    srid = segments.srid
    # Base columns; the style configuration may append further ones below.
    base_columns = [
        sa.Column('id', sa.BigInteger, primary_key=True, autoincrement=False),
        sa.Column('geom', Geometry('LINESTRING', srid=srid)),
        sa.Column('geom100', Geometry('LINESTRING', srid=srid)),
    ]
    table = sa.Table(self.config.table_name, meta, *base_columns)
    self.config.add_columns(table)
    super().__init__(table, segments.change)
    self.rels = routes
    self.ways = segments
    self.rtree = hierarchy
    # table that holds geometry updates
    self.uptable = uptable
def upgrade(migrate_engine):
# Migration step: drop the legacy 'file_path' column from 'sample_dataset'.
# Bind the shared module-level MetaData to the live engine and reflect.
metadata.bind = migrate_engine
# Emit the migration's module docstring as progress output.
print(__doc__)
metadata.reflect()
try:
# Reflect the existing table from the database; tolerate its absence.
SampleDataset_table = Table( "sample_dataset", metadata, autoload=True )
except NoSuchTableError:
SampleDataset_table = None
log.debug( "Failed loading table 'sample_dataset'" )
if SampleDataset_table is not None:
# Snapshot id -> file_path before the column is dropped.
cmd = "SELECT id, file_path FROM sample_dataset"
result = migrate_engine.execute( cmd )
filepath_dict = {}
for r in result:
id = int(r[0])
filepath_dict[id] = r[1]
# remove the 'file_path' column
try:
SampleDataset_table.c.file_path.drop()
except Exception:
# Best-effort drop: failure is logged, not raised.
log.exception("Deleting column 'file_path' from the 'sample_dataset' table failed.")
# NOTE(review): filepath_dict is unused within this view -- presumably
# consumed by code past the visible chunk; verify before removing it.
# NOTE(review): fragment begins mid-Table() call -- the Table(...) opening for
# these first three Column definitions lies before this view.
Column(self.FN_DAG_ID, String, primary_key=True),
Column(self.FN_EXEC_DATE, DateTime, primary_key=True),
Column(self.FN_TASK_ID, String, nullable=False),
)
# Current load status, keyed by data asset plus the DAG-run identity columns.
self.t_load_status = Table(
self.TN_LOAD_STATUS,
metadata,
Column(self.FN_DATA_ASSET, String, primary_key=True),
Column(self.FN_LOAD_TIME, DateTime, nullable=False),
Column(self.FN_DAG_ID, String, primary_key=True),
Column(self.FN_EXEC_DATE, DateTime, primary_key=True),
Column(self.FN_TASK_ID, String, nullable=False),
)
# History variant: load time joins the primary key so multiple loads per
# asset/DAG-run are retained rather than overwritten.
self.t_load_status_hist = Table(
self.TN_LOAD_STATUS_HIST,
metadata,
Column(self.FN_DATA_ASSET, String, primary_key=True),
Column(self.FN_LOAD_TIME, DateTime, primary_key=True),
Column(self.FN_DAG_ID, String, primary_key=True),
Column(self.FN_EXEC_DATE, DateTime, primary_key=True),
Column(self.FN_TASK_ID, String, nullable=False),
)
# Per-file metadata keyed by asset and file path.
self.t_infile_metadata = Table(
self.TN_INFILE_METADATA,
metadata,
Column(self.FN_DATA_ASSET, String, primary_key=True),
Column(self.FN_FILE_PATH, String, primary_key=True),
Column(self.FN_DAG_ID, String, primary_key=True),
# NOTE(review): this Table() call is truncated -- remaining columns and the
# closing parenthesis lie past the visible chunk.
Column(self.FN_EXEC_DATE, DateTime, primary_key=True),
# NOTE(review): fragment of a table-rebuild routine (copy columns, recreate
# constraints/indexes, swap tables). It begins mid-loop and ends on a dangling
# line continuation -- both ends lie outside this view.
if isinstance(column.type, NullType):
# Columns reflected with no usable type get a caller-supplied substitute.
column_copy = _get_not_supported_column(col_name_col_instance,
column.name)
else:
column_copy = column.copy()
else:
# No existing 'deleted' column: synthesize one typed like the id column.
column_copy = Column('deleted', table.c.id.type,
default=default_deleted_value)
columns.append(column_copy)
constraints = []
# Carry over every constraint except the one tied to the old deleted column.
for constraint in table.constraints:
if not _is_deleted_column_constraint(constraint):
constraints.append(constraint.copy())
new_table = Table(table_name + "__tmp__", meta,
*(columns + constraints))
new_table.create()
indexes = []
# Rebuild index definitions against the new table's columns.
for index in get_indexes(engine, table_name):
column_names = [new_table.c[c] for c in index['column_names']]
indexes.append(Index(index["name"], *column_names,
unique=index["unique"]))
# Drop the original, recreate its indexes on the temp table, then rename
# the temp table into place.
table.drop()
for index in indexes:
index.create(engine)
new_table.rename(table_name)
deleted = True  # workaround for pyflakes
# NOTE(review): statement truncated by the view -- continues past this chunk.
new_table.update().\
def update_alembic_version(old, new):
"""Correct the alembic head so the DB can be upgraded via the EMC method.

:param old: current alembic head recorded in the table
:param new: expected alembic head to write in its place
"""
# `engine` is a module-level global -- assumed bound elsewhere; TODO confirm.
meta = MetaData(engine)
# Reflect the existing single-column alembic_version table.
alembic_version = Table('alembic_version', meta, autoload=True)
# Legacy implicit execution (statement.execute() via bound MetaData);
# this API was removed in SQLAlchemy 2.0.
alembic_version.update().values(
version_num=new).where(
alembic_version.c.version_num == old).execute()
def profile_association(table_name):
    """Build the secondary table linking profiles to rows of *table_name*.

    The association table is named ``<table_name>_profiles`` and pairs
    ``profile_id`` with ``<table_name>_id``, with the pair unique.
    """
    fk_column = '{}_id'.format(table_name)
    return Table(
        '{}_profiles'.format(table_name),
        Base.metadata,
        Column('profile_id', Integer, ForeignKey('profile.id')),
        Column(fk_column, Integer, ForeignKey('{}.id'.format(table_name))),
        UniqueConstraint('profile_id', fk_column),
    )
# Consumers: uuid-keyed, owned by an owner, optionally nested via the
# self-referential 'parent' foreign key.
consumer_table = Table('cp_consumer', metadata,
Column('uuid', String, primary_key=True),
Column('name', String),
Column('owner', String, ForeignKey('cp_owner.uuid')),
Column('parent', String, ForeignKey('cp_consumer.uuid')))
# Many-to-many link between consumers and their info records.
consumer_info_to_consumer_table = Table('cp_consumer_info_to_consumer', metadata,
Column('cid', String, ForeignKey('cp_consumer.uuid')),
Column('info_id', Integer, ForeignKey('cp_consumer_info.id')))
# Lookup table of consumer-info types (sequence-backed integer id).
consumer_type_table = Table('cp_consumer_type', metadata,
Column('id', Integer, Sequence('consumer_type_id_seq'), primary_key=True),
Column('label', String))
# Name/value info entries, typed via cp_consumer_type.
consumer_info_table = Table('cp_consumer_info', metadata,
Column('id', Integer, Sequence('consumer_info_id_seq'), primary_key=True),
Column('name', String),
Column('value', String),
Column('type', Integer, ForeignKey('cp_consumer_type.id')))
# Product definitions, uuid-keyed.
product_definition_table = Table('cp_product', metadata,
Column('uuid', String, primary_key=True),
Column('name', String))
# Entitlement pools tie an owner to a product.
entitlement_pool_table = Table('cp_entitlement_pool', metadata,
Column('uuid', String, primary_key=True),
Column('name', String),
Column('owner', String, ForeignKey('cp_owner.uuid')),
Column('product', String, ForeignKey('cp_product.uuid')))
def get_engine(db):