如何将SQLAlchemy行对象转换为Python字典?

是否有一种简单的方法来遍历列名和值对?

我的SQLAlchemy版本是0.5.6

下面是我尝试使用dict(row)的示例代码:

import sqlalchemy
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker


print "sqlalchemy version:",sqlalchemy.__version__


engine = create_engine('sqlite:///:memory:', echo=False)
metadata = MetaData()
users_table = Table('users', metadata,
Column('id', Integer, primary_key=True),
Column('name', String),
)
metadata.create_all(engine)


class User(declarative_base()):
__tablename__ = 'users'
    

id = Column(Integer, primary_key=True)
name = Column(String)
    

def __init__(self, name):
self.name = name


Session = sessionmaker(bind=engine)
session = Session()


user1 = User("anurag")
session.add(user1)
session.commit()


# uncommenting next line throws exception 'TypeError: 'User' object is not iterable'
#print dict(user1)
# this one also throws 'TypeError: 'User' object is not iterable'
for u in session.query(User).all():
print dict(u)

在我的系统输出上运行这段代码:

Traceback (most recent call last):
File "untitled-1.py", line 37, in <module>
print dict(u)
TypeError: 'User' object is not iterable
404924 次浏览

@zzzeek在评论中写道:

注意,这是现代版本的正确答案 SQLAlchemy,假设"row"是一个核心行对象,而不是orm映射 实例。< / p >
for row in resultproxy:
row_as_dict = row._mapping  # SQLAlchemy 1.4 and greater
# row_as_dict = dict(row)  # SQLAlchemy 1.3 and earlier

row._mapping的背景,SQLAlchemy 1.4新增:https://docs.sqlalchemy.org/en/stable/core/connections.html#sqlalchemy.engine.Row._mapping

我不能得到一个好的答案,所以我用这个:

def row2dict(row):
d = {}
for column in row.__table__.columns:
d[column.name] = str(getattr(row, column.name))


return d

编辑:如果上面的函数太长,不适合某些口味,这里是一个一行(python 2.7+)

row2dict = lambda r: {c.name: str(getattr(r, c.name)) for c in r.__table__.columns}

你正在迭代的表达式的计算结果是模型对象的列表,而不是行。下面是正确的用法:

for u in session.query(User).all():
print u.id, u.name

你真的需要把它们转换成字典吗?当然,有很多方法,但是你不需要SQLAlchemy的ORM部分:

result = session.execute(User.__table__.select())
for row in result:
print dict(row)

更新:看看sqlalchemy.orm.attributes模块。它有一组处理对象状态的函数,这可能对你有用,尤其是instance_dict()

class User(object):
def to_dict(self):
return dict([(k, getattr(self, k)) for k in self.__dict__.keys() if not k.startswith("_")])

这应该有用。

from sqlalchemy.orm import class_mapper


def asdict(obj):
return dict((col.name, getattr(obj, col.name))
for col in class_mapper(obj.__class__).mapped_table.c)

我对马可·马里亚尼(Marco Mariani)的回答有一个变体,以装饰者的身份表达。主要的区别是它将处理实体的列表,以及安全地忽略一些其他类型的返回值(这在使用模拟编写测试时非常有用):

@decorator
def to_dict(f, *args, **kwargs):
result = f(*args, **kwargs)
if is_iterable(result) and not is_dict(result):
return map(asdict, result)


return asdict(result)


def asdict(obj):
return dict((col.name, getattr(obj, col.name))
for col in class_mapper(obj.__class__).mapped_table.c)


def is_dict(obj):
return isinstance(obj, dict)


def is_iterable(obj):
return True if getattr(obj, '__iter__', False) else False

Elixir是这样做的。这个解决方案的价值在于,它允许递归地包括关系的字典表示。

