How to do an upsert with SQLAlchemy?

I have a record that I want to exist in the database if it is not there, and if it is there already (primary key exists) I want the fields to be updated to the current state. This is often called an upsert.

The following incomplete code snippet demonstrates what will work, but it seems excessively clunky (especially if there were a lot more columns). What is the better/best way?

Base = declarative_base()

class Template(Base):
    __tablename__ = 'templates'
    id = Column(Integer, primary_key = True)
    name = Column(String(80), unique = True, index = True)
    template = Column(String(80), unique = True)
    description = Column(String(200))

    def __init__(self, Name, Template, Desc):
        self.name = Name
        self.template = Template
        self.description = Desc


def UpsertDefaultTemplate():
    sess = Session()
    desired_default = Template("default", "AABBCC", "This is the default template")
    try:
        q = sess.query(Template).filter_by(name = desired_default.name)
        existing_default = q.one()
    except sqlalchemy.orm.exc.NoResultFound:
        #default does not exist yet, so add it...
        sess.add(desired_default)
    else:
        #default already exists.  Make sure the values are what we want...
        assert isinstance(existing_default, Template)
        existing_default.name = desired_default.name
        existing_default.template = desired_default.template
        existing_default.description = desired_default.description
    sess.flush()

Is there a better or less verbose way of doing this? Something like this would be great:

sess.upsert_this(desired_default, unique_key = "name")

Although the unique_key kwarg is obviously unnecessary (the ORM should be able to figure this out easily), I added it because SQLAlchemy tends to work only with the primary key. For example, I've been looking at whether Session.merge would be applicable, but it works only on the primary key, which in this case is an autoincrementing id and so not terribly useful for this purpose.

A sample use case for this is simply when starting up a server application that may have upgraded its default expected data. ie: no concurrency concerns for this upsert.


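SQLAlchemy does have a "save-or-update" behavior, which in recent versions is built into session.add but was previously the separate session.saveorupdate call. This is not an "upsert", but it may be good enough for your needs.

It is good that you are asking about a class with multiple unique keys; I believe this is precisely why there is no single correct way to do this. The primary key is also a unique key. If there were no unique constraints, only the primary key, the problem would be simple: if nothing exists with the given ID, or if the ID is None, create a new record; otherwise update all other fields in the existing record with that primary key.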
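The primary-key-only case described above is roughly what Session.merge() gives you. Here is a minimal sketch, assuming a hypothetical Setting model whose primary key is a natural key rather than an autoincrementing id, and an in-memory SQLite database used purely for illustration:

```python
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Setting(Base):
    # Hypothetical model: the primary key is a natural key, not an autoincrement id.
    __tablename__ = "settings"
    key = Column(String(80), primary_key=True)
    value = Column(String(200))


engine = create_engine("sqlite://")  # in-memory database, for illustration only
Base.metadata.create_all(engine)

with Session(engine) as session:
    # No row with this primary key yet, so merge() leads to an INSERT.
    session.merge(Setting(key="theme", value="light"))
    session.commit()
    # The primary key now exists, so merge() loads the row and updates it.
    session.merge(Setting(key="theme", value="dark"))
    session.commit()
    merged_values = [(s.key, s.value) for s in session.query(Setting)]
```

Note that merge() looks up the row first and then writes, so it is not atomic at the database level; it only helps when the identifying column is the primary key.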

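However, when there are additional unique constraints, that simple approach runs into logical problems. If you want to "upsert" an object, and its primary key matches an existing record but another unique column matches a different record, what should you do? Similarly, if the primary key matches no existing record but another unique column does match one, what then? There may be a correct answer for your particular situation, but in general I would argue there is no single correct answer.

That is why there is no built-in "upsert" operation. The application must define what it means in each particular case.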
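For the case in the question (one unique column identifies the row, no concurrency), the application-defined rule can be written once and reused. The following is only a sketch: upsert_by is a hypothetical helper, not a SQLAlchemy API, and the Template model is a simplified copy of the one in the question.

```python
from sqlalchemy import Column, Integer, String, create_engine, inspect
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Template(Base):
    __tablename__ = "templates"
    id = Column(Integer, primary_key=True)
    name = Column(String(80), unique=True)
    template = Column(String(80))
    description = Column(String(200))


def upsert_by(session, obj, unique_key):
    """Hypothetical helper: add obj, or copy its columns onto the row sharing unique_key."""
    model = type(obj)
    existing = (
        session.query(model)
        .filter(getattr(model, unique_key) == getattr(obj, unique_key))
        .one_or_none()
    )
    if existing is None:
        session.add(obj)
        return obj
    for attr in inspect(model).column_attrs:
        if attr.columns[0].primary_key:
            continue  # keep the identity of the existing row
        setattr(existing, attr.key, getattr(obj, attr.key))
    return existing


engine = create_engine("sqlite://")  # in-memory database, for illustration only
Base.metadata.create_all(engine)

with Session(engine) as session:
    upsert_by(session, Template(name="default", template="AABBCC", description="old"), "name")
    session.commit()
    upsert_by(session, Template(name="default", template="DDEEFF", description="new"), "name")
    session.commit()
    result = [(t.id, t.name, t.template, t.description) for t in session.query(Template)]
```

This is still a SELECT followed by a write, so it is only safe where nothing else writes to the table at the same time, as in the startup scenario the question describes.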

SQLAlchemy supports ON CONFLICT through two methods, on_conflict_do_update() and on_conflict_do_nothing().

Copying from the documentation:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
)
conn.execute(stmt)
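To try the same construct without a PostgreSQL server, here is a sketch that applies it to the question's templates table using the SQLite dialect's insert() (available in SQLAlchemy 1.4 and later, and needing a reasonably recent SQLite). The upsert_template function and the table definition are assumptions made for the example; the conflict target is the unique name column rather than the primary key.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, select
from sqlalchemy.dialects.sqlite import insert  # SQLite flavour of the same construct

metadata = MetaData()
templates = Table(
    "templates",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(80), unique=True),
    Column("template", String(80)),
    Column("description", String(200)),
)

engine = create_engine("sqlite://")  # in-memory database, for illustration only
metadata.create_all(engine)


def upsert_template(conn, name, template, description):
    """Insert a template, or overwrite template/description if the name exists."""
    stmt = insert(templates).values(name=name, template=template, description=description)
    stmt = stmt.on_conflict_do_update(
        index_elements=[templates.c.name],  # conflict target: the unique column
        set_={
            "template": stmt.excluded.template,
            "description": stmt.excluded.description,
        },
    )
    conn.execute(stmt)


with engine.begin() as conn:
    upsert_template(conn, "default", "AABBCC", "first version")
    upsert_template(conn, "default", "DDEEFF", "second version")
    stored = [
        tuple(row)
        for row in conn.execute(
            select(templates.c.name, templates.c.template, templates.c.description)
        )
    ]
```

Because the database resolves the conflict inside a single statement, there is no window between "check" and "write" for another writer to slip into.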

I use a "look before you leap" approach:

# first get the object from the database if it exists
# we're guaranteed to only get one or zero results
# because we're filtering by primary key
switch_command = session.query(Switch_Command).\
    filter(Switch_Command.switch_id == switch.id).\
    filter(Switch_Command.command_id == command.id).first()

# If we didn't get anything, make one
if not switch_command:
    switch_command = Switch_Command(switch_id=switch.id, command_id=command.id)

# update the stuff we care about
switch_command.output = 'Hooray!'
switch_command.lastseen = datetime.datetime.utcnow()

session.add(switch_command)
# This will generate either an INSERT or UPDATE
# depending on whether we have a new object or not
session.commit()

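The advantage is that this is db-neutral, and I think it reads clearly. The disadvantage is a potential race condition in a scenario like the following:

  • we query the db for a switch_command and don't find one
  • we create a switch_command
  • another process or thread creates a switch_command with the same primary key as ours
  • we try to commit our switch_command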
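One way to soften the race in the steps above is to treat an IntegrityError on commit as "someone else inserted it first" and fall back to an UPDATE. This is only a sketch under assumptions: SwitchCommand is a simplified stand-in for the Switch_Command model, save_output is a hypothetical helper, and rolling back discards any other pending changes in the same session.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class SwitchCommand(Base):
    # Simplified stand-in for the Switch_Command model above.
    __tablename__ = "switch_command"
    switch_id = Column(Integer, primary_key=True)
    command_id = Column(Integer, primary_key=True)
    output = Column(String(200))


def save_output(session, switch_id, command_id, output):
    """Look before you leap, but recover if another writer wins the race."""
    key = {"switch_id": switch_id, "command_id": command_id}
    row = session.query(SwitchCommand).filter_by(**key).first()
    if row is None:
        session.add(SwitchCommand(output=output, **key))
    else:
        row.output = output
    try:
        session.commit()
    except IntegrityError:
        # Another writer inserted the same key between our SELECT and our INSERT.
        session.rollback()
        session.query(SwitchCommand).filter_by(**key).update({"output": output})
        session.commit()


engine = create_engine("sqlite://")  # in-memory database, for illustration only
Base.metadata.create_all(engine)

with Session(engine) as session:
    save_output(session, 1, 2, "first")
    save_output(session, 1, 2, "Hooray!")
    outputs = [r.output for r in session.query(SwitchCommand)]
```

The usage at the bottom only exercises the normal insert-then-update path; the except branch is the part that matters when two writers collide.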

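Nowadays, SQLAlchemy provides two helpful functions, on_conflict_do_nothing and on_conflict_do_update. They are useful, but they require you to switch from the ORM interface to the lower-level SQLAlchemy Core.

Although these two functions make upserting with SQLAlchemy's syntax less difficult, they are far from a complete out-of-the-box upsert solution.

My common use case is to upsert a big chunk of rows in a single SQL query/session execution. I usually run into two problems with upserting:

For example, the higher-level ORM functionality we've gotten used to is missing. You cannot use ORM objects; instead you have to provide the ForeignKeys at insertion time.

I use the following function to handle both of these issues: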

from sqlalchemy import UniqueConstraint, inspect
from sqlalchemy.dialects import postgresql


def upsert(session, model, rows):
    table = model.__table__
    stmt = postgresql.insert(table)
    primary_keys = [key.name for key in inspect(table).primary_key]
    # On conflict, overwrite every non-primary-key column with the incoming value
    update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}

    if not update_dict:
        raise ValueError("insert_or_update resulted in an empty update_dict")

    stmt = stmt.on_conflict_do_update(index_elements=primary_keys,
                                      set_=update_dict)

    seen = set()
    foreign_keys = {col.name: list(col.foreign_keys)[0].column for col in table.columns if col.foreign_keys}
    unique_constraints = [c for c in table.constraints if isinstance(c, UniqueConstraint)]

    def handle_foreignkeys_constraints(row):
        # Replace ORM objects (keyed by the referenced table name) with their FK values
        for c_name, c_value in foreign_keys.items():
            foreign_obj = row.pop(c_value.table.name, None)
            row[c_name] = getattr(foreign_obj, c_value.name) if foreign_obj else None

        # Drop rows that repeat a unique-constraint value already seen in this batch
        for const in unique_constraints:
            unique = tuple([const,] + [row[col.name] for col in const.columns])
            if unique in seen:
                return None
            seen.add(unique)

        return row

    rows = list(filter(None, (handle_foreignkeys_constraints(row) for row in rows)))
    session.execute(stmt, rows)
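The seen set in the function above matters because PostgreSQL rejects a single INSERT ... ON CONFLICT DO UPDATE that would touch the same row twice ("ON CONFLICT DO UPDATE command cannot affect row a second time"). The function keeps the first occurrence of each key; as a sketch of the same idea in isolation, here is a hypothetical dedupe_rows helper that keeps the last occurrence instead, which is often what you want when later rows are newer:

```python
def dedupe_rows(rows, key_columns):
    """Keep one dict per distinct key; later rows win over earlier ones."""
    latest = {}
    for row in rows:
        key = tuple(row[col] for col in key_columns)
        latest[key] = row  # overwrites the earlier row, keeps the original position
    return list(latest.values())


rows = [
    {"user_email": "a@b.com", "data": "old"},
    {"user_email": "c@d.com", "data": "other"},
    {"user_email": "a@b.com", "data": "new"},
]
deduped = dedupe_rows(rows, ["user_email"])
```

Whether the first or the last duplicate should win depends on the data; the point is only that the batch must contain each conflict key at most once before it is sent.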

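This works for me with both sqlite3 and postgres, although it might fail with composite primary key constraints and will most likely fail with additional unique constraints.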

try:
    t = self._meta.tables[data['table']]
except KeyError:
    self._log.error('table "%s" unknown', data['table'])
    return

try:
    # Optimistically INSERT first
    q = insert(t).values(data['values'])
    self._log.debug(q)
    self._db.execute(q)
except IntegrityError:
    # The row already exists: UPDATE it, matching on the primary key columns
    self._log.warning('integrity error')
    where_clause = [c == data['values'][c.name] for c in t.c if c.primary_key]
    update_dict = {c.name: data['values'][c.name] for c in t.c if not c.primary_key}
    q = update(t).where(*where_clause).values(update_dict)
    self._log.debug(q)
    self._db.execute(q)
except Exception as e:
    self._log.error('%s: %s', t.name, e)

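The following works fine for me with a Redshift database, and it also works with composite primary key constraints.

Source: http://gist.github.com/bhtucker/c40578a2fb3ca50b324e42ef9dce58e1

Only a few modifications were needed, in the function that creates the SQLAlchemy engine, def start_engine():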

import os

from sqlalchemy import Column, Integer, Date, MetaData
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects import postgresql


Base = declarative_base()


def start_engine():
    engine = create_engine(os.getenv('SQLALCHEMY_URI',
                                     'postgresql://localhost:5432/upsert'))
    connect = engine.connect()
    meta = MetaData()
    meta.reflect(bind=engine)
    return engine




class DigitalSpend(Base):
    __tablename__ = 'digital_spend'
    report_date = Column(Date, nullable=False)
    day = Column(Date, nullable=False, primary_key=True)
    impressions = Column(Integer)
    conversions = Column(Integer)

    def __repr__(self):
        return str([getattr(self, c.name, None) for c in self.__table__.c])




def compile_query(query):
    # ORM Query objects expose .statement; Core statements compile directly
    compiler = query.compile if not hasattr(query, 'statement') else query.statement.compile
    return compiler(dialect=postgresql.dialect())




def upsert(session, model, rows, as_of_date_col='report_date', no_update_cols=()):
    table = model.__table__

    stmt = insert(table).values(rows)

    # Update every column except the primary key and the explicitly excluded ones
    update_cols = [c.name for c in table.c
                   if c not in list(table.primary_key.columns)
                   and c.name not in no_update_cols]

    on_conflict_stmt = stmt.on_conflict_do_update(
        index_elements=table.primary_key.columns,
        set_={k: getattr(stmt.excluded, k) for k in update_cols},
        index_where=(getattr(model, as_of_date_col) < getattr(stmt.excluded, as_of_date_col))
    )

    print(compile_query(on_conflict_stmt))
    session.execute(on_conflict_stmt)




session = start_engine()
upsert(session, DigitalSpend, initial_rows, no_update_cols=['conversions'])
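A note on the date comparison above: in the PostgreSQL dialect, index_where qualifies the conflict target (it is meant for partial indexes), while the separate where parameter restricts which existing rows DO UPDATE is allowed to overwrite. If the intent is "only overwrite when the incoming report is newer", where may be the parameter you want. Here is a sketch of that variant using the SQLite dialect so it runs without a server; the table is a trimmed-down digital_spend and upsert_if_newer is a hypothetical name.

```python
import datetime

from sqlalchemy import Column, Date, Integer, MetaData, Table, create_engine, select
from sqlalchemy.dialects.sqlite import insert

metadata = MetaData()
spend = Table(
    "digital_spend",
    metadata,
    Column("day", Date, primary_key=True),
    Column("report_date", Date, nullable=False),
    Column("impressions", Integer),
)

engine = create_engine("sqlite://")  # in-memory database, for illustration only
metadata.create_all(engine)


def upsert_if_newer(conn, rows):
    """Insert rows; on a primary key conflict, update only if the report is newer."""
    stmt = insert(spend).values(rows)
    stmt = stmt.on_conflict_do_update(
        index_elements=[spend.c.day],
        set_={
            "report_date": stmt.excluded.report_date,
            "impressions": stmt.excluded.impressions,
        },
        # Existing rows are overwritten only when this condition holds.
        where=spend.c.report_date < stmt.excluded.report_date,
    )
    conn.execute(stmt)


day = datetime.date(2020, 1, 1)
with engine.begin() as conn:
    upsert_if_newer(conn, [{"day": day, "report_date": datetime.date(2020, 1, 5), "impressions": 100}])
    # Older report: the condition is false, so the stored row is left alone.
    upsert_if_newer(conn, [{"day": day, "report_date": datetime.date(2020, 1, 3), "impressions": 50}])
    # Newer report: the stored row is updated.
    upsert_if_newer(conn, [{"day": day, "report_date": datetime.date(2020, 1, 9), "impressions": 120}])
    final = [tuple(r) for r in conn.execute(select(spend.c.report_date, spend.c.impressions))]
```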

This allows access to the underlying models based on string names

def get_class_by_tablename(tablename):
    """Return class reference mapped to table.
    https://stackoverflow.com/questions/11668355/sqlalchemy-get-model-from-table-name-this-may-imply-appending-some-function-to
    :param tablename: String with name of table.
    :return: Class reference or None.
    """
    # Note: on SQLAlchemy 1.4+ the registry lives at Base.registry._class_registry
    for c in Base._decl_class_registry.values():
        if hasattr(c, '__tablename__') and c.__tablename__ == tablename:
            return c




sqla_tbl = get_class_by_tablename(table_name)


def handle_upsert(self, record_dict, table):
    """
    Handles updates when there are primary key conflicts.
    """
    session = self.active_session()
    try:
        session.add(table(**record_dict))
        session.flush()  # the INSERT, and any conflict, only happens at flush time
    except:
        # Here we'll assume the error is caused by an integrity error.
        # We do this because the error classes are passed from the
        # underlying package (pyodbc / sqlite); SQLAlchemy doesn't mask
        # them with its own code. This should be updated to have
        # explicit error handling for each new db engine.

        # <update>add explicit error handling for each db engine</update>
        session.rollback()
        # Query for the conflicting record, then use the dict to change its values
        c_tbl_primary_keys = [i.name for i in table.__table__.primary_key]  # List of primary key col names
        c_tbl_cols = dict(sqla_tbl.__table__.columns)  # String:Col object crosswalk

        c_query_dict = {k: record_dict[k] for k in c_tbl_primary_keys if k in record_dict}  # sub-dict from data of primary key:values
        c_oo_query_dict = {c_tbl_cols[k]: v for (k, v) in c_query_dict.items()}  # col-object:query value for primary key cols

        c_target_record = session.query(sqla_tbl).filter(*[k == v for (k, v) in c_oo_query_dict.items()]).first()

        # apply new data values to the existing record
        for k, v in record_dict.items():
            setattr(c_target_record, k, v)

There are multiple answers and here comes yet another answer (YAA). Other answers are not that readable due to the metaprogramming involved. Here is an example that

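  • uses the SQLAlchemy ORM

  • shows how to create a row if there are zero rows, using on_conflict_do_nothing

  • shows how to update the existing row (if any) without creating a new row, using on_conflict_do_update

  • uses the table primary key as the constraint

A longer example is in the original question this code relates to.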


import datetime

import sqlalchemy as sa
import sqlalchemy.orm as orm
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session


class PairState(Base):

    __tablename__ = "pair_state"

    # This table has 1-to-1 relationship with Pair
    pair_id = sa.Column(sa.ForeignKey("pair.id"), nullable=False, primary_key=True, unique=True)
    pair = orm.relationship(Pair,
                            backref=orm.backref("pair_state",
                                                lazy="dynamic",
                                                cascade="all, delete-orphan",
                                                single_parent=True, ), )

    # First raw event in data stream
    first_event_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    # Last raw event in data stream
    last_event_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    # The last hypertable entry added
    last_interval_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    @staticmethod
    def create_first_event_if_not_exist(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Sets the first event value if it does not exist yet."""
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, first_event_at=ts).
            on_conflict_do_nothing()
        )

    @staticmethod
    def update_last_event(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Replaces the column last_event_at for a named pair."""
        # Based on the original example of https://stackoverflow.com/a/49917004/315168
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, last_event_at=ts).
            on_conflict_do_update(constraint=PairState.__table__.primary_key, set_={"last_event_at": ts})
        )

    @staticmethod
    def update_last_interval(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Replaces the column last_interval_at for a named pair."""
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, last_interval_at=ts).
            on_conflict_do_update(constraint=PairState.__table__.primary_key, set_={"last_interval_at": ts})
        )

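For sqlite, you can use the sqlite_on_conflict='REPLACE' option when defining a UniqueConstraint, or the sqlite_on_conflict_unique option for a unique constraint on a single column. session.add will then behave much like an upsert. See the official documentation.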
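A minimal sketch of the single-column variant, assuming a simplified Template model and an in-memory database. One caveat worth knowing: REPLACE deletes the conflicting row and inserts a new one, so the row gets a new id rather than being updated in place.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Template(Base):
    __tablename__ = "templates"
    id = Column(Integer, primary_key=True)
    # Adds "ON CONFLICT REPLACE" to the UNIQUE constraint in the SQLite DDL.
    name = Column(String(80), unique=True, sqlite_on_conflict_unique="REPLACE")
    template = Column(String(80))


engine = create_engine("sqlite://")  # in-memory database, for illustration only
Base.metadata.create_all(engine)

# Two plain session.add() calls with the same unique name.
for value in ("AABBCC", "DDEEFF"):
    with Session(engine) as session:
        session.add(Template(name="default", template=value))
        session.commit()

with Session(engine) as session:
    replaced = [(t.name, t.template) for t in session.query(Template)]
```

Each add happens in its own session here so that no stale copy of the replaced row lingers in an identity map.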

We had problems with generated default ids and references, which led to ForeignKeyViolation errors like

update or delete on table "..." violates foreign key constraint
Key (id)=(...) is still referenced from table "...".

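so we had to exclude the id from the update dict; otherwise it would always be regenerated as a new default value.

In addition, the method returns the created/updated entity.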

from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert # Important to use the postgresql insert


def upsert(session, data, key_columns, model):

    stmt = insert(model).values(data)

    # Important to exclude the ID for update!
    exclude_for_update = [model.id.name, *key_columns]
    update_dict = {c.name: c for c in stmt.excluded if c.name not in exclude_for_update}

    stmt = stmt.on_conflict_do_update(
        index_elements=key_columns,
        set_=update_dict
    ).returning(model)

    # Wrap the INSERT ... RETURNING so the result comes back as ORM entities
    orm_stmt = (
        select(model)
        .from_statement(stmt)
        .execution_options(populate_existing=True)
    )

    return session.execute(orm_stmt).scalar()


Example:


class UpsertUser(Base):
    __tablename__ = 'upsert_user'
    id = Column(Id, primary_key=True, default=uuid.uuid4)
    name: str = Column(sa.String, nullable=False)
    user_sid: str = Column(sa.String, nullable=False, unique=True)
    house_admin = relationship('UpsertHouse', back_populates='admin', uselist=False)


class UpsertHouse(Base):
    __tablename__ = 'upsert_house'
    id = Column(Id, primary_key=True, default=uuid.uuid4)
    admin_id: Id = Column(Id, ForeignKey('upsert_user.id'), nullable=False)
    admin: UpsertUser = relationship('UpsertUser', back_populates='house_admin', uselist=False)


# Usage


upserted_user = upsert(session, updated_user, [UpsertUser.user_sid.name], UpsertUser)

Note: this was only tested on postgresql, but it could also work for other DBs that support ON DUPLICATE KEY UPDATE, e.g. MySQL.
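For MySQL the function above would need adapting, since SQLAlchemy's MySQL dialect exposes on_duplicate_key_update() instead of on_conflict_do_update(), takes no index_elements, and refers to the incoming row as stmt.inserted rather than stmt.excluded. A sketch of just the statement-building part, compiled against the MySQL dialect so the SQL can be inspected without a server (the table here is a cut-down upsert_user with an integer id, assumed for the example):

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import mysql
from sqlalchemy.dialects.mysql import insert

metadata = MetaData()
users = Table(
    "upsert_user",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(80), nullable=False),
    Column("user_sid", String(80), nullable=False, unique=True),
)

stmt = insert(users).values(name="Alice", user_sid="S-1")
# MySQL decides which unique key conflicted; we only say what to overwrite.
stmt = stmt.on_duplicate_key_update(name=stmt.inserted.name)

# Compile against the MySQL dialect just to look at the SQL; nothing is executed.
sql = str(stmt.compile(dialect=mysql.dialect()))
```

Printing sql shows an INSERT followed by an ON DUPLICATE KEY UPDATE clause; the exact rendering of the right-hand side depends on the SQLAlchemy version.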