如何UPSERT(合并,插入…重复更新)在PostgreSQL?

这里一个非常常见的问题是如何进行upsert,这是MySQL称为INSERT ... ON DUPLICATE UPDATE的操作,标准支持作为MERGE操作的一部分。

考虑到PostgreSQL不直接支持它(在pg 9.5之前),你如何做到这一点?考虑以下几点:

CREATE TABLE testtable (
id integer PRIMARY KEY,
somedata text NOT NULL
);


INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');

现在想象一下,你想要“插入”元组(2, 'Joe')(3, 'Alan'),那么新的表内容将是:

(1, 'fred'),
(2, 'Joe'),    -- Changed value of existing tuple
(3, 'Alan')    -- Added new tuple

这就是人们在讨论upsert时所谈论的。至关重要的是,任何方法都必须是在同一表上存在多个事务时是安全的——要么使用显式锁定,要么以其他方式防范结果的竞态条件。

这个主题在插入,重复更新PostgreSQL?上被广泛讨论,但那是关于MySQL语法的替代方案,随着时间的推移,它增加了相当多不相关的细节。我正在研究明确的答案。

这些技术对于“如果不存在就插入,否则什么都不做”也很有用。“插入…重复键忽略”。

351344 次浏览

9.5及更新版本:

PostgreSQL 9.5及更新版本支持INSERT ... ON CONFLICT (key) DO UPDATE(和ON CONFLICT (key) DO NOTHING),即upsert。

ON DUPLICATE KEY UPDATE的比较

快速的解释

用法参见手动 -特别是语法图中的conflict_action子句和解释性文本

与下面给出的9.4及更老版本的解决方案不同,此特性适用于多个冲突行的情况,并且不需要排他锁定或重试循环。

添加该特性的提交在这里关于其发展的讨论在这里


如果您使用的是9.5并且不需要向后兼容,您现在可以停止阅读


9.4及以上版本:

PostgreSQL没有任何内置的UPSERT(或MERGE)功能,并且在并发使用的情况下高效地执行它是非常困难的。

本文详细讨论了这个问题

一般来说,你有两个选择:

  • 重试循环中的单个插入/更新操作;或
  • 锁定表并进行批量合并

个别行重试循环

如果希望多个连接同时尝试执行插入,那么在重试循环中使用单独的行upserts是合理的选择。

PostgreSQL文档包含了一个有用的过程,可以让你在数据库内部的循环中完成这个过程。与大多数简单的解决方案不同,它防止丢失更新和插入竞赛。它将只在READ COMMITTED模式下工作,并且只有当它是你在事务中唯一做的事情时才安全。如果触发器或次要惟一键导致惟一违反,则该函数将无法正常工作。

这种策略效率很低。只要可行,您应该将工作排队,并按照下面描述的方式进行批量upsert。

许多尝试解决此问题的解决方案没有考虑回滚,因此导致更新不完整。两个事务相互竞争;其中一个成功INSERTs;另一个会得到一个重复的键错误,并执行UPDATEUPDATE阻塞等待INSERT回滚或提交。当它回滚时,UPDATE条件重新检查匹配0行,所以即使UPDATE提交了,它实际上也没有完成您期望的upsert。您必须检查结果行数,并在必要时重新尝试。

一些尝试的解决方案也没有考虑SELECT竞争。如果你尝试一些显而易见的简单方法:

-- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE.


BEGIN;


UPDATE testtable
SET somedata = 'blah'
WHERE id = 2;


-- Remember, this is WRONG. Do NOT COPY IT.


INSERT INTO testtable (id, somedata)
SELECT 2, 'blah'
WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2);


COMMIT;

当两种模式同时运行时,就有几种失效模式。一个是已经讨论过的更新重新检查问题。另一种是同时UPDATE,匹配零行并继续。然后他们都做EXISTS测试,发生之前 INSERT。都得到0行,所以都执行INSERT。其中一个失败,出现重复键错误。

这就是为什么你需要一个re-try循环。您可能认为可以使用聪明的SQL来防止重复的键错误或丢失的更新,但这是不可能的。您需要检查行数或处理重复键错误(取决于所选择的方法),然后重试。

请不要自己动手解决这个问题。就像消息排队一样,这可能是错误的。

散装上塞带锁

有时您希望执行批量upsert,其中您有一个新数据集,希望将其合并到旧的现有数据集中。这是大大比单独的行upserts更有效,应该在实际情况下优先使用。

在这种情况下,您通常遵循以下流程:

  • CREATE一个TEMPORARY

  • COPY或批量插入新数据到临时表

  • LOCK目标表IN EXCLUSIVE MODE。这允许其他事务SELECT,但不对表进行任何更改。

  • 使用临时表中的值对现有记录进行UPDATE ... FROM;

  • 对目标表中不存在的行执行INSERT;

  • COMMIT,释放锁。

例如,对于问题中给出的例子,使用多值INSERT填充临时表:

BEGIN;


CREATE TEMPORARY TABLE newvals(id integer, somedata text);


INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan');


LOCK TABLE testtable IN EXCLUSIVE MODE;


UPDATE testtable
SET somedata = newvals.somedata
FROM newvals
WHERE newvals.id = testtable.id;


INSERT INTO testtable
SELECT newvals.id, newvals.somedata
FROM newvals
LEFT OUTER JOIN testtable ON (testtable.id = newvals.id)
WHERE testtable.id IS NULL;


COMMIT;

相关阅读

MERGE呢?

sql标准MERGE实际上具有定义不佳的并发语义,不适合在不先锁定表的情况下进行upserting。