def to_dict(self, deep={}, exclude=[]):
"""Generate a JSON-style nested dict/list structure from an object."""
col_prop_names = [p.key for p in self.mapper.iterate_properties \
if isinstance(p, ColumnProperty)]
data = dict([(name, getattr(self, name))
for name in col_prop_names if name not in exclude])
for rname, rdeep in deep.iteritems():
dbdata = getattr(self, rname)
#FIXME: use attribute names (ie coltoprop) instead of column names
fks = self.mapper.get_property(rname).remote_side
exclude = [c.name for c in fks]
if dbdata is None:
data[rname] = None
elif isinstance(dbdata, list):
data[rname] = [o.to_dict(rdeep, exclude) for o in dbdata]
else:
data[rname] = dbdata.to_dict(rdeep, exclude)
return data

你可以访问SQLAlchemy对象的内部__dict__,如下所示:

for u in session.query(User).all():
print u.__dict__

我找到这篇文章是因为我正在寻找一种将SQLAlchemy行转换为dict的方法。我正在使用SqlSoup…但答案是我自己想出来的,所以,如果它能帮助到别人,我的意见是:

a = db.execute('select * from acquisizioni_motes')
b = a.fetchall()
c = b[0]


# and now, finally...
dict(zip(c.keys(), c.values()))

这里有一个超级简单的方法

row2dict = lambda r: dict(r.items())

行有一个_asdict()函数,它给出一个字典

In [8]: r1 = db.session.query(Topic.name).first()


In [9]: r1
Out[9]: (u'blah')


In [10]: r1.name
Out[10]: u'blah'


In [11]: r1._asdict()
Out[11]: {'name': u'blah'}

正如@balki提到的:

如果你正在查询一个特定的字段,可以使用_asdict()方法,因为它作为KeyedTuple返回。

In [1]: foo = db.session.query(Topic.name).first()
In [2]: foo._asdict()
Out[2]: {'name': u'blah'}

然而,如果您没有指定列,则可以使用其他建议的方法之一——例如@charlax提供的方法。注意,此方法仅对2.7+有效。

In [1]: foo = db.session.query(Topic).first()
In [2]: {x.name: getattr(foo, x.name) for x in foo.__table__.columns}
Out[2]: {'name': u'blah'}

在@balki回答之后,从SQLAlchemy 0.8开始你可以使用_asdict(),可用于KeyedTuple对象。这为最初的问题提供了一个非常直接的答案。只是,在你的例子中改变最后两行(for循环):

for u in session.query(User).all():
print u._asdict()

这是因为在上面的代码中,u是一个类型为KeyedTuple的对象,因为.all()返回一个KeyedTuple的列表。因此,它有方法_asdict(),它很好地返回u作为一个字典。

WRT @STB: AFAIK的答案,任何.all()返回的是一个KeypedTuple的列表。因此,无论是否指定列,只要将.all()的结果应用于Query对象,上述方法都有效。

假设将下列函数添加到class User中,则以下函数将返回所有列的所有键-值对:

def columns_to_dict(self):
dict_ = {}
for key in self.__mapper__.c.keys():
dict_[key] = getattr(self, key)
return dict_

与其他答案不同的是,只有对象的那些属性被返回,它们是对象类级别的Column属性。因此,不包括_sa_instance_state或任何其他属性SQLalchemy或你添加到对象。参考

编辑:忘记说,这也适用于继承的列。

hybrid_property延伸

如果你还想包含hybrid_property属性,以下方法可以工作:

from sqlalchemy import inspect
from sqlalchemy.ext.hybrid import hybrid_property


def publics_to_dict(self) -> {}:
dict_ = {}
for key in self.__mapper__.c.keys():
if not key.startswith('_'):
dict_[key] = getattr(self, key)


for key, prop in inspect(self.__class__).all_orm_descriptors.items():
if isinstance(prop, hybrid_property):
dict_[key] = getattr(self, key)
return dict_

我假设你在这里用_开头标记Columns,表示你想隐藏它们,或者是因为你通过hybrid_property访问属性,或者你只是不想显示它们。参考

