SQLAlchemy是否有与django的get_or_create等价的函数?

我想从数据库中获得一个对象,如果它已经存在(基于提供的参数)或创建它,如果它不存在。

Django的get_or_create(或)就是这样做的。在SQLAlchemy中是否有等价的快捷方式?

我现在明确地像这样写出来:

def get_or_create_instrument(session, serial_number):
instrument = session.query(Instrument).filter_by(serial_number=serial_number).first()
if instrument:
return instrument
else:
instrument = Instrument(serial_number)
session.add(instrument)
return instrument
87664 次浏览

基本上就是这么做的,没有捷径可走。

当然,你可以把它概括为:

def get_or_create(session, model, defaults=None, **kwargs):
instance = session.query(model).filter_by(**kwargs).one_or_none()
if instance:
return instance, False
else:
params = {k: v for k, v in kwargs.items() if not isinstance(v, ClauseElement)}
params.update(defaults or {})
instance = model(**params)
try:
session.add(instance)
session.commit()
except Exception:  # The actual exception depends on the specific database so we catch all exceptions. This is similar to the official documentation: https://docs.sqlalchemy.org/en/latest/orm/session_transaction.html
session.rollback()
instance = session.query(model).filter_by(**kwargs).one()
return instance, False
else:
return instance, True

2020年更新(Python 3.9+ ONLY)

下面是Python 3.9的新的字典联合运算符(|=)的简洁版本

def get_or_create(session, model, defaults=None, **kwargs):
instance = session.query(model).filter_by(**kwargs).one_or_none()
if instance:
return instance, False
else:
kwargs |= defaults or {}
instance = model(**kwargs)
try:
session.add(instance)
session.commit()
except Exception:  # The actual exception depends on the specific database so we catch all exceptions. This is similar to the official documentation: https://docs.sqlalchemy.org/en/latest/orm/session_transaction.html
session.rollback()
instance = session.query(model).filter_by(**kwargs).one()
return instance, False
else:
return instance, True

注意:

类似于Django版本,这将捕获重复的关键约束和类似的错误。如果你的get或create不能保证返回一个结果,它仍然会导致竞争条件。

为了缓解这个问题,你需要在session.commit()之后添加另一个one_or_none()样式的取回。这仍然不能100%保证不出现竞争条件,除非你也使用with_for_update()或可序列化的事务模式。

遵循@WoLpH的解决方案,这是适用于我的代码(简单版本):

def get_or_create(session, model, **kwargs):
instance = session.query(model).filter_by(**kwargs).first()
if instance:
return instance
else:
instance = model(**kwargs)
session.add(instance)
session.commit()
return instance

这样,我就能够get_or_create我的模型的任何对象。

假设我的模型对象是:

class Country(Base):
__tablename__ = 'countries'
id = Column(Integer, primary_key=True)
name = Column(String, unique=True)

为了获得或创建我的对象,我写:

myCountry = get_or_create(session, Country, name=countryName)

这个SQLALchemy食谱做的工作很好很优雅。

首先要做的是定义一个函数,给它一个Session来使用,并将一个字典与Session()相关联,Session()跟踪当前的独特的键。

def _unique(session, cls, hashfunc, queryfunc, constructor, arg, kw):
cache = getattr(session, '_unique_cache', None)
if cache is None:
session._unique_cache = cache = {}


key = (cls, hashfunc(*arg, **kw))
if key in cache:
return cache[key]
else:
with session.no_autoflush:
q = session.query(cls)
q = queryfunc(q, *arg, **kw)
obj = q.first()
if not obj:
obj = constructor(*arg, **kw)
session.add(obj)
cache[key] = obj
return obj

使用这个函数的一个例子是在mixin中:

class UniqueMixin(object):
@classmethod
def unique_hash(cls, *arg, **kw):
raise NotImplementedError()


@classmethod
def unique_filter(cls, query, *arg, **kw):
raise NotImplementedError()


@classmethod
def as_unique(cls, session, *arg, **kw):
return _unique(
session,
cls,
cls.unique_hash,
cls.unique_filter,
cls,
arg, kw
)

最后创建唯一的get_or_create模型:

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base


Base = declarative_base()


engine = create_engine('sqlite://', echo=True)


Session = sessionmaker(bind=engine)


class Widget(UniqueMixin, Base):
__tablename__ = 'widget'


id = Column(Integer, primary_key=True)
name = Column(String, unique=True, nullable=False)


@classmethod
def unique_hash(cls, name):
return name


@classmethod
def unique_filter(cls, query, name):
return query.filter(Widget.name == name)


Base.metadata.create_all(engine)


session = Session()


w1, w2, w3 = Widget.as_unique(session, name='w1'), \
Widget.as_unique(session, name='w2'), \
Widget.as_unique(session, name='w3')
w1b = Widget.as_unique(session, name='w1')


assert w1 is w1b
assert w2 is not w3
assert w2 is not w1


session.commit()

这个食谱更深入地阐述了这个想法,并提供了不同的方法,但我使用这个方法非常成功。

我一直在研究这个问题,并最终得到了一个相当强大的解决方案:

def get_one_or_create(session,
model,
create_method='',
create_method_kwargs=None,
**kwargs):
try:
return session.query(model).filter_by(**kwargs).one(), False
except NoResultFound:
kwargs.update(create_method_kwargs or {})
created = getattr(model, create_method, model)(**kwargs)
try:
session.add(created)
session.flush()
return created, True
except IntegrityError:
session.rollback()
return session.query(model).filter_by(**kwargs).one(), False