对于数据合并,这是一个非常有用的OLAP语句,但对于并发安全的upsert,它实际上不是一个有用的解决方案。对于使用其他dbms使用MERGE进行upserts的人,有很多建议,但这实际上是错误的。

其他星展:

我正在尝试为PostgreSQL 9.5之前版本的单次插入问题提供另一种解决方案。这个想法很简单,首先尝试执行插入,如果记录已经存在,则更新它:

do $$
begin
insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
update testtable set somedata = 'Joe' where id = 2;
end $$;

注意,这个解决方案可以应用于只有在没有删除表中的行时

我不知道这个解决方案的效率如何,但在我看来它是合理的。

WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2
RETURNING ID),
INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD))
INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS

在Postgresql 9.3上测试

由于这个问题是关闭的,我在这里张贴如何使用SQLAlchemy。通过递归,它重新尝试批量插入或更新以对抗竞态条件和验证错误。

首先是进口

import itertools as it


from functools import partial
from operator import itemgetter


from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts

现在有几个辅助函数

def chunk(content, chunksize=None):
"""Groups data into chunks each with (at most) `chunksize` items.
https://stackoverflow.com/a/22919323/408556
"""
if chunksize:
i = iter(content)
generator = (list(it.islice(i, chunksize)) for _ in it.count())
else:
generator = iter([content])


return it.takewhile(bool, generator)




def gen_resources(records):
"""Yields a dictionary if the record's id already exists, a row object
otherwise.
"""
ids = {item[0] for item in session.query(Posts.id)}


for record in records:
is_row = hasattr(record, 'to_dict')


if is_row and record.id in ids:
# It's a row but the id already exists, so we need to convert it
# to a dict that updates the existing record. Since it is duplicate,
# also yield True
yield record.to_dict(), True
elif is_row:
# It's a row and the id doesn't exist, so no conversion needed.
# Since it's not a duplicate, also yield False
yield record, False
elif record['id'] in ids:
# It's a dict and the id already exists, so no conversion needed.
# Since it is duplicate, also yield True
yield record, True
else:
# It's a dict and the id doesn't exist, so we need to convert it.
# Since it's not a duplicate, also yield False
yield Posts(**record), False

最后是upsert函数

def upsert(data, chunksize=None):
for records in chunk(data, chunksize):
resources = gen_resources(records)
sorted_resources = sorted(resources, key=itemgetter(1))


for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
items = [g[0] for g in group]


if dupe:
_upsert = partial(session.bulk_update_mappings, Posts)
else:
_upsert = session.add_all


try:
_upsert(items)
session.commit()
except IntegrityError:
# A record was added or deleted after we checked, so retry
#
# modify accordingly by adding additional exceptions, e.g.,
# except (IntegrityError, ValidationError, ValueError)
db.session.rollback()
upsert(items)
except Exception as e:
# Some other error occurred so reduce chunksize to isolate the
# offending row(s)
db.session.rollback()
num_items = len(items)


if num_items > 1:
upsert(items, num_items // 2)
else:
print('Error adding record {}'.format(items[0]))

下面是你如何使用它

>>> data = [
...     {'id': 1, 'text': 'updated post1'},
...     {'id': 5, 'text': 'updated post5'},
...     {'id': 1000, 'text': 'new post1000'}]
...
>>> upsert(data)

bulk_save_objects相比,它的优点是可以处理插入上的关系、错误检查等(不像批量操作)。

SQLAlchemy upsert for Postgres >=9.5

由于上面的大文章涵盖了Postgres版本的许多不同的SQL方法(不仅仅是问题中的非9.5),我想补充一下如果您使用的是Postgres 9.5,如何在SQLAlchemy中做到这一点。除了实现自己的upsert,还可以使用SQLAlchemy的函数(在SQLAlchemy 1.1中添加)。就我个人而言,如果可能的话,我会推荐使用这些工具。不仅因为方便,还因为它可以让PostgreSQL处理任何可能发生的竞争条件。

交叉张贴我昨天给出的另一个答案(https://stackoverflow.com/a/44395983/2156909)

SQLAlchemy现在支持ON CONFLICT,有两个方法on_conflict_do_update()on_conflict_do_nothing():

从文档中复制:

from sqlalchemy.dialects.postgresql import insert


stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
index_elements=[my_table.c.user_email],
index_where=my_table.c.user_email.like('%@gmail.com'),
set_=dict(data=stmt.excluded.data)
)
conn.execute(stmt)

http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict#insert-on-conflict-upsert

下面是insert ... on conflict ... (pg 9.5 +)的一些例子:

  • 插入,在conflict - 什么都不做上。
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict do nothing;`
    
  • Insert, on conflict - do update, specify conflict target via column.
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict(id)
    do update set name = 'new_name', size = 3;
    
  • Insert, on conflict - do update, specify conflict target via constraint name.
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict on constraint dummy_pkey
    do update set name = 'new_name', size = 4;
    

在PostgreSQL v. 15合并

由于PostgreSQL的诉15,是可以使用MERGE命令的。它实际上已经被表示为这个新版本的首先是主要的改进

它使用WHEN MATCHED / WHEN NOT MATCHED条件,以便在存在具有相同条件的现有行时选择行为。

它甚至比标准的UPSERT更好,因为新特性可以批量控制INSERTUPDATEDELETE行。

MERGE INTO customer_account ca
USING recent_transactions t
ON t.customer_id = ca.customer_id
WHEN MATCHED THEN
UPDATE SET balance = balance + transaction_value
WHEN NOT MATCHED THEN
INSERT (customer_id, balance)
VALUES (t.customer_id, t.transaction_value)