Tipp all_orm_descriptors也返回hybrid_methodAssociationProxy,如果你也想包含它们的话。

其他答案备注

基于__dict__属性的每个答案(如12)只是返回对象的所有属性。这可以是你想要的更多的属性。就像我说的,这包括_sa_instance_state或你在这个对象上定义的任何其他属性。

每个基于dict()函数的答案(如12)只适用于session.execute()返回的SQLalchemy行对象,而不适用于你定义的类,如问题中的class User

基于row.__table__.columns解决的答案肯定会工作。row.__table__.columns包含SQL数据库的列名。这些只能等于python对象的属性名。如果不是,则得到AttributeError。 对于基于class_mapper(obj.__class__).mapped_table.c的答案(如12)也是一样的

在大多数情况下,列名适合它们。但是你可能会像下面这样写代码:

class UserModel(BaseModel):
user_id = Column("user_id", INT, primary_key=True)
email = Column("user_email", STRING)

column.name“user_email”而字段名是“email”,column.name不能像以前那样工作。

sqlalchemy_base_model.py

我也把答案写在这里

我是一个新晋的Python程序员,遇到了使用join表获取JSON的问题。使用这里的答案中的信息,我构建了一个函数,将合理的结果返回到JSON,其中包括表名,避免使用别名或字段冲突。

简单地传递会话查询的结果:

test = Session()。查询(VMInfo、客户). join(客户).order_by (VMInfo.vm_name) .limit (50) .offset (10)

sqlAl2json(test)

def sqlAl2json(self, result):
arr = []
for rs in result.all():
proc = []
try:
iterator = iter(rs)
except TypeError:
proc.append(rs)
else:
for t in rs:
proc.append(t)


dict = {}
for p in proc:
tname = type(p).__name__
for d in dir(p):
if d.startswith('_') | d.startswith('metadata'):
pass
else:
key = '%s_%s' %(tname, d)
dict[key] = getattr(p, d)
arr.append(dict)
return json.dumps(arr)

我在这方面没有太多经验,但下面的方法似乎对我来说很有用:

dict(row)

这似乎太简单了(与这里的其他答案相比)。我错过了什么?

在SQLAlchemy v0.8及更新版本中,使用检测系统

from sqlalchemy import inspect


def object_as_dict(obj):
return {c.key: getattr(obj, c.key)
for c in inspect(obj).mapper.column_attrs}


user = session.query(User).first()


d = object_as_dict(user)

注意,.key是属性名,可以与列名不同,例如在以下情况下:

class_ = Column('class', Text)

此方法也适用于column_property

一个适用于继承类的解决方案:

from itertools import chain
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()




class Mixin(object):
def as_dict(self):
tables = [base.__table__ for base in self.__class__.__bases__ if base not in [Base, Mixin]]
tables.append(self.__table__)
return {c.name: getattr(self, c.name) for c in chain.from_iterable([x.columns for x in tables])}

我对使用(太多?)字典的看法:

def serialize(_query):
#d = dictionary written to per row
#D = dictionary d is written to each time, then reset
#Master = dictionary of dictionaries; the id Key (int, unique from database) from D is used as the Key for the dictionary D entry in Master
Master = {}
D = {}
x = 0
for u in _query:
d = u.__dict__
D = {}
for n in d.keys():
if n != '_sa_instance_state':
D[n] = d[n]
x = d['id']
Master[x] = D
return Master

使用flask(包括jsonify)和flask_sqlalchemy将输出打印为JSON。

使用jsonify(serialize())调用该函数。

与我迄今为止尝试过的所有SQLAlchemy查询一起工作(运行SQLite3)

老问题,但由于这是谷歌中“sqlalchemy row to dict”的第一个结果,它值得一个更好的答案。

SqlAlchemy返回的RowProxy对象具有items()方法: http://docs.sqlalchemy.org/en/latest/core/connections.html#sqlalchemy.engine.RowProxy.items < / p >