我只是在所有细节上写了一个相当广泛的博客文章,但有一些关于我为什么使用它的想法。

  1. 它解包到一个元组,该元组告诉您对象是否存在。这在您的工作流中通常是有用的。

  2. 该函数提供了使用@classmethod修饰的创建者函数(以及特定于它们的属性)的能力。

  3. 当有多个进程连接到数据存储时,该解决方案可以防止Race Conditions。

编辑:正如在这篇博文中解释的那样,我已经将session.commit()更改为session.flush()。注意,这些决策是特定于所使用的数据存储的(在本例中是Postgres)。

编辑2:我在函数中使用{}作为默认值进行更新,因为这是典型的Python陷阱。谢谢你的评论,奈杰尔!如果你对此感到好奇,可以查看这个StackOverflow问题这篇博文

语义上最接近的可能是:

def get_or_create(model, **kwargs):
"""SqlAlchemy implementation of Django's get_or_create.
"""
session = Session()
instance = session.query(model).filter_by(**kwargs).first()
if instance:
return instance, False
else:
instance = model(**kwargs)
session.add(instance)
session.commit()
return instance, True

不确定在sqlalchemy中依赖全局定义的Session是否合适,但Django版本不接受连接,所以…

返回的元组包含实例和一个布尔值,表示实例是否已创建(例如,如果从db中读取实例则为False)。

Django的get_or_create通常用于确保全局数据可用,所以我在尽可能早的时候提交。

取决于您采用的隔离级别,以上解决方案都不能工作。 我发现的最好的解决方案是一个RAW SQL在以下形式:

INSERT INTO table(f1, f2, unique_f3)
SELECT 'v1', 'v2', 'v3'
WHERE NOT EXISTS (SELECT 1 FROM table WHERE f3 = 'v3')

无论隔离级别和并行度如何,这都是事务安全的。

注意:为了提高效率,明智的做法是为唯一的列使用INDEX。

erik的优秀回答的修改版本

def get_one_or_create(session,
model,
create_method='',
create_method_kwargs=None,
**kwargs):
try:
return session.query(model).filter_by(**kwargs).one(), True
except NoResultFound:
kwargs.update(create_method_kwargs or {})
try:
with session.begin_nested():
created = getattr(model, create_method, model)(**kwargs)
session.add(created)
return created, False
except IntegrityError:
return session.query(model).filter_by(**kwargs).one(), True
  • 使用嵌套事务只回滚添加的新项,而不是回滚所有内容(参见此回答使用SQLite嵌套事务)
  • create_method移动。如果创建的对象具有关系,并且通过这些关系为其分配成员,则会自动将其添加到会话中。例如,创建一个book,其中有user_iduser作为对应关系,然后在create_method中执行book.user=<user object>将会将book添加到会话中。这意味着create_method必须在with内部才能从最终的回滚中受益。注意begin_nested自动触发刷新。

注意,如果使用MySQL,事务隔离级别必须设置为READ COMMITTED而不是REPEATABLE READ才能正常工作。Django的get_or_create(和在这里)使用相同的策略,参见Django 文档

我稍微简化了一下@凯文。避免将整个函数包装在if/else语句中的解决方案。这样就只有一个return,我发现它更干净:

def get_or_create(session, model, **kwargs):
instance = session.query(model).filter_by(**kwargs).first()


if not instance:
instance = model(**kwargs)
session.add(instance)


return instance

有一个Python包包含@erik的解决方案以及update_or_create()的一个版本。https://github.com/enricobarzetti/sqlalchemy_get_or_create

我经常遇到的一个问题是,当一个字段有一个最大长度(比如,STRING(40)),而你想对一个大长度的字符串执行get or create,上述解决方案将失败。

基于上述解决方案,以下是我的方法:

from sqlalchemy import Column, String


def get_or_create(self, add=True, flush=True, commit=False, **kwargs):
"""


Get the an entity based on the kwargs or create an entity with those kwargs.


Params:
add: (default True) should the instance be added to the session?
flush: (default True) flush the instance to the session?
commit: (default False) commit the session?
kwargs: key, value pairs of parameters to lookup/create.


Ex: SocialPlatform.get_or_create(**{'name':'facebook'})
returns --> existing record or, will create a new record


---------


NOTE: I like to add this as a classmethod in the base class of my tables, so that
all data models inherit the base class --> functionality is transmitted across
all orm defined models.


"""




# Truncate values if necessary
for key, value in kwargs.items():


# Only use strings
if not isinstance(value, str):
continue


# Only use if it's a column
my_col = getattr(self.__table__.columns, key)


if not isinstance(my_col, Column):
continue


# Skip non strings again here
if not isinstance(my_col.type, String):
continue


# Get the max length
max_len = my_col.type.length


if value and max_len and len(value) > max_len:


# Update the value
value = value[:max_len]
kwargs[key] = value


# -------------------------------------------------


# Make the query...
instance = session.query(self).filter_by(**kwargs).first()


if instance:
return instance


else:
# Max length isn't accounted for here.
# The assumption is that auto-truncation will happen on the child-model
# Or directtly in the db
instance = self(**kwargs)


# You'll usually want to add to the session
if add:
session.add(instance)


# Navigate these with caution
if add and commit:
try:
session.commit()
except IntegrityError:
session.rollback()


elif add and flush:
session.flush()




return instance