使用 Alembic 修改 Enum 字段

如果使用的 PostgreSQL 版本大于9.1(它为枚举添加了 ALTER TYPE) ,那么如何在校验迁移中向 Enum 字段添加元素?问题解释了直接的过程,但我不太确定如何最好地翻译使用蒸馏器。

这就是我所拥有的:

new_type = sa.Enum('nonexistent_executable', 'output_limit_exceeded',
'signal', 'success', 'timed_out', name='status')
old_type = sa.Enum('nonexistent_executable', 'signal', 'success', 'timed_out',
name='status')
tcr = sa.sql.table('testcaseresult',
sa.Column('status', new_type, nullable=False))




def upgrade():
op.alter_column('testcaseresult', u'status', type_=new_type,
existing_type=old_type)




def downgrade():
op.execute(tcr.update().where(tcr.c.status==u'output_limit_exceeded')
.values(status='timed_out'))
op.alter_column('testcaseresult', u'status', type_=old_type,
existing_type=new_type)

不幸的是,上面的代码只在升级时生成 ALTER TABLE testcaseresult ALTER COLUMN status TYPE status,它基本上什么也不做。

51721 次浏览

我决定尽可能直接跟随 Postgres 进场,并提出了以下迁移。

from alembic import op
import sqlalchemy as sa


old_options = ('nonexistent_executable', 'signal', 'success', 'timed_out')
new_options = sorted(old_options + ('output_limit_exceeded',))


old_type = sa.Enum(*old_options, name='status')
new_type = sa.Enum(*new_options, name='status')
tmp_type = sa.Enum(*new_options, name='_status')


tcr = sa.sql.table('testcaseresult',
sa.Column('status', new_type, nullable=False))




def upgrade():
# Create a tempoary "_status" type, convert and drop the "old" type
tmp_type.create(op.get_bind(), checkfirst=False)
op.execute('ALTER TABLE testcaseresult ALTER COLUMN status TYPE _status'
' USING status::text::_status')
old_type.drop(op.get_bind(), checkfirst=False)
# Create and convert to the "new" status type
new_type.create(op.get_bind(), checkfirst=False)
op.execute('ALTER TABLE testcaseresult ALTER COLUMN status TYPE status'
' USING status::text::status')
tmp_type.drop(op.get_bind(), checkfirst=False)




def downgrade():
# Convert 'output_limit_exceeded' status into 'timed_out'
op.execute(tcr.update().where(tcr.c.status==u'output_limit_exceeded')
.values(status='timed_out'))
# Create a tempoary "_status" type, convert and drop the "new" type
tmp_type.create(op.get_bind(), checkfirst=False)
op.execute('ALTER TABLE testcaseresult ALTER COLUMN status TYPE _status'
' USING status::text::_status')
new_type.drop(op.get_bind(), checkfirst=False)
# Create and convert to the "old" status type
old_type.create(op.get_bind(), checkfirst=False)
op.execute('ALTER TABLE testcaseresult ALTER COLUMN status TYPE status'
' USING status::text::status')
tmp_type.drop(op.get_bind(), checkfirst=False)

看来,alembic 在其 alter_table方法中没有对 USING语句的直接支持。

在直接的 SQL 中,如果枚举中的事物的顺序不需要完全如上所述,那么这对 Postgres 是有效的:

ALTER TYPE status ADD value 'output_limit_exceeded' after 'timed_out';

在 Postgres 9.1中,可以使用 ALTER TYPE语句为枚举添加新值。这是复杂的事实,它不能在交易中完成。然而,这可以通过提交 alembic 的事务 看这里来解决。

实际上,我在使用旧的、更详细的解决方案时遇到了问题,因为 Postgres 不能自动转换列的默认值。

我使用了一个比接受的答案更简单的方法,步骤更少,这是我的基础。在这个示例中,我将假设所讨论的枚举名为“ status _ enum”,因为在可接受的答案中,对列和枚举使用“ status”使我感到困惑。