它只是返回一个(key, value)元组列表。因此可以使用以下方法将行转换为dict:

在Python <= 2.6中:

rows = conn.execute(query)
list_of_dicts = [dict((key, value) for key, value in row.items()) for row in rows]

在Python中>= 2.7:

rows = conn.execute(query)
list_of_dicts = [{key: value for (key, value) in row.items()} for row in rows]

参考Alex Brasetvik的回答,你可以用一行代码来解决这个问题

row_as_dict = [dict(row) for row in resultproxy]

在Alex Brasetvik的回答的评论部分,SQLAlchemy的创建者zzzeek表示这是解决这个问题的“正确方法”。

有了这段代码,您还可以添加到您的查询“过滤器”或“连接”,这工作!

query = session.query(User)
def query_to_dict(query):
def _create_dict(r):
return {c.get('name'): getattr(r, c.get('name')) for c in query.column_descriptions}


return [_create_dict(r) for r in query]

两种方式:

1.

for row in session.execute(session.query(User).statement):
print(dict(row))

2.

selected_columns = User.__table__.columns
rows = session.query(User).with_entities(*selected_columns).all()
for row in rows :
print(row._asdict())

您可以像这样将sqlalchemy对象转换为字典,并将其作为json/dictionary返回。

辅助功能:

import json
from collections import OrderedDict




def asdict(self):
result = OrderedDict()
for key in self.__mapper__.c.keys():
if getattr(self, key) is not None:
result[key] = str(getattr(self, key))
else:
result[key] = getattr(self, key)
return result




def to_array(all_vendors):
v = [ ven.asdict() for ven in all_vendors ]
return json.dumps(v)

驱动程序功能:

def all_products():
all_products = Products.query.all()
return to_array(all_products)

如果你的模型表列不需要mysql列。

例如:

class People:
id: int = Column(name='id', type_=Integer, primary_key=True)
createdTime: datetime = Column(name='create_time', type_=TIMESTAMP,
nullable=False,
server_default=text("CURRENT_TIMESTAMP"),
default=func.now())
modifiedTime: datetime = Column(name='modify_time', type_=TIMESTAMP,
server_default=text("CURRENT_TIMESTAMP"),
default=func.now())

需要使用:

 from sqlalchemy.orm import class_mapper
def asDict(self):
return {x.key: getattr(self, x.key, None) for x in
class_mapper(Application).iterate_properties}

如果你使用这种方式,你可以得到modify_time和create_time都是None

{'id': 1, 'create_time': None, 'modify_time': None}




def to_dict(self):
return {c.name: getattr(self, c.name, None)
for c in self.__table__.columns}

因为类属性名称不等于列存储在mysql

一个非常简单的解决方案:row._asdict()

> data = session.query(Table).all()
> [row._asdict() for row in data]

返回this:class:.KeyedTuple的内容作为字典

In [46]: result = aggregate_events[0]


In [47]: type(result)
Out[47]: sqlalchemy.util._collections.result


In [48]: def to_dict(query_result=None):
...:     cover_dict = {key: getattr(query_result, key) for key in query_result.keys()}
...:     return cover_dict
...:
...:


In [49]: to_dict(result)
Out[49]:
{'calculate_avg': None,
'calculate_max': None,
'calculate_min': None,
'calculate_sum': None,
'dataPointIntID': 6,
'data_avg': 10.0,
'data_max': 10.0,
'data_min': 10.0,
'data_sum': 60.0,
'deviceID': u'asas',
'productID': u'U7qUDa',
'tenantID': u'CvdQcYzUM'}

为了大家和我自己,以下是我如何使用它:

def run_sql(conn_String):
output_connection = engine.create_engine(conn_string, poolclass=NullPool).connect()
rows = output_connection.execute('select * from db1.t1').fetchall()
return [dict(row) for row in rows]

你可以试着这样做。

for u in session.query(User).all():
print(u._asdict())

