将 ORM 转换为熊猫数据框架

是否有将 SQLAlchemy<Query object>转换为熊猫 DataFrame 的解决方案?

熊猫有能力使用 pandas.read_sql,但这需要使用原始 SQL。我有两个理由想要避免它:

  1. 我已经有一切使用 ORM (一个很好的理由本身)和
  2. 我使用 python 列表作为查询的一部分,例如:

db.session.query(Item).filter(Item.symbol.in_(add_symbols),其中 Item是我的模型类,add_symbols是一个列表) ,这相当于 SQL SELECT ... from ... WHERE ... IN

有可能吗?

117540 次浏览

在大多数情况下,以下方法应该有效:

df = pd.read_sql(query.statement, query.session.bind)

有关参数的更多信息,请参见 pandas.read_sql文档。

如果您希望使用参数和方言特定参数编译查询,可以使用以下方法:

c = query.statement.compile(query.session.bind)
df = pandas.read_sql(c.string, query.session.bind, params=c.params)

为了让熊猫程序员新手更清楚这一点,这里有一个具体的例子,

pd.read_sql(session.query(Complaint).filter(Complaint.id == 2).statement,session.bind)

这里我们从投诉表中选择一个 id = 2的投诉表(sql 炼金术模型就是投诉表)

所选择的解决方案对我不起作用,因为我不断得到错误

AttributeError: ‘ AnnotatedSelect’对象没有属性‘ lower’

我发现以下方法很有效:

df = pd.read_sql_query(query.statement, engine)
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker


engine = create_engine('postgresql://postgres:postgres@localhost:5432/DB', echo=False)
Base = declarative_base(bind=engine)
Session = sessionmaker(bind=engine)
session = Session()


conn = session.bind


class DailyTrendsTable(Base):


__tablename__ = 'trends'
__table_args__ = ({"schema": 'mf_analysis'})


company_code = Column(DOUBLE_PRECISION, primary_key=True)
rt_bullish_trending = Column(Integer)
rt_bearish_trending = Column(Integer)
rt_bullish_non_trending = Column(Integer)
rt_bearish_non_trending = Column(Integer)
gen_date = Column(Date, primary_key=True)


df_query = select([DailyTrendsTable])


df_data = pd.read_sql(rt_daily_query, con = conn)

为了完整起见: 作为熊猫函数 read_sql_query()的替代,您还可以使用熊猫-数据框架-函数 from_records()来转换 structured or record ndarray to DataFrame
如果您已经在 SQLAlchemy 中执行了查询并且已经得到了结果,那么这将非常方便:

import pandas as pd
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker




SQLALCHEMY_DATABASE_URI = 'postgresql://postgres:postgres@localhost:5432/my_database'
engine = create_engine(SQLALCHEMY_DATABASE_URI, pool_pre_ping=True, echo=False)
db = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
Base = declarative_base(bind=engine)




class Currency(Base):
"""The `Currency`-table"""
__tablename__ = "currency"
__table_args__ = {"schema": "data"}


id = Column(Integer, primary_key=True, nullable=False)
name = Column(String(64), nullable=False)




# Defining the SQLAlchemy-query
currency_query = db.query(Currency).with_entities(Currency.id, Currency.name)


# Getting all the entries via SQLAlchemy
currencies = currency_query.all()


# We provide also the (alternate) column names and set the index here,
# renaming the column `id` to `currency__id`
df_from_records = pd.DataFrame.from_records(currencies
, index='currency__id'
, columns=['currency__id', 'name'])
print(df_from_records.head(5))


# Or getting the entries via Pandas instead of SQLAlchemy using the
# aforementioned function `read_sql_query()`. We can set the index-columns here as well
df_from_query = pd.read_sql_query(currency_query.statement, db.bind, index_col='id')
# Renaming the index-column(s) from `id` to `currency__id` needs another statement
df_from_query.index.rename(name='currency__id', inplace=True)
print(df_from_query.head(5))

这个答案提供了一个使用 SQLAlchemyselect语句并返回熊猫数据帧的可重复示例。它基于内存中的 SQLite 数据库,因此任何人都可以在不安装数据库引擎的情况下复制它。

import pandas
from sqlalchemy import create_engine
from sqlalchemy import MetaData, Table, Column, Text
from sqlalchemy.orm import Session

定义表元数据并创建表

engine = create_engine('sqlite://')
meta = MetaData()
meta.bind = engine
user_table = Table('user', meta,
Column("name", Text),
Column("full_name", Text))
user_table.create()

user表中插入一些数据

stmt = user_table.insert().values(name='Bob', full_name='Sponge Bob')
with Session(engine) as session:
result = session.execute(stmt)
session.commit()

选择语句的结果读入熊猫数据框

# Select data into a pandas data frame
stmt = user_table.select().where(user_table.c.name == 'Bob')
df = pandas.read_sql_query(stmt, engine)
df
Out:
name   full_name
0  Bob  Sponge Bob

如果使用 SQL 查询

def generate_df_from_sqlquery(query):
from pandas import DataFrame
query = db.session.execute(query)
df = DataFrame(query.fetchall())
if len(df) > 0:
df.columns = query.keys()
else:
columns = query.keys()
df = pd.DataFrame(columns=columns)
return df


profile_df = generate_df_from_sqlquery(profile_query)

使用 2.0 SQLalchemy语法(在1.4中也可以使用 future=True标志) ,看起来 pd.read_sql还没有实现,它将引发:

NotImplementedError: This method is not implemented for SQLAlchemy 2.0.

这是一个公开的问题,将不会被解决,直到熊猫2.0,你可以找到一些关于这个 给你给你的信息。

我没有找到任何令人满意的工作,但有些人似乎使用两种配置的引擎,一个与未来的旗帜错误:

engine2 = create_engine(URL_string, echo=False, future=False)

如果你查询字符串,这个解决方案是可以的,但是使用 ORM,我能做的最好的是一个自定义函数还没有被优化,但是它工作:

Conditions = session.query(ExampleTable)
def df_from_sql(query):
return pd.DataFrame([i.__dict__ for i in query]).drop(columns='_sa_instance_state')
df = df_from_sql(ExampleTable)

在 pd.read _ sql 实现新语法之前,这个解决方案在任何情况下都是临时的。

当你使用 ORM 的时候,它就是这么简单:

pd.DataFrame([r._asdict() for r in query.all()])

当您不想将 sql 和会话公开给业务逻辑代码时,这是 pd.read_sql的一个很好的替代方案。

在这里找到的: https://stackoverflow.com/a/52208023/1635525