from alembic import op
import sqlalchemy as sa


name = 'status_enum'
tmp_name = 'tmp_' + name


old_options = ('nonexistent_executable', 'signal', 'success', 'timed_out')
new_options = sorted(old_options + ('output_limit_exceeded',))


new_type = sa.Enum(*new_options, name=name)
old_type = sa.Enum(*old_options, name=name)


tcr = sa.sql.table('testcaseresult',
sa.Column('status', new_type, nullable=False))


def upgrade():
op.execute('ALTER TYPE ' + name + ' RENAME TO ' + tmp_name)


new_type.create(op.get_bind())
op.execute('ALTER TABLE testcaseresult ALTER COLUMN status ' +
'TYPE ' + name + ' USING status::text::' + name)
op.execute('DROP TYPE ' + tmp_name)




def downgrade():
# Convert 'output_limit_exceeded' status into 'timed_out'
op.execute(tcr.update().where(tcr.c.status=='output_limit_exceeded')
.values(status='timed_out'))


op.execute('ALTER TYPE ' + name + ' RENAME TO ' + tmp_name)


old_type.create(op.get_bind())
op.execute('ALTER TABLE testcaseresult ALTER COLUMN status ' +
'TYPE ' + name + ' USING status::text::' + name)
op.execute('DROP TYPE ' + tmp_name)

我在尝试将一个列类型迁移到另一个列类型时遇到了同样的问题:

Alembic==0.9.4
SQLAlchemy==1.1.12

可以将参数 postgresql_using作为 alembic.op.alter_column的 kwarg 提供。

from alembic import op
import sqlalchemy as types


op.alter_column(
table_name='my_table',
column_name='my_column',
type_=types.NewType,
# allows to use postgresql USING
postgresql_using="my_column::PostgesEquivalentOfNewType",
)

希望能帮上忙。

这种做法没有任何问题:

from alembic import op


def upgrade():
op.execute("COMMIT")
op.execute("ALTER TYPE enum_type ADD VALUE 'new_value'")


def downgrade():
...

参考文献