它使用查询对象中的内置方法返回查询对象的字典对象。

引用:https://docs.sqlalchemy.org/en/latest/orm/query.html

def to_dict(row):
return {column.name: getattr(row, row.__mapper__.get_property_by_column(column).key) for column in row.__table__.columns}




for u in session.query(User).all():
print(to_dict(u))

这个函数可能有帮助。 当属性名与列名不同时,我找不到更好的解决方案来解决问题

为了完成@Anurag Uniyal的回答,这里有一个递归地遵循关系的方法:

from sqlalchemy.inspection import inspect


def to_dict(obj, with_relationships=True):
d = {}
for column in obj.__table__.columns:
if with_relationships and len(column.foreign_keys) > 0:
# Skip foreign keys
continue
d[column.name] = getattr(obj, column.name)


if with_relationships:
for relationship in inspect(type(obj)).relationships:
val = getattr(obj, relationship.key)
d[relationship.key] = to_dict(val) if val else None
return d


class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
first_name = Column(TEXT)
address_id = Column(Integer, ForeignKey('addresses.id')
address = relationship('Address')


class Address(Base):
__tablename__ = 'addresses'
id = Column(Integer, primary_key=True)
city = Column(TEXT)




user = User(first_name='Nathan', address=Address(city='Lyon'))
# Add and commit user to session to create ids


to_dict(user)
# {'id': 1, 'first_name': 'Nathan', 'address': {'city': 'Lyon'}}
to_dict(user, with_relationship=False)
# {'id': 1, 'first_name': 'Nathan', 'address_id': 1}

你在你的项目中到处都需要它,我很欣赏@anurag的回答,它很好。直到这一点上,我正在使用它,但它会混乱你所有的代码,也不会与实体改变工作。

宁可试试这个, 在SQLAlchemy

中继承基本查询类
from flask_sqlalchemy import SQLAlchemy, BaseQuery




class Query(BaseQuery):
def as_dict(self):
context = self._compile_context()
context.statement.use_labels = False
columns = [column.name for column in context.statement.columns]


return list(map(lambda row: dict(zip(columns, row)), self.all()))




db = SQLAlchemy(query_class=Query)

在那之后,无论你在哪里定义你的对象“as_dict”方法都会在那里。

Python 3.6.8 +

内置str()方法自动转换datetime。Datetime对象到iso-8806-1。

print(json.dumps([dict(row.items()) for row in rows], default=str, indent="  "))

注意:default函数只会应用于有错误的值,所以intfloat值不会被转换…除非出现错误:)。

在python 3.8+中,我们可以使用数据类以及它附带的asdict方法来完成此操作:

from dataclasses import dataclass, asdict


from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, String, Integer, create_engine


Base = declarative_base()
engine = create_engine('sqlite:///:memory:', echo=False)




@dataclass
class User(Base):
__tablename__ = 'users'


id: int = Column(Integer, primary_key=True)
name: str = Column(String)
email = Column(String)


def __init__(self, name):
self.name = name
self.email = 'hello@example.com'




Base.metadata.create_all(engine)


SessionMaker = sessionmaker(bind=engine)
session = SessionMaker()


user1 = User("anurag")
session.add(user1)
session.commit()


query_result = session.query(User).one()  # type: User
print(f'{query_result.id=:}, {query_result.name=:}, {query_result.email=:}')
# query_result.id=1, query_result.name=anurag, query_result.email=hello@example.com


query_result_dict = asdict(query_result)
print(query_result_dict)
# {'id': 1, 'name': 'anurag'}

关键是使用@dataclass装饰器,并用它的类型注释每一列(name: str = Column(String)行的: str部分)。

还要注意,由于email没有注释,所以它不包含在query_result_dict中。

我们可以在dict中得到一个对象列表:

def queryset_to_dict(query_result):
query_columns = query_result[0].keys()
res = [list(ele) for ele in query_result]
dict_list = [dict(zip(query_columns, l)) for l in res]
return dict_list


