如何将 DataFrame 写入 postgres 表

DataFrame.to _ sql方法,但它只适用于 mysql、 sqlite 和 Oracle 数据库。我不能通过这种方法进行后续连接或 sql 炼金引擎。

233924 次浏览

从熊猫0.14(2014年5月底发布)开始,postgreql 得到了支持。sql模块现在使用 sqlalchemy来支持不同的数据库风格。您可以传递一个用于 postresql 数据库的 sql 炼金术引擎(请参见 医生)。例如:

from sqlalchemy import create_engine
engine = create_engine('postgresql://username:password@localhost:5432/mydatabase')
df.to_sql('table_name', engine)

您是正确的,在熊猫中,直到0.13.1 postgreql 版本都不受支持。如果你需要使用一个老版本的熊猫,这里是一个补丁版本的 pandas.io.sql: https://gist.github.com/jorisvandenbossche/10841234
我以前写过这个,所以不能完全保证它总是有效,但是基础应该在那里)。如果你把这个文件放到你的工作目录中并导入它,那么你应该能够做到(其中 con是一个 postgreql 连接) :

import sql  # the patched version (file is named sql.py)
sql.write_frame(df, 'table_name', con, flavor='postgresql')

更快的选择:

下面的代码将复制您的 Pandas DF 到 postgres DB,比 DF.to _ sql 方法快得多,并且不需要任何中间的 csv 文件来存储 DF。

根据 DB 规范创建引擎。

在 postgres DB 中创建一个与 Dataframe (df)具有相同列数的表。

DF 中的数据将在 postgres 表中得到 插入

from sqlalchemy import create_engine
import psycopg2
import io

如果您想替换表,我们可以使用 df 中的 Header 将其替换为标准的 to _ sql 方法,然后将整个耗费大量时间的 df 加载到 DB 中。

    engine = create_engine('postgresql+psycopg2://username:password@host:port/database')


df.head(0).to_sql('table_name', engine, if_exists='replace',index=False) #drops old table and creates new empty table


conn = engine.raw_connection()
cur = conn.cursor()
output = io.StringIO()
df.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cur.copy_from(output, 'table_name', null="") # null values become ''
conn.commit()

我就是这么做的。

它可能更快,因为它使用的是 execute_batch:

# df is the dataframe
if len(df) > 0:
df_columns = list(df)
# create (col1,col2,...)
columns = ",".join(df_columns)


# create VALUES('%s', '%s",...) one '%s' per column
values = "VALUES({})".format(",".join(["%s" for _ in df_columns]))


#create INSERT INTO table (columns) VALUES('%s',...)
insert_stmt = "INSERT INTO {} ({}) {}".format(table,columns,values)


cur = conn.cursor()
psycopg2.extras.execute_batch(cur, insert_stmt, df.values)
conn.commit()
cur.close()

熊猫0.24.0 + 解决方案

在熊猫0.24.0中引入了一个专门为 Postgres 快速写作设计的新功能。你可以在这里了解更多: https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method

import csv
from io import StringIO


from sqlalchemy import create_engine


def psql_insert_copy(table, conn, keys, data_iter):
# gets a DBAPI connection that can provide a cursor
dbapi_conn = conn.connection
with dbapi_conn.cursor() as cur:
s_buf = StringIO()
writer = csv.writer(s_buf)
writer.writerows(data_iter)
s_buf.seek(0)


columns = ', '.join('"{}"'.format(k) for k in keys)
if table.schema:
table_name = '{}.{}'.format(table.schema, table.name)
else:
table_name = table.name


sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
table_name, columns)
cur.copy_expert(sql=sql, file=s_buf)


engine = create_engine('postgresql://myusername:mypassword@myhost:5432/mydatabase')
df.to_sql('table_name', engine, method=psql_insert_copy)

对于 Python 2.7和 Pandas 0.24.2以及使用 Psycopg2

Psycopg2连接模块

def dbConnect (db_parm, username_parm, host_parm, pw_parm):
# Parse in connection information
credentials = {'host': host_parm, 'database': db_parm, 'user': username_parm, 'password': pw_parm}
conn = psycopg2.connect(**credentials)
conn.autocommit = True  # auto-commit each entry to the database
conn.cursor_factory = RealDictCursor
cur = conn.cursor()
print ("Connected Successfully to DB: " + str(db_parm) + "@" + str(host_parm))
return conn, cur

