如何序列化SqlAlchemy结果JSON?

Django可以很好地自动序列化从DB返回到JSON格式的ORM模型。

如何序列化SQLAlchemy查询结果为JSON格式?

我尝试了jsonpickle.encode,但它编码查询对象本身。 我尝试了json.dumps(items),但它返回

TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable

将SQLAlchemy ORM对象序列化为JSON /XML真的那么难吗?它没有任何默认序列化器吗?现在序列化ORM查询结果是非常常见的任务。

我所需要的只是返回SQLAlchemy查询结果的JSON或XML数据表示。

javascript datagird需要使用JSON/XML格式的SQLAlchemy对象查询结果(JQGrid http://www.trirand.com/blog/)

293761 次浏览

这并不是那么简单。我写了一些代码来做这件事。我还在开发中,它使用了MochiKit框架。它基本上使用代理和注册的JSON转换器在Python和Javascript之间转换复合对象。

数据库对象的浏览器端是db.js 它需要proxy.js中的基本Python代理源 在Python端有基代理模块。 最后是webserver.py中的SqlAlchemy对象编码器。 它还取决于在models.py文件中找到的元数据提取器

你可以像这样将RowProxy转换为dict:

 d = dict(row.items())
然后将其序列化为JSON(你必须为诸如datetime值之类的东西指定一个编码器) 如果你只想要一条记录(而不是相关记录的完整层次结构),这并不难
json.dumps([(dict(row.items())) for row in rs])

扁平化实现

你可以使用这样的代码:

from sqlalchemy.ext.declarative import DeclarativeMeta


class AlchemyEncoder(json.JSONEncoder):


def default(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
# an SQLAlchemy class
fields = {}
for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
data = obj.__getattribute__(field)
try:
json.dumps(data) # this will fail on non-encodable values, like other classes
fields[field] = data
except TypeError:
fields[field] = None
# a json-encodable dict
return fields


return json.JSONEncoder.default(self, obj)

然后转换为JSON使用:

c = YourAlchemyClass()
print json.dumps(c, cls=AlchemyEncoder)

它将忽略不可编码的字段(将它们设置为“None”)。

它不会自动展开关系(因为这可能导致自引用,并永远循环)。

递归的非循环实现

然而,如果你宁愿永远循环,你可以使用:

from sqlalchemy.ext.declarative import DeclarativeMeta


def new_alchemy_encoder():
_visited_objs = []


class AlchemyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
# don't re-visit self
if obj in _visited_objs:
return None
_visited_objs.append(obj)


# an SQLAlchemy class
fields = {}
for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
fields[field] = obj.__getattribute__(field)
# a json-encodable dict
return fields


return json.JSONEncoder.default(self, obj)


return AlchemyEncoder

然后对对象进行编码,使用:

print json.dumps(e, cls=new_alchemy_encoder(), check_circular=False)

这将编码所有的子代、子代、子代……基本上可以编码你的整个数据库。当它到达之前编码过的东西时,它会将其编码为“None”。

递归的、可能是循环的、有选择的实现

另一种选择,可能更好,是能够指定你想要展开的字段:

def new_alchemy_encoder(revisit_self = False, fields_to_expand = []):
_visited_objs = []


class AlchemyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
# don't re-visit self
if revisit_self:
if obj in _visited_objs:
return None
_visited_objs.append(obj)


# go through each field in this SQLalchemy class
fields = {}
for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
val = obj.__getattribute__(field)


# is this field another SQLalchemy object, or a list of SQLalchemy objects?
if isinstance(val.__class__, DeclarativeMeta) or (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
# unless we're expanding this field, stop here
if field not in fields_to_expand:
# not expanding this field: set it to None and continue
fields[field] = None
continue


fields[field] = val
# a json-encodable dict
return fields


return json.JSONEncoder.default(self, obj)


return AlchemyEncoder

你现在可以调用它:

print json.dumps(e, cls=new_alchemy_encoder(False, ['parents']), check_circular=False)

例如,仅展开名为“parents”的SQLAlchemy字段。

你可以把你的对象输出为一个字典:

class User:
def as_dict(self):
return {c.name: getattr(self, c.name) for c in self.__table__.columns}

然后使用User.as_dict()序列化对象。

将sqlalchemy行对象转换为python dict中所解释

我建议使用棉花糖。它允许您创建序列化器来表示支持关系和嵌套对象的模型实例。

以下是他们文档中的一个删节的例子。以ORM模型Author为例:

class Author(db.Model):
id = db.Column(db.Integer, primary_key=True)
first = db.Column(db.String(80))
last = db.Column(db.String(80))

该类的棉花糖模式是这样构造的:

class AuthorSchema(Schema):
id = fields.Int(dump_only=True)
first = fields.Str()
last = fields.Str()
formatted_name = fields.Method("format_name", dump_only=True)


def format_name(self, author):
return "{}, {}".format(author.last, author.first)

...并像这样使用:

author_schema = AuthorSchema()
author_schema.dump(Author.query.first())

...会产生这样的输出:

{
"first": "Tim",
"formatted_name": "Peters, Tim",
"id": 1,
"last": "Peters"
}

看看他们完整的Flask-SQLAlchemy例子

一个名为marshmallow-sqlalchemy的库专门集成了SQLAlchemy和marshmallow。在这个库中,上面描述的Author模型的模式看起来像这样:

class AuthorSchema(ModelSchema):
class Meta:
model = Author

该集成允许从SQLAlchemy Column类型推断字段类型。

< a href = " https://marshmallow-sqlalchemy.readthedocs。Io /en/latest/" rel="noreferrer">marshmallow-sqlalchemy here. . sqlalchemy

Flask-JsonTools包有你的模型的JsonSerializableBase基类的实现。

用法:

from sqlalchemy.ext.declarative import declarative_base
from flask.ext.jsontools import JsonSerializableBase


Base = declarative_base(cls=(JsonSerializableBase,))


class User(Base):
#...

现在User模型可以神奇地序列化了。

如果你的框架不是Flask,你可以抓住代码

出于安全考虑,您不应该返回模型的所有字段。我更喜欢有选择性地选择他们。

Flask的json编码现在支持UUID、datetime和relationships(并为flask_sqlalchemy db.Model类添加了queryquery_class)。编码器我更新如下:

app / json_encoder.py

    from sqlalchemy.ext.declarative import DeclarativeMeta
from flask import json




class AlchemyEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o.__class__, DeclarativeMeta):
data = {}
fields = o.__json__() if hasattr(o, '__json__') else dir(o)
for field in [f for f in fields if not f.startswith('_') and f not in ['metadata', 'query', 'query_class']]:
value = o.__getattribute__(field)
try:
json.dumps(value)
data[field] = value
except TypeError:
data[field] = None
return data
return json.JSONEncoder.default(self, o)

app/__init__.py

# json encoding
from app.json_encoder import AlchemyEncoder
app.json_encoder = AlchemyEncoder

有了这个,我可以选择添加一个__json__属性,返回我希望编码的字段列表:

app/models.py

class Queue(db.Model):
id = db.Column(db.Integer, primary_key=True)
song_id = db.Column(db.Integer, db.ForeignKey('song.id'), unique=True, nullable=False)
song = db.relationship('Song', lazy='joined')
type = db.Column(db.String(20), server_default=u'audio/mpeg')
src = db.Column(db.String(255), nullable=False)
created_at = db.Column(db.DateTime, server_default=db.func.now())
updated_at = db.Column(db.DateTime, server_default=db.func.now(), onupdate=db.func.now())


def __init__(self, song):
self.song = song
self.src = song.full_path


def __json__(self):
return ['song', 'src', 'type', 'created_at']

我添加@jsonapi到我的视图,返回结果列表,然后我的输出如下:

[


{


"created_at": "Thu, 23 Jul 2015 11:36:53 GMT",
"song":


{
"full_path": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
"id": 2,
"path_name": "Audioslave/Audioslave [2002]/1 Cochise.mp3"
},
"src": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
"type": "audio/mpeg"
}


]

自定义序列化和反序列化。

“from_json”(类方法)基于json数据构建Model对象。

“序列化”只能在实例上调用,并将json中的所有数据合并到Model实例中。

“序列化” -递归序列化

__write_only__属性用于定义只写属性(例如"password_hash")。

class Serializable(object):
__exclude__ = ('id',)
__include__ = ()
__write_only__ = ()


@classmethod
def from_json(cls, json, selfObj=None):
if selfObj is None:
self = cls()
else:
self = selfObj
exclude = (cls.__exclude__ or ()) + Serializable.__exclude__
include = cls.__include__ or ()
if json:
for prop, value in json.iteritems():
# ignore all non user data, e.g. only
if (not (prop in exclude) | (prop in include)) and isinstance(
getattr(cls, prop, None), QueryableAttribute):
setattr(self, prop, value)
return self


def deserialize(self, json):
if not json:
return None
return self.__class__.from_json(json, selfObj=self)


@classmethod
def serialize_list(cls, object_list=[]):
output = []
for li in object_list:
if isinstance(li, Serializable):
output.append(li.serialize())
else:
output.append(li)
return output


def serialize(self, **kwargs):


# init write only props
if len(getattr(self.__class__, '__write_only__', ())) == 0:
self.__class__.__write_only__ = ()
dictionary = {}
expand = kwargs.get('expand', ()) or ()
prop = 'props'
if expand:
# expand all the fields
for key in expand:
getattr(self, key)
iterable = self.__dict__.items()
is_custom_property_set = False
# include only properties passed as parameter
if (prop in kwargs) and (kwargs.get(prop, None) is not None):
is_custom_property_set = True
iterable = kwargs.get(prop, None)
# loop trough all accessible properties
for key in iterable:
accessor = key
if isinstance(key, tuple):
accessor = key[0]
if not (accessor in self.__class__.__write_only__) and not accessor.startswith('_'):
# force select from db to be able get relationships
if is_custom_property_set:
getattr(self, accessor, None)
if isinstance(self.__dict__.get(accessor), list):
dictionary[accessor] = self.__class__.serialize_list(object_list=self.__dict__.get(accessor))
# check if those properties are read only
elif isinstance(self.__dict__.get(accessor), Serializable):
dictionary[accessor] = self.__dict__.get(accessor).serialize()
else:
dictionary[accessor] = self.__dict__.get(accessor)
return dictionary
def alc2json(row):
return dict([(col, str(getattr(row,col))) for col in row.__table__.columns.keys()])

我想和她玩会儿代码高尔夫。

供您参考:我使用automap_base,因为我们有一个根据业务需求单独设计的模式。我今天才开始使用SQLAlchemy,但是文档指出automap_base是declarative_base的扩展,这似乎是SQLAlchemy ORM中的典型范例,所以我相信这应该可以工作。

它并没有按照Tjorriemorrie的解决方案跟随外键,而是简单地将列与值匹配,并通过str()-ing列值来处理Python类型。我们的值包括Python datetime。时间和小数。十进位类类型的结果,所以它完成了工作。

希望对路人有所帮助!

我知道这是一个相当老的帖子。我采取了@SashaB给出的解决方案,并根据我的需要进行了修改。

我添加了以下内容:

  1. 字段忽略列表:序列化时要忽略的字段列表
  2. 字段替换列表:包含在序列化时要被值替换的字段名的字典。
  3. 删除方法和BaseQuery被序列化

我的代码如下:

def alchemy_json_encoder(revisit_self = False, fields_to_expand = [], fields_to_ignore = [], fields_to_replace = {}):
"""
Serialize SQLAlchemy result into JSon
:param revisit_self: True / False
:param fields_to_expand: Fields which are to be expanded for including their children and all
:param fields_to_ignore: Fields to be ignored while encoding
:param fields_to_replace: Field keys to be replaced by values assigned in dictionary
:return: Json serialized SQLAlchemy object
"""
_visited_objs = []
class AlchemyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
# don't re-visit self
if revisit_self:
if obj in _visited_objs:
return None
_visited_objs.append(obj)


# go through each field in this SQLalchemy class
fields = {}
for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata' and x not in fields_to_ignore]:
val = obj.__getattribute__(field)
# is this field method defination, or an SQLalchemy object
if not hasattr(val, "__call__") and not isinstance(val, BaseQuery):
field_name = fields_to_replace[field] if field in fields_to_replace else field
# is this field another SQLalchemy object, or a list of SQLalchemy objects?
if isinstance(val.__class__, DeclarativeMeta) or \
(isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
# unless we're expanding this field, stop here
if field not in fields_to_expand:
# not expanding this field: set it to None and continue
fields[field_name] = None
continue


fields[field_name] = val
# a json-encodable dict
return fields


return json.JSONEncoder.default(self, obj)
return AlchemyEncoder

希望它能帮助到一些人!

我对使用(太多?)字典的看法:

def serialize(_query):
#d = dictionary written to per row
#D = dictionary d is written to each time, then reset
#Master = dictionary of dictionaries; the id Key (int, unique from database)
from D is used as the Key for the dictionary D entry in Master
Master = {}
D = {}
x = 0
for u in _query:
d = u.__dict__
D = {}
for n in d.keys():
if n != '_sa_instance_state':
D[n] = d[n]
x = d['id']
Master[x] = D
return Master

使用flask(包括jsonify)和flask_sqlalchemy将输出打印为JSON。

使用jsonify(serialize())调用该函数。

与我迄今为止尝试过的所有SQLAlchemy查询一起工作(运行SQLite3)

在SQLAlchemy中使用内置的序列化器:

from sqlalchemy.ext.serializer import loads, dumps
obj = MyAlchemyObject()
# serialize object
serialized_obj = dumps(obj)


# deserialize object
obj = loads(serialized_obj)
如果你在会话之间传输对象,记得使用session.expunge(obj)从当前会话中分离对象。 要再次附加它,只需执行session.add(obj).

这里有一个解决方案,可以让您选择想要在输出中包含的关系,尽可能深入。 注意:这是一个完整的重写,将dict/str作为一个参数,而不是一个列表。修复了一些东西..

def deep_dict(self, relations={}):
"""Output a dict of an SA object recursing as deep as you want.


Takes one argument, relations which is a dictionary of relations we'd
like to pull out. The relations dict items can be a single relation
name or deeper relation names connected by sub dicts


Example:
Say we have a Person object with a family relationship
person.deep_dict(relations={'family':None})
Say the family object has homes as a relation then we can do
person.deep_dict(relations={'family':{'homes':None}})
OR
person.deep_dict(relations={'family':'homes'})
Say homes has a relation like rooms you can do
person.deep_dict(relations={'family':{'homes':'rooms'}})
and so on...
"""
mydict =  dict((c, str(a)) for c, a in
self.__dict__.items() if c != '_sa_instance_state')
if not relations:
# just return ourselves
return mydict


# otherwise we need to go deeper
if not isinstance(relations, dict) and not isinstance(relations, str):
raise Exception("relations should be a dict, it is of type {}".format(type(relations)))


# got here so check and handle if we were passed a dict
if isinstance(relations, dict):
# we were passed deeper info
for left, right in relations.items():
myrel = getattr(self, left)
if isinstance(myrel, list):
mydict[left] = [rel.deep_dict(relations=right) for rel in myrel]
else:
mydict[left] = myrel.deep_dict(relations=right)
# if we get here check and handle if we were passed a string
elif isinstance(relations, str):
# passed a single item
myrel = getattr(self, relations)
left = relations
if isinstance(myrel, list):
mydict[left] = [rel.deep_dict(relations=None)
for rel in myrel]
else:
mydict[left] = myrel.deep_dict(relations=None)


return mydict

举个关于person/family/homes/rooms的例子…把它转换成json,你只需要

json.dumps(person.deep_dict(relations={'family':{'homes':'rooms'}}))

你可以像这样使用SqlAlchemy的自省:

mysql = SQLAlchemy()
from sqlalchemy import inspect


class Contacts(mysql.Model):
__tablename__ = 'CONTACTS'
id = mysql.Column(mysql.Integer, primary_key=True)
first_name = mysql.Column(mysql.String(128), nullable=False)
last_name = mysql.Column(mysql.String(128), nullable=False)
phone = mysql.Column(mysql.String(128), nullable=False)
email = mysql.Column(mysql.String(128), nullable=False)
street = mysql.Column(mysql.String(128), nullable=False)
zip_code = mysql.Column(mysql.String(128), nullable=False)
city = mysql.Column(mysql.String(128), nullable=False)
def toDict(self):
return { c.key: getattr(self, c.key) for c in inspect(self).mapper.column_attrs }


@app.route('/contacts',methods=['GET'])
def getContacts():
contacts = Contacts.query.all()
contactsArr = []
for contact in contacts:
contactsArr.append(contact.toDict())
return jsonify(contactsArr)


@app.route('/contacts/<int:id>',methods=['GET'])
def getContact(id):
contact = Contacts.query.get(id)
return jsonify(contact.toDict())

从下面的答案中得到启发: 将sqlalchemy行对象转换为python dict < / p >

在Flask下,它工作并处理datatime字段,转换类型为
的字段 'time': datetime.datetime(2018, 3, 22, 15, 40)到< br > "time": "2018-03-22 15:40:00": < / p >
obj = {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}


# This to get the JSON body
return json.dumps(obj)


# Or this to get a response object
return jsonify(obj)

更详细的解释。 在你的模型中添加:

def as_dict(self):
return {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}

str()用于python 3,因此如果使用python 2,则使用unicode()。它应该有助于反序列化日期。如果不处理这些,你可以删除它。

现在可以像这样查询数据库

some_result = User.query.filter_by(id=current_user.id).first().as_dict()

需要First()来避免奇怪的错误。as_dict()现在将反序列化结果。反序列化之后,就可以将其转换为json了

jsonify(some_result)

下面的代码将sqlalchemy结果序列化为json。

import json
from collections import OrderedDict




def asdict(self):
result = OrderedDict()
for key in self.__mapper__.c.keys():
if getattr(self, key) is not None:
result[key] = str(getattr(self, key))
else:
result[key] = getattr(self, key)
return result




def to_array(all_vendors):
v = [ ven.asdict() for ven in all_vendors ]
return json.dumps(v)

叫有趣,

def all_products():
all_products = Products.query.all()
return to_array(all_products)

虽然最初的问题可以追溯到很久以前,但这里的答案数量(以及我自己的经验)表明,这是一个不平凡的问题,有许多不同的方法,不同的复杂性和不同的权衡。

这就是为什么我构建了SQLAthanor库,它扩展了SQLAlchemy的声明性ORM,支持可配置的序列化/反序列化,你可能想看看。

该库支持:

  • Python 2.7、3.4、3.5和3.6。
  • SQLAlchemy版本0.9及更高版本
  • 序列化/反序列化到JSON、CSV、YAML和Python dict
  • 列/属性、关系、混合属性和关联代理的序列化/反序列化
  • 启用和禁用特定格式和列/关系/属性的序列化(例如,你想要支持入站 password值,但绝不包括出站值)
  • 序列化前和反序列化后的值处理(用于验证或类型强制)
  • 一个非常简单的语法,它是python的,并且与SQLAlchemy自己的方法无缝一致

你可以在这里查看(我希望!)综合文档:https://sqlathanor.readthedocs.io/en/latest

希望这能有所帮助!

内置序列化器因utf-8而阻塞,无法解码某些输入的无效开始字节。相反,我的答案是:

def row_to_dict(row):
temp = row.__dict__
temp.pop('_sa_instance_state', None)
return temp




def rows_to_list(rows):
ret_rows = []
for row in rows:
ret_rows.append(row_to_dict(row))
return ret_rows




@website_blueprint.route('/api/v1/some/endpoint', methods=['GET'])
def some_api():
'''
/some_endpoint
'''
rows = rows_to_list(SomeModel.query.all())
response = app.response_class(
response=jsonplus.dumps(rows),
status=200,
mimetype='application/json'
)
return response

AlchemyEncoder是很棒的,但有时会失败的十进制值。这是一个改进的编码器,解决十进制问题-

class AlchemyEncoder(json.JSONEncoder):
# To serialize SQLalchemy objects
def default(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
model_fields = {}
for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
data = obj.__getattribute__(field)
print data
try:
json.dumps(data)  # this will fail on non-encodable values, like other classes
model_fields[field] = data
except TypeError:
model_fields[field] = None
return model_fields
if isinstance(obj, Decimal):
return float(obj)
return json.JSONEncoder.default(self, obj)

Python 3.7+和Flask 1.1+可以使用内置的dataclasses

from dataclasses import dataclass
from datetime import datetime
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy




app = Flask(__name__)
db = SQLAlchemy(app)




@dataclass
class User(db.Model):
id: int
email: str


id = db.Column(db.Integer, primary_key=True, auto_increment=True)
email = db.Column(db.String(200), unique=True)




@app.route('/users/')
def users():
users = User.query.all()
return jsonify(users)




if __name__ == "__main__":
users = User(email="user1@gmail.com"), User(email="user2@gmail.com")
db.create_all()
db.session.add_all(users)
db.session.commit()
app.run()

/users/路由现在将返回一个用户列表。

[
{"email": "user1@gmail.com", "id": 1},
{"email": "user2@gmail.com", "id": 2}
]

自动序列化相关模型

@dataclass
class Account(db.Model):
id: int
users: User


id = db.Column(db.Integer)
users = db.relationship(User)  # User model would need a db.ForeignKey field

jsonify(account)的响应是这样的。

{
"id":1,
"users":[
{
"email":"user1@gmail.com",
"id":1
},
{
"email":"user2@gmail.com",
"id":2
}
]
}

覆盖默认的JSON编码器

from flask.json import JSONEncoder




class CustomJSONEncoder(JSONEncoder):
"Add support for serializing timedeltas"


def default(o):
if type(o) == datetime.timedelta:
return str(o)
elif type(o) == datetime.datetime:
return o.isoformat()
else:
return super().default(o)


app.json_encoder = CustomJSONEncoder

当使用sqlalchemy连接到db I时,这是一个高度可配置的简单解决方案。使用熊猫。

import pandas as pd
import sqlalchemy


#sqlalchemy engine configuration
engine = sqlalchemy.create_engine....


def my_function():
#read in from sql directly into a pandas dataframe
#check the pandas documentation for additional config options
sql_DF = pd.read_sql_table("table_name", con=engine)


# "orient" is optional here but allows you to specify the json formatting you require
sql_json = sql_DF.to_json(orient="index")


return sql_json


也许你可以使用这样的类

from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy import Table




class Custom:
"""Some custom logic here!"""


__table__: Table  # def for mypy


@declared_attr
def __tablename__(cls):  # pylint: disable=no-self-argument
return cls.__name__  # pylint: disable= no-member


def to_dict(self) -> Dict[str, Any]:
"""Serializes only column data."""
return {c.name: getattr(self, c.name) for c in self.__table__.columns}


Base = declarative_base(cls=Custom)


class MyOwnTable(Base):
#COLUMNS!

这样,所有对象都有to_dict方法

虽然使用一些原始sql和未定义对象,使用cursor.description似乎得到了我正在寻找的东西:

with connection.cursor() as cur:
print(query)
cur.execute(query)
for item in cur.fetchall():
row = {column.name: item[i] for i, column in enumerate(cur.description)}
print(row)
step1:
class CNAME:
...
def as_dict(self):
return {item.name: getattr(self, item.name) for item in self.__table__.columns}


step2:
list = []
for data in session.query(CNAME).all():
list.append(data.as_dict())


step3:
return jsonify(list)

虽然这是一篇老文章,也许我没有回答上面的问题,但我想谈谈我的连载,至少它对我有用。

我使用FastAPI,SqlAlchemy和MySQL,但我不使用orm模型;

# from sqlalchemy import create_engine
# from sqlalchemy.orm import sessionmaker
# engine = create_engine(config.SQLALCHEMY_DATABASE_URL, pool_pre_ping=True)
# SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

序列化代码




import decimal
import datetime




def alchemy_encoder(obj):
"""JSON encoder function for SQLAlchemy special classes."""
if isinstance(obj, datetime.date):
return obj.strftime("%Y-%m-%d %H:%M:%S")
elif isinstance(obj, decimal.Decimal):
return float(obj)


import json
from sqlalchemy import text


# db is SessionLocal() object


app_sql = 'SELECT * FROM app_info ORDER BY app_id LIMIT :page,:page_size'


# The next two are the parameters passed in
page = 1
page_size = 10


# execute sql and return a <class 'sqlalchemy.engine.result.ResultProxy'> object
app_list = db.execute(text(app_sql), {'page': page, 'page_size': page_size})


# serialize
res = json.loads(json.dumps([dict(r) for r in app_list], default=alchemy_encoder))


如果不行,请忽略我的回答。我在这里提到它

https://codeandlife.com/2014/12/07/sqlalchemy-results-to-json-the-easy-way/

install simplejson by pip install simplejson和创建类

class Serialise(object):


def _asdict(self):
"""
Serialization logic for converting entities using flask's jsonify


:return: An ordered dictionary
:rtype: :class:`collections.OrderedDict`
"""


result = OrderedDict()
# Get the columns
for key in self.__mapper__.c.keys():
if isinstance(getattr(self, key), datetime):
result["x"] = getattr(self, key).timestamp() * 1000
result["timestamp"] = result["x"]
else:
result[key] = getattr(self, key)


return result

并将这个类继承到每个orm类,以便这个_asdict函数被注册到每个orm类和boom。 在

的任何地方使用jsonify

(对萨莎B的的微小调整非常棒的回答)

这将具体地将datetime对象转换为字符串,这些字符串在原始答案中将转换为None:

# Standard library imports
from datetime import datetime
import json


# 3rd party imports
from sqlalchemy.ext.declarative import DeclarativeMeta


class JsonEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
dict = {}


# Remove invalid fields and just get the column attributes
columns = [x for x in dir(obj) if not x.startswith("_") and x != "metadata"]


for column in columns:
value = obj.__getattribute__(column)


try:
json.dumps(value)
dict[column] = value
except TypeError:
if isinstance(value, datetime):
dict[column] = value.__str__()
else:
dict[column] = None
return dict


return json.JSONEncoder.default(self, obj)

这是一个JSONEncoder版本,它保留模型列的顺序,只保留递归定义的列和关系字段。它还格式化了大多数不可序列化的JSON类型:

import json
from datetime import datetime
from decimal import Decimal


import arrow
from sqlalchemy.ext.declarative import DeclarativeMeta


class SQLAlchemyJSONEncoder(json.JSONEncoder):
"""
SQLAlchemy ORM JSON Encoder
If you have a "backref" relationship defined in your SQLAlchemy model,
this encoder raises a ValueError to stop an infinite loop.
"""


def default(self, obj):
if isinstance(obj, datetime):
return arrow.get(obj).isoformat()
elif isinstance(obj, Decimal):
return float(obj)
elif isinstance(obj, set):
return sorted(obj)
elif isinstance(obj.__class__, DeclarativeMeta):
for attribute, relationship in obj.__mapper__.relationships.items():
if isinstance(relationship.__getattribute__("backref"), tuple):
raise ValueError(
f'{obj.__class__} object has a "backref" relationship '
"that would cause an infinite loop!"
)
dictionary = {}
column_names = [column.name for column in obj.__table__.columns]
for key in column_names:
value = obj.__getattribute__(key)
if isinstance(value, datetime):
value = arrow.get(value).isoformat()
elif isinstance(value, Decimal):
value = float(value)
elif isinstance(value, set):
value = sorted(value)
dictionary[key] = value
for key in [
attribute
for attribute in dir(obj)
if not attribute.startswith("_")
and attribute != "metadata"
and attribute not in column_names
]:
value = obj.__getattribute__(key)
dictionary[key] = value
return dictionary


return super().default(obj)
class SqlToDict:
def __init__(self, data) -> None:
self.data = data


def to_timestamp(self, date):
if isinstance(date, datetime):
return int(datetime.timestamp(date))
else:
return date


def to_dict(self) -> List:
arr = []
for i in self.data:
keys = [*i.keys()]
values = [*i]
values = [self.to_timestamp(d) for d in values]
arr.append(dict(zip(keys, values)))
return arr

例如:

SqlToDict(data).to_dict()

我成功地使用了这个包:https://github.com/n0nSmoker/SQLAlchemy-serializer

你可以在模型上这样做:

from sqlalchemy_serializer import SerializerMixin


class SomeModel(db.Model, SerializerMixin):
...

它添加了完全递归的to_dict:

item = SomeModel.query.filter(...).one()
result = item.to_dict()

它可以让你制定规则来避免无限递归:

result = item.to_dict(rules=('-somefield', '-some_relation.nested_one.another_nested_one'))

如果你正在使用Flask并且只想快速查询:

def get_cats():
sql = text("select * from cat")
sql_params = {}
result = db.session.execute(sql, sql_params)
row_list = result.fetchall()
data = [dict(r) for r in row_list]


response = jsonify({
'data': [{
'categorias': data
}]
})
    

return response

https://flask-restplus.readthedocs.io/en/stable/marshalling.html

from flask_restplus import fields, Namespace, marshal
api = Namespace("Student data")
db_data = Student_details.query.all()
data_marshal_obj = api.model(" Data", {
"id": fields.String(),
"number": fields.Integer(),
"house_name": fields.String(),
})
data_in_json_serialize =  marshal(db_data, data_marshal_obj)}
print(type(data_in_json_serialize )) #  <class 'dict'>

定制序列化编组在烧瓶restpluse

经过一番尝试,我想出了自己的解决方案

def to_dict(self):
keys = self.__mapper__.attrs.keys()
attrs = vars(self)
return { k : attrs[k]  for k in keys}

as_dict方法添加到任何模型的动态方法

from sqlalchemy.inspection import inspect


def implement_as_dict(model):
if not hasattr(model,"as_dict"):
column_names=[]
imodel = inspect(model)
for c in imodel.columns:
column_names.append(c.key)


#define model.as_dict()
def as_dict(self):
d = {}
for c in column_names:
d[c] = getattr(self,c)
return d


setattr(model,"as_dict",as_dict)


#model definition
class User(Base):
__tablename__ = 'users'


id = Column(Integer, primary_key=True)
name = Column(String)
# adding as_dict definition to model
implement_as_dict(User)

然后你可以使用

user = session.query(User).filter_by(name='rick').first()


user.as_dict()
#sample output
{"id":1,"name":"rick"}