query_result = db.session.query(LanguageMaster).all()
dictvalue=queryset_to_dict(query_result)
我处理这个问题才几分钟。 标记为正确的答案不尊重字段的类型。 解决方案来自于dictalchemy,添加了一些有趣的功能。 https://pythonhosted.org/dictalchemy/ 我刚刚测试过,工作正常
Base = declarative_base(cls=DictableModel)


session.query(User).asdict()
{'id': 1, 'username': 'Gerald'}


session.query(User).asdict(exclude=['id'])
{'username': 'Gerald'}

使用字典推导式

for u in session.query(User).all():
print ({column.name: str(getattr(row, column.name)) for column in row.__table__.columns})
from copy import copy


def to_record(row):
record = copy(row.__dict__)
del record["_sa_instance_state"]
return record

如果不使用复制,可能会遇到错误。

Anurag Uniyal版本的改进版本,考虑了类型:

def sa_vars(row):
return {
column.name: column.type.python_type(getattr(row, column.name))
for column in row.__table__.columns
}

使用sqlalchemy 1.4

session.execute(select(User.id, User.username)).mappings().all()
>> [{'id': 1, 'username': 'Bob'}, {'id': 2, 'username': 'Alice'}]

如OP所述,调用dict初始化式会引发一个异常,消息为“"User"对象不可迭代。所以真正的问题是如何使一个SQLAlchemy模型可迭代?

我们将不得不实现特殊的方法__iter____next__,但如果我们直接继承declarative_base模型,我们仍然会遇到不希望的“_sa_instance_state”;关键。更糟糕的是,每次调用__next__时,我们都必须遍历__dict__.keys(),因为keys()方法返回一个View——一个没有索引的可迭代对象。这将使时间复杂度增加N倍,其中N是__dict__中的键数。生成字典将花费O(N²)。我们可以做得更好。

我们可以实现自己的基类,它实现所需的特殊方法,并存储可以通过索引访问的列名列表,从而降低生成O(N)字典的时间复杂性。这有一个额外的好处,我们可以定义一次逻辑,并在任何时候从基类继承,我们希望我们的模型类是可迭代的。

class IterableBase(declarative_base()):
__abstract__ = True


def _init_keys(self):
self._keys = [c.name for c in self.__table__.columns]
self._dict = {c.name: getattr(self, c.name) for c in self.__table__.columns}


def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._init_keys()


def __setattr__(self, name, value):
super().__setattr__(name, value)
if name not in ('_dict', '_keys', '_n') and '_dict' in self.__dict__:
self._dict[name] = value


def __iter__(self):
self._n = 0
return self


def __next__(self):
if self._n >= len(self._keys):
raise StopIteration
self._n += 1
key = self._keys[self._n-1]
return (key, self._dict[key])

现在User类可以直接从IterableBase类继承。

class User(IterableBase):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)

您可以确认,以User实例作为参数调用dict函数将返回所需的字典,不带“_sa_instance_state”。你可能已经注意到在IterableBase类中声明的__setattr__方法。这确保在初始化后属性发生变化或设置时更新_dict。

def main():
user1 = User('Bob')
print(dict(user1))
# outputs {'id': None, 'name': 'Bob'}
user1.id = 42
print(dict(user1))
# outputs {'id': 42, 'name': 'Bob'}


if __name__ == '__main__':
main()

使用以下SQLAlchemy代码查询数据库后:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker




SQLALCHEMY_DATABASE_URL = 'sqlite:///./examples/sql_app.db'
engine = create_engine(SQLALCHEMY_DATABASE_URL, echo=True)
query = sqlalchemy.select(TABLE)
result = engine.execute(query).fetchall()

你可以使用下面的一行代码:

query_dict = [record._mapping for record in results]

sqlalchemy-utilsget_columns来帮助这一点。

你可以这样写:

{column: getattr(row, column) for column in get_columns(row)}