连接到数据库

conn, cur = dbConnect(databaseName, dbUser, dbHost, dbPwd)

假设数据帧已经以 df 的形式存在

output = io.BytesIO() # For Python3 use StringIO
df.to_csv(output, sep='\t', header=True, index=False)
output.seek(0) # Required for rewinding the String object
copy_query = "COPY mem_info FROM STDOUT csv DELIMITER '\t' NULL ''  ESCAPE '\\' HEADER "  # Replace your table name in place of mem_info
cur.copy_expert(copy_query, output)
conn.commit()
创建引擎(其中方言 = ‘ postgres’或‘ mysql’等) :
from sqlalchemy import create_engine
engine = create_engine(f'{dialect}://{user_name}@{host}:{port}/{db_name}')
Session = sessionmaker(bind=engine)


with Session() as session:
df = pd.read_csv(path + f'/{file}')
df.to_sql('table_name', con=engine, if_exists='append',index=False)

使用 Psycopg2,您可以使用本机 sql 命令将数据写入 postgres 表。

import psycopg2
import pandas as pd


conn = psycopg2.connect("dbname='{db}' user='{user}' host='{host}' port='{port}' password='{passwd}'".format(
user=pg_user,
passwd=pg_pass,
host=pg_host,
port=pg_port,
db=pg_db))
cur = conn.cursor()
def insertIntoTable(df, table):
"""
Using cursor.executemany() to insert the dataframe
"""
# Create a list of tupples from the dataframe values
tuples = list(set([tuple(x) for x in df.to_numpy()]))
    

# Comma-separated dataframe columns
cols = ','.join(list(df.columns))
# SQL query to execute
query = "INSERT INTO %s(%s) VALUES(%%s,%%s,%%s,%%s)" % (
table, cols)
    

try:
cur.executemany(query, tuples)
conn.commit()


except (Exception, psycopg2.DatabaseError) as error:
print("Error: %s" % error)
conn.rollback()
return 1

在有/无索引的自定义模式中将 df 写入表的更快方法:

"""
Faster way to write df to table.
Slower way is to use df.to_sql()
"""


from io import StringIO


from pandas import DataFrame
from sqlalchemy.engine.base import Engine




class WriteDfToTableWithIndexMixin:
@classmethod
def write_df_to_table_with_index(
cls,
df: DataFrame,
table_name: str,
schema_name: str,
engine: Engine
):
"""
Truncate existing table and load df into table.
Keep each column as string to avoid datatype conflicts.
"""
df.head(0).to_sql(table_name, engine, if_exists='replace',
schema=schema_name, index=True, index_label='id')


conn = engine.raw_connection()
cur = conn.cursor()
output = StringIO()
df.to_csv(output, sep='\t', header=False,
index=True, index_label='id')
output.seek(0)
contents = output.getvalue()
cur.copy_expert(f"COPY {schema_name}.{table_name} FROM STDIN", output)
conn.commit()




class WriteDfToTableWithoutIndexMixin:
@classmethod
def write_df_to_table_without_index(
cls,
df: DataFrame,
table_name: str,
schema_name: str,
engine: Engine
):
"""
Truncate existing table and load df into table.
Keep each column as string to avoid datatype conflicts.
"""
df.head(0).to_sql(table_name, engine, if_exists='replace',
schema=schema_name, index=False)


conn = engine.raw_connection()
cur = conn.cursor()
output = StringIO()
df.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cur.copy_expert(f"COPY {schema_name}.{table_name} FROM STDIN", output)
conn.commit()

如果在 df 中的一列中有 JSON 值,那么上面的方法仍然会正确加载所有数据,但是 JSON 列的格式有些奇怪。因此,将 json 列转换为 ::json可能会产生错误。你必须使用 to_sql()。添加 method=multi可以加快速度,添加 chunksize可以防止机器冻结:

df.to_sql(table_name, engine, if_exists='replace', schema=schema_name, index=False, method='multi', chunksize=1000)