如何在 Alembic 升级脚本中执行插入和更新?

我需要在 Alembic 升级期间更改数据。

我目前有一个“球员”表在第一次修订:

def upgrade():
op.create_table('player',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.Unicode(length=200), nullable=False),
sa.Column('position', sa.Unicode(length=200), nullable=True),
sa.Column('team', sa.Unicode(length=100), nullable=True)
sa.PrimaryKeyConstraint('id')
)

我想介绍一个“团队”表,我已经创建了第二个版本:

def upgrade():
op.create_table('teams',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=80), nullable=False)
)
op.add_column('players', sa.Column('team_id', sa.Integer(), nullable=False))

我希望第二次迁移也增加以下数据:

  1. 人口队伍表:

    INSERT INTO teams (name) SELECT DISTINCT team FROM players;
    
  2. Update players.team_id based on players.team name:

    UPDATE players AS p JOIN teams AS t SET p.team_id = t.id WHERE p.team = t.name;
    

How do I execute inserts and updates inside the upgrade script?

79379 次浏览

你所要求的是一个 数据迁移,而不是 模式迁移,这是最普遍的阿朗比克文件。

这个答案假设您使用声明式(与 class-Mapper-Table 或 core 相反)来定义模型。将这种形式适应于其他形式应该相对简单。

注意,Alembic 提供了一些基本的数据函数: op.bulk_insert()op.execute()。如果操作相当少,请使用这些。如果迁移需要关系或其他复杂的交互,我倾向于使用下面描述的模型和会话的全部功能。

下面是一个示例迁移脚本,它设置了一些声明性模型,这些模型将用于操作会话中的数据。重点是:

  1. 定义所需的基本模型和所需的列。您不需要每一列,只需要主键和将要使用的键。

  2. 在升级函数中,使用 op.get_bind()获取当前连接,并与之进行会话。

    • 或者使用 bind.execute()使用 SQLAlchemy 的底层直接编写 SQL 查询。
  3. 像在应用程序中一样使用模型和会话。

"""create teams table


Revision ID: 169ad57156f0
Revises: 29b4c2bfce6d
Create Date: 2014-06-25 09:00:06.784170
"""


revision = '169ad57156f0'
down_revision = '29b4c2bfce6d'


from alembic import op
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.ext.declarative import declarative_base


Base = declarative_base()




class Player(Base):
__tablename__ = 'players'


id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.String, nullable=False)
team_name = sa.Column('team', sa.String, nullable=False)
team_id = sa.Column(sa.Integer, sa.ForeignKey('teams.id'), nullable=False)


team = orm.relationship('Team', backref='players')




class Team(Base):
__tablename__ = 'teams'


id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.String, nullable=False, unique=True)




def upgrade():
bind = op.get_bind()
session = orm.Session(bind=bind)


# create the teams table and the players.team_id column
Team.__table__.create(bind)
op.add_column('players', sa.Column('team_id', sa.ForeignKey('teams.id'), nullable=False)


# create teams for each team name
teams = {name: Team(name=name) for name in session.query(Player.team).distinct()}
session.add_all(teams.values())


# set player team based on team name
for player in session.query(Player):
player.team = teams[player.team_name]


session.commit()


# don't need team name now that team relationship is set
op.drop_column('players', 'team')




def downgrade():
bind = op.get_bind()
session = orm.Session(bind=bind)


# re-add the players.team column
op.add_column('players', sa.Column('team', sa.String, nullable=False)


# set players.team based on team relationship
for player in session.query(Player):
player.team_name = player.team.name


session.commit()


op.drop_column('players', 'team_id')
op.drop_table('teams')

迁移定义了单独的模型,因为代码中的模型代表数据库的 现状,而迁移代表 一路走来。您的数据库可能处于沿着该路径的任何状态,因此模型可能尚未与数据库同步。除非您非常小心,否则直接使用实际模型将导致缺少列、无效数据等问题。明确地说明在迁移中将使用哪些列和模型会更清楚。

我建议使用使用 ad-hoc 表 正式文件中有详细说明的 SQLAlchemy 核心语句,因为它允许使用不可知的 SQL 和 pythonic 编写,并且是自包含的。SQLAlchemyCore 对于迁移脚本来说是两全其美的。

下面是这个概念的一个例子:

from sqlalchemy.sql import table, column
from sqlalchemy import String
from alembic import op


account = table('account',
column('name', String)
)
op.execute(
account.update().\\
where(account.c.name==op.inline_literal('account 1')).\\
values({'name':op.inline_literal('account 2')})
)


# If insert is required
from sqlalchemy.sql import insert
from sqlalchemy import orm


bind = op.get_bind()
session = orm.Session(bind=bind)


data = {
"name": "John",
}
ret = session.execute(insert(account).values(data))
# for use in other insert calls
account_id = ret.lastrowid

还可以使用如下示例所示的直接 SQL see (压缩操作参考) :

from alembic import op


# revision identifiers, used by Alembic.
revision = '1ce7873ac4ced2'
down_revision = '1cea0ac4ced2'
branch_labels = None
depends_on = None




def upgrade():
# ### commands made by andrew ###
op.execute('UPDATE STOCK SET IN_STOCK = -1 WHERE IN_STOCK IS NULL')
# ### end Alembic commands ###




def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###