Importing a CSV file into a sqlite3 database table using Python

I have a CSV file and I want to bulk-import this file into my sqlite3 database using Python. The command is `.import ...`, but it doesn't seem to work like this. Can anyone give me an example of how to do it in sqlite3? I am using Windows, just in case. Thanks.


The `.import` command is a feature of the sqlite3 command-line tool. To do this in Python, simply load the data using whatever facilities Python has, such as the csv module, and then insert the data as usual.

This way, you also have control over which types are inserted, rather than relying on sqlite3's seemingly undocumented behaviour.

import csv, sqlite3

con = sqlite3.connect(":memory:")  # change to 'sqlite:///your_filename.db'
cur = con.cursor()
cur.execute("CREATE TABLE t (col1, col2);")  # use your column names here

with open('data.csv', 'r') as fin:  # `with` statement available in 2.5+
    # csv.DictReader uses first line in file for column headings by default
    dr = csv.DictReader(fin)  # comma is default delimiter
    to_db = [(i['col1'], i['col2']) for i in dr]

cur.executemany("INSERT INTO t (col1, col2) VALUES (?, ?);", to_db)
con.commit()
con.close()

Many thanks to bernie's answer! Had to tweak it a bit; here's what worked for me:

import csv, sqlite3

conn = sqlite3.connect("pcfc.sl3")
curs = conn.cursor()
curs.execute("CREATE TABLE PCFC (id INTEGER PRIMARY KEY, type INTEGER, term TEXT, definition TEXT);")
reader = csv.reader(open('PC.txt', 'r'), delimiter='|')
for row in reader:
    to_db = [unicode(row[0], "utf8"), unicode(row[1], "utf8"), unicode(row[2], "utf8")]
    curs.execute("INSERT INTO PCFC (type, term, definition) VALUES (?, ?, ?);", to_db)
conn.commit()

My text file (PC.txt) looks like this:

1 | Term 1 | Definition 1
2 | Term 2 | Definition 2
3 | Term 3 | Definition 3
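
Note that unicode() is Python 2 only. On Python 3, a rough equivalent, assuming the same pipe-delimited PC.txt and the PCFC table created above, is to open the file with an explicit encoding instead:

import csv, sqlite3

conn = sqlite3.connect("pcfc.sl3")
curs = conn.cursor()
# assumes the PCFC table created above already exists
with open('PC.txt', 'r', encoding='utf-8') as f:
    reader = csv.reader(f, delimiter='|')
    for row in reader:
        curs.execute("INSERT INTO PCFC (type, term, definition) VALUES (?, ?, ?);",
                     [c.strip() for c in row[:3]])
conn.commit()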
#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys, csv, sqlite3

def main():
    con = sqlite3.connect(sys.argv[1])  # database file input
    cur = con.cursor()
    cur.executescript("""
        DROP TABLE IF EXISTS t;
        CREATE TABLE t (COL1 TEXT, COL2 TEXT);
        """)  # drops the table if it exists and makes a fresh one

    with open(sys.argv[2], "rb") as f:  # CSV file input
        reader = csv.reader(f, delimiter=',')  # no header information with delimiter
        for row in reader:
            to_db = [unicode(row[0], "utf8"), unicode(row[1], "utf8")]  # decode the CSV text fields as UTF-8
            cur.execute("INSERT INTO t (COL1, COL2) VALUES(?, ?);", to_db)
    con.commit()
    con.close()  # closes connection to database

if __name__ == '__main__':
    main()

Creating an sqlite connection to a file on disk is left as an exercise for the reader ... but there is now a two-liner made possible by the pandas library:

import pandas

df = pandas.read_csv(csvfile)
df.to_sql(table_name, conn, if_exists='append', index=False)
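
And the exercise itself, sketched: a minimal on-disk connection (the filename here is a placeholder):

import sqlite3

conn = sqlite3.connect('your_filename.db')  # the file is created on first connect if it doesn't exist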

My 2 cents (more generic):

import csv, sqlite3
import logging


def _get_col_datatypes(fin):
    dr = csv.DictReader(fin)  # comma is default delimiter
    fieldTypes = {}
    for entry in dr:
        feildslLeft = [f for f in dr.fieldnames if f not in fieldTypes.keys()]
        if not feildslLeft: break  # We're done
        for field in feildslLeft:
            data = entry[field]

            # Need data to decide
            if len(data) == 0:
                continue

            if data.isdigit():
                fieldTypes[field] = "INTEGER"
            else:
                fieldTypes[field] = "TEXT"
            # TODO: Currently there's no support for DATE in SQLite

    if len(feildslLeft) > 0:
        raise Exception("Failed to find all the columns data types - Maybe some are empty?")

    return fieldTypes


def escapingGenerator(f):
    for line in f:
        yield line.encode("ascii", "xmlcharrefreplace").decode("ascii")


def csvToDb(csvFile, outputToFile=False):
    # TODO: implement output to file

    with open(csvFile, mode='r', encoding="ISO-8859-1") as fin:
        dt = _get_col_datatypes(fin)

        fin.seek(0)

        reader = csv.DictReader(fin)

        # Keep the order of the columns name just as in the CSV
        fields = reader.fieldnames
        cols = []

        # Set field and type
        for f in fields:
            cols.append("%s %s" % (f, dt[f]))

        # Generate create table statement:
        stmt = "CREATE TABLE ads (%s)" % ",".join(cols)

        con = sqlite3.connect(":memory:")
        cur = con.cursor()
        cur.execute(stmt)

        fin.seek(0)

        reader = csv.reader(escapingGenerator(fin))

        # Generate insert statement:
        stmt = "INSERT INTO ads VALUES(%s);" % ','.join('?' * len(cols))

        cur.executemany(stmt, reader)
        con.commit()

    return con
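
A quick usage sketch (assuming a data.csv with a header row; note that the table name ads is hardcoded above):

con = csvToDb('data.csv')
print(con.execute("SELECT * FROM ads LIMIT 5").fetchall())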

You can do this efficiently using blaze & odo:

import blaze as bz
csv_path = 'data.csv'
bz.odo(csv_path, 'sqlite:///data.db::data')

Odo will store the csv file to data.db (sqlite database) under the schema data.

Or you can use odo directly, without blaze. Either way is fine. Read the documentation.
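
A minimal sketch of the odo-only route, assuming the odo package is installed (the part after :: names the target table, as in the blaze example above):

from odo import odo

odo('data.csv', 'sqlite:///data.db::data')  # same URI convention as above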

Based on Guy L's solution (love it), but it can handle escaped fields.

import csv, sqlite3


def _get_col_datatypes(fin):
    dr = csv.DictReader(fin)  # comma is default delimiter
    fieldTypes = {}
    for entry in dr:
        feildslLeft = [f for f in dr.fieldnames if f not in fieldTypes.keys()]
        if not feildslLeft: break  # We're done
        for field in feildslLeft:
            data = entry[field]

            # Need data to decide
            if len(data) == 0:
                continue

            if data.isdigit():
                fieldTypes[field] = "INTEGER"
            else:
                fieldTypes[field] = "TEXT"
            # TODO: Currently there's no support for DATE in SQLite

    if len(feildslLeft) > 0:
        raise Exception("Failed to find all the columns data types - Maybe some are empty?")

    return fieldTypes


def escapingGenerator(f):
    for line in f:
        yield line.encode("ascii", "xmlcharrefreplace").decode("ascii")


def csvToDb(csvFile, dbFile, tablename, outputToFile=False):
    # TODO: implement output to file

    with open(csvFile, mode='r', encoding="ISO-8859-1") as fin:
        dt = _get_col_datatypes(fin)

        fin.seek(0)

        reader = csv.DictReader(fin)

        # Keep the order of the columns name just as in the CSV
        fields = reader.fieldnames
        cols = []

        # Set field and type
        for f in fields:
            cols.append("\"%s\" %s" % (f, dt[f]))

        # Generate create table statement:
        stmt = "create table if not exists \"" + tablename + "\" (%s)" % ",".join(cols)
        print(stmt)
        con = sqlite3.connect(dbFile)
        cur = con.cursor()
        cur.execute(stmt)

        fin.seek(0)

        reader = csv.reader(escapingGenerator(fin))

        # Generate insert statement:
        stmt = "INSERT INTO \"" + tablename + "\" VALUES(%s);" % ','.join('?' * len(cols))

        cur.executemany(stmt, reader)
        con.commit()
        con.close()
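
Usage might then look like this (file, database and table names are placeholders):

csvToDb('data.csv', 'out.db', 'mytable')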

For simplicity's sake, you could use the sqlite3 command-line tool from your project's Makefile.

%.sql3: %.csv
	rm -f $@
	sqlite3 $@ -echo -cmd ".mode csv" ".import $< $*"
%.dump: %.sql3
	sqlite3 $< "select * from $*"

make test.sql3 then creates the sqlite database from an existing test.csv file, with a single table "test". You can then verify the contents via make test.dump.

If the CSV file must be imported as part of a python program, then for simplicity and efficiency, you could use os.system along the lines suggested by the following:

import os

cmd = """sqlite3 database.db <<< ".import input.csv mytable" """

rc = os.system(cmd)

print(rc)


The point is that by specifying the filename of the database, the data will automatically be saved, assuming there are no errors reading it.
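
Note that the <<< here-string is bash-specific, so the os.system line above won't work in the Windows cmd shell. A more portable sketch, assuming the sqlite3 binary is on your PATH, feeds the dot-command via stdin instead:

import subprocess

# equivalent to the os.system call above, but shell-independent
subprocess.run(["sqlite3", "database.db"],
               input=".import input.csv mytable\n",
               text=True, check=True)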

`.import` is the right way to go, but that is a command from the SQLite3 command-line program. A lot of the top answers to this question involve native python loops, but if your files are large (mine are 10^6 to 10^7 records), you want to avoid reading everything into pandas or using a native python list comprehension/loop (though I did not time them for comparison).

For large files, I believe the best option is to use subprocess.run() to execute sqlite's import command. In the example below, I assume the table already exists, but the csv file has headers in the first row. See the .import documentation for more info.

subprocess.run()

import subprocess
from pathlib import Path

db_name = Path('my.db').resolve()
csv_file = Path('file.csv').resolve()
result = subprocess.run(['sqlite3',
                         str(db_name),
                         '-cmd',
                         '.mode csv',
                         '.import --skip 1 ' + str(csv_file).replace('\\', '\\\\')
                         + ' <table_name>'],
                        capture_output=True)

Edit note: sqlite3's .import command has improved so that it can treat the first line as header names and even skip the first x rows (as shown in this answer; requires version >= 3.32). If you have an older version of sqlite3, you may need to create the table first, then strip off the first row of the csv before importing. The --skip 1 argument will give an error prior to 3.32.
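
A quick way to check which CLI version you have (assuming sqlite3 is on your PATH; the version string is the first token of the --version output):

import subprocess

ver = subprocess.run(['sqlite3', '--version'],
                     capture_output=True, text=True).stdout.split()[0]
print(ver)  # .import --skip needs 3.32 or later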

Explanation

From the command line, the command you're looking for is sqlite3 my.db -cmd ".mode csv" ".import file.csv table". subprocess.run() runs a command-line process. The argument to subprocess.run() is a sequence of strings which are interpreted as a command, followed by all of its arguments.

  • sqlite3 my.db opens the database
  • the -cmd flag after the database allows you to pass multiple follow-on commands to the sqlite program. In the shell, each command has to be in quotes, but here they just need to be their own element of the sequence
  • '.mode csv' does what you'd expect
  • '.import --skip 1'+str(csv_file).replace('\\','\\\\')+' <table_name>' is the import command.
    Unfortunately, since subprocess passes all the follow-ons to -cmd as quoted strings, you need to double up your backslashes if you have a windows directory path. The error-checking sketch below shows how to inspect the result.
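
Since capture_output=True swallows sqlite3's messages, a small follow-up on the result object above can surface any .import errors:

# sqlite3 reports .import problems on stderr; without text=True this is bytes
if result.returncode != 0 or result.stderr:
    print(result.stderr.decode())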

Stripping headers

Not really the point of the question, but here's what I used. Again, I didn't want to read the whole files into memory at any point:

import shutil

with open(csv, "r") as source:
    source.readline()  # consume the header line
    with open(str(csv) + "_nohead", "w") as target:
        shutil.copyfileobj(source, target)


I've found that it can be necessary to break up the transfer of data from the csv to the database in chunks so as not to run out of memory. This can be done like this:

import csv
import sqlite3
from operator import itemgetter

# Establish connection
conn = sqlite3.connect("mydb.db")

# Create the table
conn.execute(
    """
    CREATE TABLE persons(
        person_id INTEGER,
        last_name TEXT,
        first_name TEXT,
        address TEXT
    )
    """
)

# These are the columns from the csv that we want
cols = ["person_id", "last_name", "first_name", "address"]

# If the csv file is huge, we instead add the data in chunks
chunksize = 10000

# Parse csv file and populate db in chunks
with conn, open("persons.csv") as f:
    reader = csv.DictReader(f)

    chunk = []
    for i, row in enumerate(reader):

        if i % chunksize == 0 and i > 0:
            conn.executemany(
                """
                INSERT INTO persons
                VALUES(?, ?, ?, ?)
                """, chunk
            )
            chunk = []

        items = itemgetter(*cols)(row)
        chunk.append(items)

    # don't forget the final, partially filled chunk
    if chunk:
        conn.executemany(
            """
            INSERT INTO persons
            VALUES(?, ?, ?, ?)
            """, chunk
        )


Here are solutions if your CSV file is really big. Use to_sql as suggested by another answer, but set chunksize so it doesn't try to process the whole file at once.

import sqlite3
import pandas as pd


conn = sqlite3.connect('my_data.db')
c = conn.cursor()
users = pd.read_csv('users.csv')
users.to_sql('users', conn, if_exists='append', index = False, chunksize = 10000)
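
Note that pd.read_csv above still reads the entire file into memory; only the inserts are chunked. A variant that also streams the read, using read_csv's own chunksize (in rows):

import sqlite3
import pandas as pd

conn = sqlite3.connect('my_data.db')
# read_csv with chunksize yields DataFrames of up to 10000 rows each
for chunk in pd.read_csv('users.csv', chunksize=10000):
    chunk.to_sql('users', conn, if_exists='append', index=False)
conn.close()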

You can also use Dask, as described here, to write a lot of Pandas DataFrames in parallel:

dto_sql = dask.delayed(pd.DataFrame.to_sql)
out = [dto_sql(d, 'table_name', db_url, if_exists='append', index=True)
       for d in ddf.to_delayed()]
dask.compute(*out)
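
For context, a sketch of the names the snippet assumes (ddf being a Dask DataFrame and db_url an SQLAlchemy-style URL, as in the linked write-up; both are assumptions, not part of the snippet):

import dask
import dask.dataframe as dd
import pandas as pd

ddf = dd.read_csv('my_large.csv')  # assumed source file, read lazily in partitions
db_url = 'sqlite:///my_data.db'    # SQLAlchemy-style URL, per the linked write-up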

See here for more details.

The following can also add fields' names based on the CSV header:

import sqlite3

def csv_sql(file_dir, table_name, database_name):
    con = sqlite3.connect(database_name)
    cur = con.cursor()
    # Drop the current table by:
    # cur.execute("DROP TABLE IF EXISTS %s;" % table_name)

    with open(file_dir, 'r') as fl:
        hd = fl.readline()[:-1].split(',')
        ro = fl.readlines()
        db = [tuple(ro[i][:-1].split(',')) for i in range(len(ro))]

    header = ','.join(hd)
    cur.execute("CREATE TABLE IF NOT EXISTS %s (%s);" % (table_name, header))
    cur.executemany("INSERT INTO %s (%s) VALUES (%s);" % (table_name, header, ('?,' * len(hd))[:-1]), db)
    con.commit()
    con.close()

# Example:
csv_sql('./surveys.csv', 'survey', 'eco.db')

With this you can do joins on CSVs as well:

import sqlite3
import os
import pandas as pd
from typing import List


class CSVDriver:
    def __init__(self, table_dir_path: str):
        self.table_dir_path = table_dir_path  # where tables (ie. csv files) are located
        self._con = None

    @property
    def con(self) -> sqlite3.Connection:
        """Make a singleton connection to an in-memory SQLite database"""
        if not self._con:
            self._con = sqlite3.connect(":memory:")
        return self._con

    def _exists(self, table: str) -> bool:
        query = """
        SELECT name
        FROM sqlite_master
        WHERE type ='table'
        AND name NOT LIKE 'sqlite_%';
        """
        # fetchall() returns one-element tuples, so unpack before comparing
        tables = [name for (name,) in self.con.execute(query).fetchall()]
        return table in tables

    def _load_table_to_mem(self, table: str, sep: str = None) -> None:
        """
        Load a CSV into an in-memory SQLite database
        sep is set to None in order to force pandas to auto-detect the delimiter
        """
        if self._exists(table):
            return
        file_name = table + ".csv"
        path = os.path.join(self.table_dir_path, file_name)
        if not os.path.exists(path):
            raise ValueError(f"CSV table {table} does not exist in {self.table_dir_path}")
        df = pd.read_csv(path, sep=sep, engine="python")  # set engine to python to skip pandas' warning
        df.to_sql(table, self.con, if_exists='replace', index=False, chunksize=10000)

    def query(self, query: str) -> List[tuple]:
        """
        Run an SQL query on CSV file(s).
        Tables are loaded from table_dir_path
        """
        tables = extract_tables(query)
        for table in tables:
            self._load_table_to_mem(table)
        cursor = self.con.cursor()
        cursor.execute(query)
        records = cursor.fetchall()
        return records

extract_tables():

import sqlparse
from sqlparse.sql import IdentifierList, Identifier, Function
from sqlparse.tokens import Keyword, DML
from collections import namedtuple
import itertools


class Reference(namedtuple('Reference', ['schema', 'name', 'alias', 'is_function'])):
    __slots__ = ()

    def has_alias(self):
        return self.alias is not None

    @property
    def is_query_alias(self):
        return self.name is None and self.alias is not None

    @property
    def is_table_alias(self):
        return self.name is not None and self.alias is not None and not self.is_function

    @property
    def full_name(self):
        if self.schema is None:
            return self.name
        else:
            return self.schema + '.' + self.name


def _is_subselect(parsed):
    if not parsed.is_group:
        return False
    for item in parsed.tokens:
        if item.ttype is DML and item.value.upper() in ('SELECT', 'INSERT',
                                                        'UPDATE', 'CREATE', 'DELETE'):
            return True
    return False


def _identifier_is_function(identifier):
    return any(isinstance(t, Function) for t in identifier.tokens)


def _extract_from_part(parsed):
    tbl_prefix_seen = False
    for item in parsed.tokens:
        if item.is_group:
            for x in _extract_from_part(item):
                yield x
        if tbl_prefix_seen:
            if _is_subselect(item):
                for x in _extract_from_part(item):
                    yield x
            # An incomplete nested select won't be recognized correctly as a
            # sub-select. eg: 'SELECT * FROM (SELECT id FROM user'. This causes
            # the second FROM to trigger this elif condition resulting in a
            # StopIteration. So we need to ignore the keyword FROM.
            # Also 'SELECT * FROM abc JOIN def' will trigger this elif
            # condition. So we need to ignore the keyword JOIN and its variants
            # INNER JOIN, FULL OUTER JOIN, etc.
            elif item.ttype is Keyword and (
                    not item.value.upper() == 'FROM') and (
                    not item.value.upper().endswith('JOIN')):
                tbl_prefix_seen = False
            else:
                yield item
        elif item.ttype is Keyword or item.ttype is Keyword.DML:
            item_val = item.value.upper()
            if (item_val in ('COPY', 'FROM', 'INTO', 'UPDATE', 'TABLE') or
                    item_val.endswith('JOIN')):
                tbl_prefix_seen = True
        # 'SELECT a, FROM abc' will detect FROM as part of the column list.
        # So this check here is necessary.
        elif isinstance(item, IdentifierList):
            for identifier in item.get_identifiers():
                if (identifier.ttype is Keyword and
                        identifier.value.upper() == 'FROM'):
                    tbl_prefix_seen = True
                    break


def _extract_table_identifiers(token_stream):
    for item in token_stream:
        if isinstance(item, IdentifierList):
            for ident in item.get_identifiers():
                try:
                    alias = ident.get_alias()
                    schema_name = ident.get_parent_name()
                    real_name = ident.get_real_name()
                except AttributeError:
                    continue
                if real_name:
                    yield Reference(schema_name, real_name,
                                    alias, _identifier_is_function(ident))
        elif isinstance(item, Identifier):
            yield Reference(item.get_parent_name(), item.get_real_name(),
                            item.get_alias(), _identifier_is_function(item))
        elif isinstance(item, Function):
            yield Reference(item.get_parent_name(), item.get_real_name(),
                            item.get_alias(), _identifier_is_function(item))


def extract_tables(sql):
    # let's handle multiple statements in one sql string
    extracted_tables = []
    statements = list(sqlparse.parse(sql))
    for statement in statements:
        stream = _extract_from_part(statement)
        extracted_tables.append([ref.name for ref in _extract_table_identifiers(stream)])
    return list(itertools.chain(*extracted_tables))

Example (assuming account.csv and tojoin.csv exist in /path/to/files):

db_path = r"/path/to/files"
driver = CSVDriver(db_path)
query = """
SELECT tojoin.col_to_join
FROM account
LEFT JOIN tojoin
ON account.a = tojoin.a
"""
driver.query(query)
"""
cd Final_Codes
python csv_to_db.py
CSV to SQL DB
"""


import csv
import sqlite3
import os
import fnmatch

UP_FOLDER = os.path.dirname(os.getcwd())
DATABASE_FOLDER = os.path.join(UP_FOLDER, "Databases")
DBNAME = "allCompanies_database.db"


def getBaseNameNoExt(givenPath):
    """Returns the basename of the file without the extension"""
    filename = os.path.splitext(os.path.basename(givenPath))[0]
    return filename


def find(pattern, path):
    """Utility to find files wrt a regex search"""
    result = []
    for root, dirs, files in os.walk(path):
        for name in files:
            if fnmatch.fnmatch(name, pattern):
                result.append(os.path.join(root, name))
    return result


if __name__ == "__main__":
    Database_Path = os.path.join(DATABASE_FOLDER, DBNAME)
    # change to 'sqlite:///your_filename.db'
    csv_files = find('*.csv', DATABASE_FOLDER)

    con = sqlite3.connect(Database_Path)
    cur = con.cursor()
    for each in csv_files:
        with open(each, 'r') as fin:  # `with` statement available in 2.5+
            # csv.DictReader uses first line in file for column headings by default
            dr = csv.DictReader(fin)  # comma is default delimiter
            TABLE_NAME = getBaseNameNoExt(each)
            Cols = dr.fieldnames
            numCols = len(Cols)
            """
            for i in dr:
                print(i.values())
            """
            to_db = [tuple(i.values()) for i in dr]
        print(TABLE_NAME)
        # use your column names here
        ColString = ','.join(Cols)
        QuestionMarks = ["?"] * numCols
        ToAdd = ','.join(QuestionMarks)
        cur.execute(f"CREATE TABLE {TABLE_NAME} ({ColString});")
        cur.executemany(
            f"INSERT INTO {TABLE_NAME} ({ColString}) VALUES ({ToAdd});", to_db)
        con.commit()
    con.close()
    print("Execution Complete!")


Which comes in handy when you want to convert a lot of csv files in a folder into a single .db file in one go!

Notice that you don't have to know the filenames, tablenames or fieldnames (column names) beforehand!

Here is my version, which works by asking you to select the '.csv' file you want to convert:

from multiprocessing import current_process
import pandas as pd
import sqlite3
import os
from tkinter import Tk
from tkinter.filedialog import askopenfilename
from pathlib import Path


def csv_to_db(csv_filedir):

    if not Path(csv_filedir).is_file():                         # if needed, ask for user input of CSV file
        current_path = os.getcwd()
        Tk().withdraw()
        csv_filedir = askopenfilename(initialdir=current_path)

    try:
        data = pd.read_csv(csv_filedir)                         # load CSV file
    except:
        print("Something went wrong when opening the file")
        print(csv_filedir)

    csv_df = pd.DataFrame(data)
    csv_df = csv_df.fillna('NULL')                              # make NaN = to 'NULL' for SQL format

    [path, filename] = os.path.split(csv_filedir)               # define path and filename
    [filename, _] = os.path.splitext(filename)
    database_filedir = os.path.join(path, filename + '.db')

    conn = sqlite3.connect(database_filedir)                    # connect to SQL server

    [fields_sql, header_sql_string] = create_sql_fields(csv_df)

    # CREATE EMPTY DATABASE
    create_sql = ''.join(['CREATE TABLE IF NOT EXISTS ' + filename + ' (' + fields_sql + ')'])
    cursor = conn.cursor()
    cursor.execute(create_sql)

    # INSERT EACH ROW IN THE SQL DATABASE
    # (note: this insert statement is hardcoded for a five-column CSV)
    for irow in csv_df.itertuples():
        insert_values_string = ''.join(['INSERT INTO ', filename, header_sql_string, ' VALUES ('])
        insert_sql = f"{insert_values_string} {irow[1]}, '{irow[2]}','{irow[3]}', {irow[4]}, '{irow[5]}' )"
        print(insert_sql)
        cursor.execute(insert_sql)

    # COMMIT CHANGES TO DATABASE AND CLOSE CONNECTION
    conn.commit()
    conn.close()

    print('\n' + csv_filedir + ' \n converted to \n' + database_filedir)

    return database_filedir


def create_sql_fields(df):                                      # gather the headers of the CSV and create two strings
    fields_sql = []                                             # str1 = var1 TYPE, va2, TYPE ...
    header_names = []                                           # str2 = var1, var2, var3, var4
    for col in range(0, len(df.columns)):
        fields_sql.append(df.columns[col])
        fields_sql.append(str(df.dtypes[col]))

        header_names.append(df.columns[col])
        if col != len(df.columns) - 1:
            fields_sql.append(',')
            header_names.append(',')

    fields_sql = ' '.join(fields_sql)
    fields_sql = fields_sql.replace('int64', 'integer')
    fields_sql = fields_sql.replace('float64', 'integer')
    fields_sql = fields_sql.replace('object', 'text')

    header_sql_string = '(' + ''.join(header_names) + ')'

    return fields_sql, header_sql_string


csv_to_db('')