Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def alter_table_cmd(node, output):
cmdtype = node.subtype
if cmdtype == enums.AlterTableType.AT_ChangeOwner:
output.write("OWNER TO ")
output.print_name(node.newowner)
return
if cmdtype == enums.AlterTableType.AT_SetStatistics:
output.write("ALTER COLUMN ")
output.print_name(node.name)
output.write(" SET STATISTICS ")
output.print_node(node['def'])
return
if cmdtype == enums.AlterTableType.AT_ColumnDefault:
output.write("ALTER COLUMN ")
output.print_name(node.name)
if node['def']:
output.write(" SET DEFAULT ")
if cmdtype == enums.AlterTableType.AT_DropColumn:
output.write("DROP COLUMN ")
if node.missing_ok:
output.write("IF EXISTS ")
output.print_name(node.name)
if node.behavior == enums.DropBehavior.DROP_CASCADE:
output.write("CASCADE ")
return
if cmdtype == enums.AlterTableType.AT_SetNotNull:
output.write("ALTER COLUMN ")
output.print_name(node.name)
output.write(" SET NOT NULL")
return
if cmdtype == enums.AlterTableType.AT_EnableTrig:
output.write("ENABLE TRIGGER ")
output.print_name(node.name)
return
if cmdtype == enums.AlterTableType.AT_DisableTrig:
output.write("DISABLE TRIGGER ")
output.print_name(node.name)
return
raise NotImplementedError("Unsupported alter table cmd: %s" % cmdtype) # pragma: nocover
def _alter_table_stmt(node, fk_regex, missing_fk):
table_name = node.relation.relname.value
for cmd in node.cmds:
if cmd.subtype == AlterTableType.AT_AddColumn:
if _column_needs_foreign_key(fk_regex, cmd['def']):
key = '{}.{}'.format(table_name, cmd['def'].colname.value)
missing_fk[key] = cmd['def']
elif cmd.subtype in (AlterTableType.AT_AddConstraint,
AlterTableType.AT_AddConstraintRecurse):
constraint = cmd['def']
_remove_satisfied_foreign_keys(constraint, table_name, missing_fk)
::
{
'AlterTableCmd': {
'def': {
'ColumnDef': {
'colname': 'bar',
'constraints': [{'Constraint': {'contype': 2}}]
}
}
}
}
"""
# We only care about adding a column
if node.subtype != AlterTableType.AT_AddColumn:
return
constraints = node['def'].constraints
# No constraints imposed, nothing to do.
if constraints == pglast.Missing:
return
for constraint in constraints:
if constraint.contype.value in disallowed_constraints:
col = node['def'].colname.value
ctx.report(
self.ConstraintNotAllowed(col=col),
node=constraint)
def _alter_table_stmt(node, fk_regex, missing_fk):
table_name = node.relation.relname.value
for cmd in node.cmds:
if cmd.subtype == AlterTableType.AT_AddColumn:
if _column_needs_foreign_key(fk_regex, cmd['def']):
key = '{}.{}'.format(table_name, cmd['def'].colname.value)
missing_fk[key] = cmd['def']
elif cmd.subtype in (AlterTableType.AT_AddConstraint,
AlterTableType.AT_AddConstraintRecurse):
constraint = cmd['def']
_remove_satisfied_foreign_keys(constraint, table_name, missing_fk)
Node is an `AlterTableCmd`:
{
'AlterTableCmd': {
'def': {
'ColumnDef': {
'colname': 'bar',
'constraints': [{'Constraint': {'contype': 2, 'location': 35}}]
}
}
}
}
"""
# We only care about changing the type of a column
if node.subtype != AlterTableType.AT_AlterColumnType:
return
ty = node['def'].typeName
issue = self.ChangeTypeNotAllowed(col=node.name.value)
ctx.report(issue, node=ty)