我需要在迁移类型时移动数据,包括删除一些旧的类型,所以我想我应该根据(可怕的)公认的答案(https://stackoverflow.com/a/14845740/629272)写出一个更通用的方法来做到这一点。希望这能帮到同病相怜的其他人!

# This migration will move data from one column to two others based on the type
# for a given row, and modify the type of each row.
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql


revision = '000000000001'
down_revision = '000000000000'
branch_labels = None
depends_on = None


# This set of options makes up the old type.
example_types_old = (
'EXAMPLE_A',
'EXAMPLE_B',
'EXAMPLE_C',
)
example_type_enum_old = postgresql.ENUM(*example_types_old, name='exampletype')


# This set of options makes up the new type.
example_types_new = (
'EXAMPLE_C',
'EXAMPLE_D',
'EXAMPLE_E',
)
example_type_enum_new = postgresql.ENUM(*example_types_new, name='exampletype')


# This set of options includes everything from the old and new types.
example_types_tmp = set(example_types_old + example_types_new)
example_type_enum_tmp = postgresql.ENUM(*example_types_tmp, name='_exampletype')


# This is a table view from which we can select and update as necessary. This
# only needs to include the relevant columns which are in either the old or new
# version of the table.
examples_view = sa.Table(
# Use the name of the actual table so it is modified in the upgrade and
# downgrade.
'examples',
sa.MetaData(),
sa.Column('id', sa.Integer, primary_key=True),
# Use the _tmp type so all types are usable.
sa.Column('example_type', example_type_enum_tmp),
# This is a column from which the data will be migrated, after which the
# column will be removed.
sa.Column('example_old_column', sa.Integer),
# This is a column to which data from the old column will be added if the
# type is EXAMPLE_A.
sa.Column('example_new_column_a', sa.Integer),
# This is a column to which data from the old column will be added if the
# type is EXAMPLE_B.
sa.Column('example_new_column_b', sa.Integer),
)




def upgrade():
connection = op.get_bind()


# Add the new column to which data will be migrated.
example_new_column_a = sa.Column(
'example_new_column_a',
sa.Integer,
nullable=True
)
op.add_column('examples', example_new_column_a)


# Add the new column to which data will be migrated.
example_new_column_b = sa.Column(
'example_new_column_b',
sa.Integer,
nullable=True
)
op.add_column('examples', example_new_column_b)


# Create the temporary enum and change the example_type column to use the
# temporary enum.
# The USING statement automatically maps the old enum to the temporary one.
example_type_enum_tmp.create(connection, checkfirst=False)
# Change to the temporary type and map from the old type to the temporary
# one.
op.execute('''
ALTER TABLE examples
ALTER COLUMN example_type
TYPE _exampletype
USING example_type::text::_exampletype
''')


# Move data from example_old_column to example_new_column_a and change its
# type to EXAMPLE_D if the type is EXAMPLE_A.
connection.execute(
examples_view.update().where(
examples_view.c.example_type == 'EXAMPLE_A'
).values(
example_type='EXAMPLE_D',
example_new_column_a=examples_view.c.example_old_column,
)
)


# Move data from example_old_column to example_new_column_b and change its
# type to EXAMPLE_E if the type is EXAMPLE_B.
connection.execute(
examples_view.update().where(
examples_view.c.example_type == 'EXAMPLE_B'
).values(
example_type='EXAMPLE_E',
example_new_column_b=examples_view.c.example_old_column,
)
)


# Move any remaining data from example_old_column to example_new_column_a
# and keep its type as EXAMPLE_C.
connection.execute(
examples_view.update().where(
examples_view.c.example_type == 'EXAMPLE_C'
).values(
example_type='EXAMPLE_C',
example_new_column_a=examples_view.c.example_old_column,
)
)


# Delete the old enum now that the data with the old types have been moved.
example_type_enum_old.drop(connection, checkfirst=False)


# Create the new enum and change the example_type column to use the new
# enum.
# The USING statement automatically maps the temporary enum to the new one.
example_type_enum_new.create(connection, checkfirst=False)
op.execute('''
ALTER TABLE examples
ALTER COLUMN example_type
TYPE exampletype
USING example_type::text::exampletype
''')


# Delete the temporary enum.
example_type_enum_tmp.drop(connection, checkfirst=False)


# Remove the old column.
op.drop_column('examples', 'example_old_column')




# The downgrade just performs the opposite of all the upgrade operations but in
# reverse.
def downgrade():
connection = op.get_bind()


example_old_column = sa.Column(
'example_old_column',
sa.Integer,
nullable=True
)
op.add_column('examples', example_old_column)


example_type_enum_tmp.create(connection, checkfirst=False)
op.execute('''
ALTER TABLE examples
ALTER COLUMN example_type
TYPE _exampletype
USING example_type::text::_exampletype
''')


connection.execute(
examples_view.update().where(
examples_view.c.example_type == 'EXAMPLE_C'
).values(
example_type='EXAMPLE_C',
example_old_column=examples_view.c.example_new_column_b,
)
)


connection.execute(
examples_view.update().where(
examples_view.c.example_type == 'EXAMPLE_E'
).values(
example_type='EXAMPLE_B',
example_old_column=examples_view.c.example_new_column_b,
)
)


connection.execute(
examples_view.update().where(
examples_view.c.example_type == 'EXAMPLE_D'
).values(
example_type='EXAMPLE_A',
example_old_column=examples_view.c.example_new_column_a,
)
)


example_type_enum_old.create(connection, checkfirst=False)
op.execute('''
ALTER TABLE examples
ALTER COLUMN example_type
TYPE exampletype
USING example_type::text::exampletype
''')


example_type_enum_tmp.drop(connection, checkfirst=False)


op.drop_column('examples', 'example_new_column_b')
op.drop_column('examples', 'example_new_column_a')

由于我得到了转换错误和缺省值的问题,我根据已被接受的答案写了一个更普遍的答案:

def replace_enum_values(
name: str,
old: [str],
new: [str],
modify: [(str, str, str)]
):
"""
Replaces an enum's list of values.


Args:
name: Name of the enum
new: New list of values
old: Old list of values
modify: List of tuples of table name
and column to modify (which actively use the enum).
Assumes each column has a default val.
"""
connection = op.get_bind()


tmp_name = "{}_tmp".format(name)


# Rename old type
op.execute(
"ALTER TYPE {} RENAME TO {};"
.format(name, tmp_name)
)


# Create new type
lsl = sa.Enum(*new, name=name)
lsl.create(connection)


# Replace all usages
for (table, column) in modify:
# Get default to re-set later
default_typed = connection.execute(
"SELECT column_default "
"FROM information_schema.columns "
"WHERE table_name='{table}' "
"AND column_name='{column}';"
.format(table=table, column=column)
).first()[0]  # type: str


# Is bracketed already
default = default_typed[:default_typed.index("::")]


# Set all now invalid values to default
connection.execute(
"UPDATE {table} "
"SET {column}={default} "
"WHERE {column} NOT IN {allowed};"
.format(
table=table,
column=column,
# Invalid: What isn't contained in both new and old
# Can't just remove what's not in new because we get
# a type error
allowed=tuple(set(old).intersection(set(new))),
default=default
)
)


op.execute(
"ALTER TABLE {table} "
# Default needs to be dropped first
"ALTER COLUMN {column} DROP DEFAULT,"
# Replace the tpye
"ALTER COLUMN {column} TYPE {enum_name} USING {column}::text::{enum_name},"
# Reset default
"ALTER COLUMN {column} SET DEFAULT {default};"
.format(
table=table,
column=column,
enum_name=name,
default=default
)
)


# Remove old type
op.execute("DROP TYPE {};".format(tmp_name))

这可以通过升级/降级来实现:

replace_enum_values(
name='enum_name',
new=["A", "B"],
old=["A", "C"],
modify=[('some_table', 'some_column')]
)

所有失效的值都将设置为 server _ default。

找到了另一个方便的方法

op.execute('ALTER TYPE enum_type ADD VALUE new_value')
op.execute('ALTER TYPE enum_type ADD VALUE new_value BEFORE old_value')
op.execute('ALTER TYPE enum_type ADD VALUE new_value AFTER old_value')

这个解决方案很容易理解,对于升级和降级都非常有效。我已经更详细地写了这个答案。

假设我们的 enum_type是这样的:

enum_type = ('some_value_1', 'some_value_2')

我想通过添加一个新的枚举来改变 enum_type,使它变成这样:

enum_type = ('some_value_1', 'some_value_2', 'new_value')

可以这样做:

from alembic import op




def upgrade():
op.execute("COMMIT")
op.execute("ALTER TYPE enum_type ADD VALUE 'new_value'")




def downgrade():
# Drop 'new_value' from enum_type
op.execute("ALTER TYPE enum_type RENAME TO enum_type_tmp")


op.execute("CREATE TYPE enum_type AS ENUM('some_value_1', 'some_value_1')")


op.execute("DROP TYPE enum_type_tmp")

注意: 在降级过程中,如果你在表格中使用 enum_type,那么你可以修改下面提到的降级方法:

def downgrade():
# Drop 'new_value' from enum_type
op.execute("UPDATE table_name"
" SET column_name_using_enum_type_value = NULL"
" WHERE column_name_using_enum_type_value = 'new_value'")


op.execute("ALTER TYPE enum_type RENAME TO enum_type_tmp")


op.execute("CREATE TYPE enum_type AS ENUM('some_value_1', 'some_value_1')")


op.execute("ALTER TABLE table_name"
" ALTER COLUMN column_name_using_enum_type_value TYPE enum_type"
" USING column_name_using_enum_type_value::text::enum_type")


op.execute("DROP TYPE enum_type_tmp")

此方法可用于更新 Enum

def upgrade():
op.execute("ALTER TYPE categorytype RENAME VALUE 'EXAMPLE_A' TO 'EXAMPLE_B'")




def downgrade():
op.execute("ALTER TYPE categorytype RENAME VALUE 'EXAMPLE_B' TO 'EXAMPLE_A'")

首先将列类型改为 VARCHAR ()。

然后删除类型并创建带有新字段的新类型。

最后将列类型改为新创建的类型。

def upgrade():
op.execute(
'''
ALTER TABLE your_table ALTER COLUMN your_enum_column TYPE VARCHAR(255);


DROP TYPE IF EXISTS your_enum_type;


CREATE TYPE your_enum_type AS ENUM
('value1', 'value2', 'value3', 'value4');


ALTER TABLE your_table ALTER COLUMN your_enum_column TYPE your_enum_type
USING (your_enum_column::your_enum_type);
'''
)




def downgrade():
op.execute(
'''
ALTER TABLE your_table ALTER COLUMN your_enum_column TYPE VARCHAR(255);


DROP TYPE IF EXISTS your_enum_type;


CREATE TYPE your_enum_type AS ENUM
('value1', 'value2', 'value3');


ALTER TABLE your_table ALTER COLUMN your_enum_column TYPE your_enum_type
USING (your_enum_column::your_enum_type);
'''
)

观察

为了减轻迁移时的疼痛,I 即使使用 PostgreSQL,也始终使用非本机枚举

非本机枚举只是带约束的字符串,如果编辑枚举,只有三种情况:

  1. 重命名枚举值
  2. 删除枚举值
  3. 添加枚举值。

对于迁徙,2和3是一对。这是可以理解的: 如果你为了添加而升级,那么你必须在降级的时候删除,反之亦然。我们把它们分为两类。

实施

如果是重命名,通常我会分成三步:

  1. 放下旧约束
  2. 将行的旧值更新为新值
  3. 创建新的约束

在阿伦巴克语中,这是通过:

def update_enum(
table, column, enum_class_name, target_values, olds_to_remove, news_to_add
):
op.drop_constraint(f"ck_{table}_{enum_class_name}", table)


for sql in update_enum_sqls(table, column, olds_to_remove, news_to_add):
op.execute(sql)


op.create_check_constraint(
enum_class_name, table, sa.sql.column(column).in_(target_values)
)

让我们先忘记 update_enum_sqls,只是将它用作 SQL 生成器。

如果是移除,还有三个步骤:

  1. 放下旧约束
  2. 使用旧值删除行
  3. 创建新的约束

所以基本上只有 update_enum_sqls会有不同的表现。

如果是加法,只有两个步骤:

  1. 放下旧约束
  2. 创建新的约束

不过,我们可以忽略 update_enum_sqls

那么如何实施呢? 不是很难..。

def update_enum_sql(table, column, old_value, new_value):
if new_value is not None:
return f"UPDATE {table} SET {column} = '{new_value}' where {column} = '{old_value}'"
else:
return f"DELETE FROM {table} where {column} = '{old_value}'"




def update_enum_sqls(table, column, olds_to_remove, news_to_add):
if len(olds_to_remove) != len(news_to_add):
raise NotImplementedError
return [
update_enum_sql(table, column, old, new)
for old, new in zip(olds_to_remove, news_to_add)
]

例子

既然我们已经准备好了材料,我们就应用:

def upgrade():
# rename enum
update_enum(
"my_table",
"my_enum",
"myenumclassname",
["NEW", "ENUM", "VALUES"],
["OLD"],
["NEW"],
)


# add enum
update_enum(
"my_table",
"my_enum",
"myenumclassname",
["NEW", "ENUM", "VALUES"],
[],
[],
)




def downgrade():
# remove enum
update_enum(
"my_table",
"my_enum",
"myenumclassname",
["ENUM", "VALUES"],
["NEW"],
[None],  # this will delete rows with "NEW", USE WITH CARE!!!
)


# edit enum
update_enum(
"my_table",
"my_enum",
"myenumclassname",
["OLD", "ENUM", "VALUES"],
["NEW"],
["OLD"],
)

上面的代码也可以在 大意上找到。

这种方法类似于 公认的解决方案,但有一些细微的差别:

  1. 它使用 op.batch_alter_table而不是 op.execute('ALTER TABLE'),所以这个解决方案可以在 PostgreSQL 和 SQLite 中使用: SQLite 不支持 ALTER TABLE,但是 alembic 提供了对 op.batch_alter_table的支持
  2. 它不使用原始 SQL
from alembic import op
import sqlalchemy as sa




# Describing of enum
enum_name = "status"
temp_enum_name = f"temp_{enum_name}"
old_values = ("nonexistent_executable", "signal", "success", "timed_out")
new_values = ("output_limit_exceeded", *old_values)
downgrade_to = ("output_limit_exceeded", "timed_out") # on downgrade convert [0] to [1]
old_type = sa.Enum(*old_values, name=enum_name)
new_type = sa.Enum(*new_values, name=enum_name)
temp_type = sa.Enum(*new_values, name=temp_enum_name)


# Describing of table
table_name = "testcaseresult"
column_name = "status"
temp_table = sa.sql.table(
table_name,
sa.Column(
column_name,
new_type,
nullable=False
)
)




def upgrade():
# temp type to use instead of old one
temp_type.create(op.get_bind(), checkfirst=False)


# changing of column type from old enum to new one.
# SQLite will create temp table for this
with op.batch_alter_table(table_name) as batch_op:
batch_op.alter_column(
column_name,
existing_type=old_type,
type_=temp_type,
existing_nullable=False,
postgresql_using=f"{column_name}::text::{temp_enum_name}"
)


# remove old enum, create new enum
old_type.drop(op.get_bind(), checkfirst=False)
new_type.create(op.get_bind(), checkfirst=False)


# changing of column type from temp enum to new one.
# SQLite will create temp table for this
with op.batch_alter_table(table_name) as batch_op:
batch_op.alter_column(
column_name,
existing_type=temp_type,
type_=new_type,
existing_nullable=False,
postgresql_using=f"{column_name}::text::{enum_name}"
)


# remove temp enum
temp_type.drop(op.get_bind(), checkfirst=False)




def downgrade():
# old enum don't have new value anymore.
# before downgrading from new enum to old one,
# we should replace new value from new enum with
# somewhat of old values from old enum
op.execute(
temp_table
.update()
.where(
temp_table.c.status == downgrade_to[0]
)
.values(
status=downgrade_to[1]
)
)


temp_type.create(op.get_bind(), checkfirst=False)


with op.batch_alter_table(table_name) as batch_op:
batch_op.alter_column(
column_name,
existing_type=new_type,
type_=temp_type,
existing_nullable=False,
postgresql_using=f"{column_name}::text::{temp_enum_name}"
)


new_type.drop(op.get_bind(), checkfirst=False)
old_type.create(op.get_bind(), checkfirst=False)


with op.batch_alter_table(table_name) as batch_op:
batch_op.alter_column(
column_name,
existing_type=temp_type,
type_=old_type,
existing_nullable=False,
postgresql_using=f"{column_name}::text::{enum_name}"
)


temp_type.drop(op.get_bind(), checkfirst=False)

根据公认的解决方案:

看起来 alembic 在它的 alter _ table 方法中没有对 USING 语句的直接支持。

目前,alembic 在其 alter_table方法中支持 USING语句。