How can I get dict from sqlite query?

db = sqlite.connect("test.sqlite")
res = db.execute("select * from table")

With iteration I get lists coresponding to the rows.

for row in res:
print row

I can get name of the columns

col_name_list = [tuple[0] for tuple in res.description]

But is there some function or setting to get dictionaries instead of list?

{'col1': 'value', 'col2': 'value'}

or I have to do myself?

155664 次浏览

来自 PEP 249:

Question:


How can I construct a dictionary out of the tuples returned by
.fetch*():


Answer:


There are several existing tools available which provide
helpers for this task. Most of them use the approach of using
the column names defined in the cursor attribute .description
as basis for the keys in the row dictionary.


Note that the reason for not extending the DB API specification
to also support dictionary return values for the .fetch*()
methods is that this approach has several drawbacks:


* Some databases don't support case-sensitive column names or
auto-convert them to all lowercase or all uppercase
characters.


* Columns in the result set which are generated by the query
(e.g.  using SQL functions) don't map to table column names
and databases usually generate names for these columns in a
very database specific way.


As a result, accessing the columns through dictionary keys
varies between databases and makes writing portable code
impossible.

所以是的,你自己来。

您可以使用 行 _ 工厂,如文档中的示例所示:

import sqlite3


def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d


con = sqlite3.connect(":memory:")
con.row_factory = dict_factory
cur = con.cursor()
cur.execute("select 1 as a")
print cur.fetchone()["a"]

或者按照文档中这个例子之后给出的建议:

如果返回元组还不够 你想要基于名字的访问 列,则应考虑设置 Row _ Factory 转换为高度优化的 Row 类型 基于索引和不区分大小写 对列的基于名称的访问 几乎没有内存开销。它会 可能比你自己的更好 基于自定义字典的方法或 甚至是基于 db _ row 的解决方案。

下面是第二种解决方案的代码:

con = sqlite3.connect(…)
con.row_factory = sqlite3.Row   #   add this row
cursor = con.cursor()

即使使用 sqlite3.Row 类——您仍然不能使用以下形式的字符串格式化:

print "%(id)i - %(name)s: %(value)s" % row

为了解决这个问题,我使用了一个 helper 函数,它接受这一行并将其转换为 dictionary。我只在 dictionary 对象优于 Row 对象的情况下使用它(例如,对于类似字符串格式化的情况,Row 对象本身不支持 dictionary API)。但在其他所有时间使用 Row 对象。

def dict_from_row(row):
return dict(zip(row.keys(), row))

或者您可以像下面这样将 sqlite3.Rows 转换为字典。这将为每一行提供一个包含列表的字典。

    def from_sqlite_Row_to_dict(list_with_rows):
''' Turn a list with sqlite3.Row objects into a dictionary'''
d ={} # the dictionary to be filled with the row data and to be returned


for i, row in enumerate(list_with_rows): # iterate throw the sqlite3.Row objects
l = [] # for each Row use a separate list
for col in range(0, len(row)): # copy over the row date (ie. column data) to a list
l.append(row[col])
d[i] = l # add the list to the dictionary
return d

一个通用的替代品,只用了三行

def select_column_and_value(db, sql, parameters=()):
execute = db.execute(sql, parameters)
fetch = execute.fetchone()
return {k[0]: v for k, v in list(zip(execute.description, fetch))}


con = sqlite3.connect('/mydatabase.db')
c = con.cursor()
print(select_column_and_value(c, 'SELECT * FROM things WHERE id=?', (id,)))

但是如果您的查询没有返回任何结果,将导致错误。

def select_column_and_value(self, sql, parameters=()):
execute = self.execute(sql, parameters)
fetch = execute.fetchone()


if fetch is None:
return {k[0]: None for k in execute.description}


return {k[0]: v for k, v in list(zip(execute.description, fetch))}

或者

def select_column_and_value(self, sql, parameters=()):
execute = self.execute(sql, parameters)
fetch = execute.fetchone()


if fetch is None:
return {}


return {k[0]: v for k, v in list(zip(execute.description, fetch))}

我认为我回答了这个问题,尽管答案在亚当 · 施米德格和亚历克斯 · 马特利的答案中都有部分提到。为了让其他像我一样有同样问题的人,能够很容易地找到答案。

conn = sqlite3.connect(":memory:")


#This is the important part, here we are setting row_factory property of
#connection object to sqlite3.Row(sqlite3.Row is an implementation of
#row_factory)
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute('select * from stocks')


result = c.fetchall()
#returns a list of dictionaries, each item in list(each dictionary)
#represents a row of the table

简短版:

db.row_factory = lambda c, r: dict([(col[0], r[idx]) for idx, col in enumerate(c.description)])
import sqlite3


db = sqlite3.connect('mydatabase.db')
cursor = db.execute('SELECT * FROM students ORDER BY CREATE_AT')
studentList = cursor.fetchall()


columnNames = list(map(lambda x: x[0], cursor.description)) #students table column names list
studentsAssoc = {} #Assoc format is dictionary similarly




#THIS IS ASSOC PROCESS
for lineNumber, student in enumerate(studentList):
studentsAssoc[lineNumber] = {}


for columnNumber, value in enumerate(student):
studentsAssoc[lineNumber][columnNames[columnNumber]] = value




print(studentsAssoc)

结果是肯定的,但我不知道最好的。

我测试中最快的:

conn.row_factory = lambda c, r: dict(zip([col[0] for col in c.description], r))
c = conn.cursor()


%timeit c.execute('SELECT * FROM table').fetchall()
19.8 µs ± 1.05 µs per loop (mean ± std. dev. of 7 runs, 100000 loops each)

Vs:

conn.row_factory = lambda c, r: dict([(col[0], r[idx]) for idx, col in enumerate(c.description)])
c = conn.cursor()


%timeit c.execute('SELECT * FROM table').fetchall()
19.4 µs ± 75.6 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

你决定:)

与前面提到的解决方案类似,但最为紧凑:

db.row_factory = lambda C, R: { c[0]: R[i] for i, c in enumerate(C.description) }

Python 中的字典提供对其元素的任意访问。 因此,任何带有“ name”的字典,尽管它可能一方面提供信息(也就是字段名称) ,但它会“取消排序”字段,这可能是不需要的。

最好的方法是将这些名字放在一个单独的列表中,然后根据需要将它们与结果结合起来。

try:
mycursor = self.memconn.cursor()
mycursor.execute('''SELECT * FROM maintbl;''')
#first get the names, because they will be lost after retrieval of rows
names = list(map(lambda x: x[0], mycursor.description))
manyrows = mycursor.fetchall()


return manyrows, names

还要记住,在所有方法中,名称都是查询中提供的名称,而不是数据库中的名称。SELECT * FROM是个例外

如果您唯一关心的是使用字典获得结果,那么一定要使用 conn.row_factory = sqlite3.Row(在另一个答案中已经说明)。

连接到 SQLite 之后: 仅仅运行 con = sqlite3.connect(.....)就足够了:

con.row_factory = sqlite3.Row

瞧!

正如@gandalf 的回答所提到的,必须使用 conn.row_factory = sqlite3.Row,但结果是使用 不是直接的字典。在最后一个循环中,必须向 dict添加一个额外的“强制转换”:

import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute('create table t (a text, b text, c text)')
conn.execute('insert into t values ("aaa", "bbb", "ccc")')
conn.execute('insert into t values ("AAA", "BBB", "CCC")')
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute('select * from t')
for r in c.fetchall():
print(dict(r))


# {'a': 'aaa', 'b': 'bbb', 'c': 'ccc'}
# {'a': 'AAA', 'b': 'BBB', 'c': 'CCC'}

我觉得你的方向是对的。让我们保持这个非常简单和完整,你试图做什么:

import sqlite3
db = sqlite3.connect("test.sqlite3")
cur = db.cursor()
res = cur.execute("select * from table").fetchall()
data = dict(zip([c[0] for c in cur.description], res[0]))


print(data)

缺点是,如果您的表非常大,那么 .fetchall()就是对您的内存消耗的谋杀。但是对于只处理几千行文本和数字列的琐碎应用程序来说,这种简单的方法已经足够好了。

对于严肃的问题,您应该查看行工厂,正如许多其他答案中提出的那样。

获取查询的结果

output_obj = con.execute(query)
results = output_obj.fetchall()

选项1)显式回路 w/Zip

for row in results:
col_names = [tup[0] for tup in output_obj.description]
row_values = [i for i in row]
row_as_dict = dict(zip(col_names,row_values))

选项2)更快速的循环 w/Dict Comp

for row in results:
row_as_dict = {output_obj.description[i][0]:row[i] for i in range(len(row))}
def getUsers(self,assoc=False):
result = self.cursor.execute("SELECT * FROM users").fetchall()
result_len = len(result)
if(result_len == False): return
if(assoc != True):
return result
else:
result_formated = []
columns = [column[0] for column in self.cursor.description]
for row in result:
row_dict = {}
i = 0
# print(result_len)
while(i <= result_len):
row_dict[columns[i]] = row[i]
i += 1
result_formated.append(row_dict)
return result_formated

我把我的错误代码留在这里