Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if match is not None and match.groups()[0] == column_to_update:
column_def = fk_filter_fn(column_def)
if column_def:
cleaned_columns.append(column_def)
# Update the name of the new CREATE TABLE query.
temp_table = table + '__tmp__'
rgx = re.compile('("?)%s("?)' % table, re.I)
create = rgx.sub(
'\\1%s\\2' % temp_table,
raw_create)
# Create the new table.
columns = ', '.join(cleaned_columns)
queries = [
NodeList([SQL('DROP TABLE IF EXISTS'), Entity(temp_table)]),
SQL('%s (%s)' % (create.strip(), columns))]
# Populate new table.
populate_table = NodeList((
SQL('INSERT INTO'),
Entity(temp_table),
EnclosedNodeList([Entity(col) for col in new_column_names]),
SQL('SELECT'),
CommaNodeList([Entity(col) for col in original_column_names]),
SQL('FROM'),
Entity(table)))
drop_original = NodeList([SQL('DROP TABLE'), Entity(table)])
# Drop existing table and rename temp table.
queries += [
populate_table,
def drop_index(migrator, model, index):
    """Build the query that drops ``index`` from ``model``'s table.

    The statement starts as ``DROP INDEX <index.name>``. When ``is_mysql``
    reports that the model's database is MySQL, `` ON <table>`` is appended,
    with the table name taken from ``_table_name(model)``.

    :param migrator: object providing ``make_context()`` for SQL building.
    :param model: model class whose ``_meta.database`` selects the dialect.
    :param index: object exposing the index name as ``index.name``.
    :returns: whatever ``extract_query_from_migration`` produces for the
        assembled context.
    """
    ctx = migrator.make_context()
    ctx = ctx.literal('DROP INDEX ')
    ctx = ctx.sql(pw.Entity(index.name))

    if is_mysql(model._meta.database):
        # On MySQL the statement is extended with the owning table.
        ctx = ctx.literal(' ON ')
        ctx = ctx.sql(pw.Entity(_table_name(model)))

    return extract_query_from_migration(ctx)
def drop_foreign_key_constraint(self, table, column_name):
    """Build ``ALTER TABLE ... DROP FOREIGN KEY <constraint>``.

    The constraint name is looked up with ``get_foreign_key_constraint`` for
    the given table and column before any SQL is assembled.

    :param table: name of the table that owns the foreign key.
    :param column_name: name of the column the foreign key is defined on.
    :returns: the SQL-building context returned by the final ``.sql()`` call.
    """
    constraint_name = self.get_foreign_key_constraint(table, column_name)

    ctx = self._alter_table(self.make_context(), table)
    ctx = ctx.literal(' DROP FOREIGN KEY ')
    return ctx.sql(Entity(constraint_name))
def _drop_sequence(self, sequence_name):
    """Return a ``Clause`` made of ``DROP SEQUENCE`` and the quoted name.

    :param sequence_name: name of the sequence, wrapped in an ``Entity``.
    """
    keyword = SQL('DROP SEQUENCE')
    target = Entity(sequence_name)
    return Clause(keyword, target)
# Public ``drop_sequence`` attribute, created by handing the *name* of the
# private ``_drop_sequence`` builder to ``return_parsed_node``.
# NOTE(review): presumably the wrapper calls the named method and parses the
# node it returns into executable SQL -- confirm against return_parsed_node.
drop_sequence = return_parsed_node('_drop_sequence')
def clean_options(cls, options):
    """Normalise full-text-search table options and return them.

    ``options`` is modified in place and the same dict is returned.

    * ``content``: an empty string becomes the literal ``''``; a ``Field``
      becomes an ``Entity`` of its table name and column name. Any other
      value is left untouched.
    * ``prefix``: a list or tuple is joined with commas, and a scalar
      non-string (e.g. ``2``) is converted with ``str``. The result has
      surrounding quotes and spaces stripped and is wrapped in single
      quotes.
    * ``tokenize``: wrapped in double quotes only when
      ``cls._meta.extension_module`` is ``fts5`` (case-insensitive).

    :param cls: class whose ``_meta.extension_module`` names the extension.
    :param options: dict of table options; mutated in place.
    :returns: the same ``options`` dict.
    """
    content = options.get('content')
    prefix = options.get('prefix')
    tokenize = options.get('tokenize')

    if isinstance(content, basestring) and content == '':
        # Special-case content-less full-text search tables.
        options['content'] = "''"
    elif isinstance(content, Field):
        # Special-case to ensure fields are fully-qualified.
        options['content'] = Entity(content.model._meta.table_name,
                                    content.column_name)

    if prefix:
        if isinstance(prefix, (list, tuple)):
            prefix = ','.join([str(i) for i in prefix])
        elif not isinstance(prefix, basestring):
            # A bare number such as ``prefix=2`` has no ``.strip()``; coerce
            # it so it is handled like the equivalent string instead of
            # raising AttributeError below.
            prefix = str(prefix)
        options['prefix'] = "'%s'" % prefix.strip("' ")

    if tokenize and cls._meta.extension_module.lower() == 'fts5':
        # Tokenizers need to be in quoted string for FTS5, but not for FTS3
        # or FTS4.
        options['tokenize'] = '"%s"' % tokenize

    return options
rgx = re.compile('("?)%s("?)' % table, re.I)
create = rgx.sub(
'\\1%s\\2' % temp_table,
raw_create)
# Create the new table.
columns = ', '.join(cleaned_columns)
queries = [
Clause(SQL('DROP TABLE IF EXISTS'), Entity(temp_table)),
SQL('%s (%s)' % (create.strip(), columns))]
# Populate new table.
populate_table = Clause(
SQL('INSERT INTO'),
Entity(temp_table),
EnclosedClause(*[Entity(col) for col in new_column_names]),
SQL('SELECT'),
CommaClause(*[Entity(col) for col in original_column_names]),
SQL('FROM'),
Entity(table))
queries.append(populate_table)
# Drop existing table and rename temp table.
queries.append(Clause(
SQL('DROP TABLE'),
Entity(table)))
queries.append(self.rename_table(temp_table, table))
# Re-create user-defined indexes. User-defined indexes will have a
# non-empty SQL attribute.
for index in filter(lambda idx: idx.sql, indexes):
if column_to_update not in index.columns:
# Create the new table.
columns = ', '.join(cleaned_columns)
queries = [
Clause(SQL('DROP TABLE IF EXISTS'), Entity(temp_table)),
SQL('%s (%s)' % (create.strip(), columns))]
# Populate new table.
populate_table = Clause(
SQL('INSERT INTO'),
Entity(temp_table),
EnclosedClause(*[Entity(col) for col in new_column_names]),
SQL('SELECT'),
CommaClause(*[Entity(col) for col in original_column_names]),
SQL('FROM'),
Entity(table))
queries.append(populate_table)
# Drop existing table and rename temp table.
queries.append(Clause(
SQL('DROP TABLE'),
Entity(table)))
queries.append(self.rename_table(temp_table, table))
# Re-create user-defined indexes. User-defined indexes will have a
# non-empty SQL attribute.
for index in filter(lambda idx: idx.sql, indexes):
if column_to_update not in index.columns:
queries.append(SQL(index.sql))
elif new_column:
sql = self._fix_index(index.sql, column_to_update, new_column)
if sql is not None:
def drop_not_null(self, table, column):
    """Build ``ALTER TABLE <table> MODIFY <column definition>`` as nullable.

    The current definition is fetched with ``_get_column_definition`` and
    re-rendered through ``.sql(is_null=True)``.

    :param table: name of the table containing the column.
    :param column: name of the column whose NOT NULL constraint is dropped.
    :returns: a ``Clause`` holding the statement parts.
    :raises ValueError: if the column definition is flagged as a primary key.
    """
    column_def = self._get_column_definition(table, column)

    if not column_def.is_pk:
        parts = (
            SQL('ALTER TABLE'),
            Entity(table),
            SQL('MODIFY'),
            column_def.sql(is_null=True),
        )
        return Clause(*parts)

    raise ValueError('Primary keys can not be null')
def _as_entity(self, with_table=False):
    """Return an ``Entity`` for this object's ``db_column``.

    :param with_table: when true, the entity is qualified with
        ``self.model_class._meta.db_table``; otherwise only the column
        name is used.
    """
    if not with_table:
        return Entity(self.db_column)

    table_name = self.model_class._meta.db_table
    return Entity(table_name, self.db_column)
def as_entity(cls):
    """Return an ``Entity`` for the table named by ``cls._meta.db_table``.

    The entity is prefixed with ``cls._meta.schema`` when that attribute is
    truthy.
    """
    meta = cls._meta
    schema = meta.schema
    if not schema:
        return Entity(meta.db_table)
    return Entity(schema, meta.db_table)