Output pyodbc cursor results as Python dictionaries

How do I serialize pyodbc cursor output (from .fetchone, .fetchmany or .fetchall) as a Python dictionary?

I'm using bottlepy and need to return a dict so that it can be returned as JSON.

139,425 views

Assuming you know your column names! Here are three different solutions;
you probably want to look at the last one!

colnames = ['city', 'area', 'street']
data = {}

counter = 0
for row in x.fetchall():
    if counter not in data:
        data[counter] = {}

    colcounter = 0
    for colname in colnames:
        data[counter][colname] = row[colcounter]
        colcounter += 1

    counter += 1

That's an indexed version; not the most beautiful solution, but it will work. Another approach is to use the column name as the dictionary key, with each key holding a list of that column's values in row order:

colnames = ['city', 'area', 'street']
data = {}

for row in x.fetchall():
    colindex = 0
    for col in colnames:
        if col not in data:
            data[col] = []
        data[col].append(row[colindex])
        colindex += 1

Writing this, I realize that for col in colnames could be replaced by for colindex in range(len(colnames)), but you get the idea. The latter example, though, is useful when you're not fetching all the data at once but one row at a time, for instance:

Using dict for each row of data

def fetchone_dict(stuff):
    colnames = ['city', 'area', 'street']
    data = {}

    for colindex in range(len(colnames)):
        data[colnames[colindex]] = stuff[colindex]
    return data


row = x.fetchone()
print(fetchone_dict(row)['city'])
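The same per-row mapping can be written more compactly with dict(zip(...)), assuming the same hard-coded column names (the row values here are hypothetical, standing in for what pyodbc would return):

```python
colnames = ['city', 'area', 'street']
row = ('Oslo', 'Frogner', 'Bygdoy alle')  # stand-in for a fetched row

# zip pairs each column name with the value at the same position
data = dict(zip(colnames, row))
print(data['city'])  # Oslo
```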

Getting table names (I think... thanks to Foo Stack); a more direct solution from beargle below!

cursor.execute("SELECT sys.objects.name, sys.columns.name FROM sys.objects INNER JOIN sys.columns ON sys.objects.object_id = sys.columns.object_id WHERE sys.objects.type = 'U';")
schema = {}
for it in cursor.fetchall():
    if it[0] in schema:
        schema[it[0]].append(it[1])
    else:
        schema[it[0]] = [it[1]]

If you don't know columns ahead of time, use Cursor.description to build a list of column names and zip with each row to produce a list of dictionaries. Example assumes connection and query are built:

>>> cursor = connection.cursor().execute(sql)
>>> columns = [column[0] for column in cursor.description]
>>> print(columns)
['name', 'create_date']
>>> results = []
>>> for row in cursor.fetchall():
...     results.append(dict(zip(columns, row)))
...
>>> print(results)
[{'create_date': datetime.datetime(2003, 4, 8, 9, 13, 36, 390000), 'name': u'master'},
{'create_date': datetime.datetime(2013, 1, 30, 12, 31, 40, 340000), 'name': u'tempdb'},
{'create_date': datetime.datetime(2003, 4, 8, 9, 13, 36, 390000), 'name': u'model'},
{'create_date': datetime.datetime(2010, 4, 2, 17, 35, 8, 970000), 'name': u'msdb'}]
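Since the goal here is JSON for bottlepy, note that a list of dicts built this way serializes directly with json.dumps, except that the datetime values need a converter; a minimal sketch, where default=str is one simple option:

```python
import json
import datetime

# stand-in for the list of dicts built from cursor.description above
results = [{'name': 'master',
            'create_date': datetime.datetime(2003, 4, 8, 9, 13, 36)}]

# datetime objects are not JSON-serializable by default; default=str
# converts them to their string representation
print(json.dumps(results, default=str))
```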

Mainly going off @Torxed's response, I created a fully generalised set of functions to read the schema and the data into a dictionary:

def schema_dict(cursor):
    cursor.execute("SELECT sys.objects.name, sys.columns.name FROM sys.objects INNER JOIN sys.columns ON sys.objects.object_id = sys.columns.object_id WHERE sys.objects.type = 'U';")
    schema = {}

    for it in cursor.fetchall():
        if it[0] not in schema:
            schema[it[0]] = {'scheme': []}
        schema[it[0]]['scheme'].append(it[1])

    return schema




def populate_dict(cursor, schema):
    for i in schema.keys():
        cursor.execute("select * from {table};".format(table=i))

        for row in cursor.fetchall():
            colindex = 0

            for col in schema[i]['scheme']:
                if 'data' not in schema[i]:
                    schema[i]['data'] = []

                schema[i]['data'].append(row[colindex])
                colindex += 1

    return schema


def database_to_dict():
    cursor = connect()
    schema = populate_dict(cursor, schema_dict(cursor))
    return schema

Feel free to go all code-golf on this to reduce the lines; but in the meantime, it works!

;)

Using @Beargle's result with bottlepy, I was able to create this very concise query-exposing endpoint:

@route('/api/query/<query_str>')
def query(query_str):
    cursor.execute(query_str)
    return {'results':
            [dict(zip([column[0] for column in cursor.description], row))
             for row in cursor.fetchall()]}
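That comprehension can be tried without a live cursor by substituting a stand-in for cursor.description and the fetched rows (hypothetical data below):

```python
# stand-in for cursor.description: only each column's first element (the
# name) is used by the comprehension
description = (('name', None), ('age', None))
rows = [('Alice', 30), ('Bob', 25)]

payload = {'results':
           [dict(zip([column[0] for column in description], row))
            for row in rows]}
print(payload)  # {'results': [{'name': 'Alice', 'age': 30}, {'name': 'Bob', 'age': 25}]}
```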

Here is a short-form version you might be able to use:

>>> cursor.execute("<your SQL here>")
>>> single_row = dict(zip(list(zip(*cursor.description))[0], cursor.fetchone()))
>>> multiple_rows = [dict(zip(list(zip(*cursor.description))[0], row)) for row in cursor.fetchall()]

As you might be aware, prefixing a list argument with * unpacks the list, passing its individual entries as separate arguments to the function you are calling. zip then picks the 1st through nth element of each argument and zips them together, like the zipper in your pants.

so by using

zip(*[('a', 1, 2), ('b', 1, 2)])
# interpreted by Python as zip(('a', 1, 2), ('b', 1, 2))

you get (after wrapping in list(), since zip returns an iterator in Python 3)

[('a', 'b'), (1, 1), (2, 2)]

Since description is a tuple of tuples, where each inner tuple describes the header and the data type of one column, you can extract the first element of each tuple with

>>> columns = list(zip(*cursor.description))[0]

equivalent to

>>> columns = [column[0] for column in cursor.description]
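Both forms can be checked against a stand-in description tuple; the extra fields in each inner tuple below are placeholders for pyodbc's per-column type metadata:

```python
# each inner tuple mimics (name, type_code, ...) from cursor.description
description = (('name', str, None, 30, 30, 0, False),
               ('create_date', str, None, 23, 23, 3, False))

via_zip = list(zip(*description))[0]
via_comprehension = [column[0] for column in description]

print(via_zip)            # ('name', 'create_date')
print(via_comprehension)  # ['name', 'create_date']
```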

What I needed, which is slightly different from what the OP was asking for:
if you want to fully generalize a routine that performs SQL SELECT queries, but you need to reference the results by index number rather than by name, you can do it with a list of lists instead of a dictionary.

Each row of returned data is represented in the returned list as a list of field (column) values.
The column names can be provided as the first entry of the returned list, so parsing the returned list in the calling routine can be really easy and flexible.
In this way, the routine doing the database call doesn't need to know anything about the data that it's handling. Here is such a routine:

def read_DB_Records(self, tablename, fieldlist, wherefield, wherevalue) -> list:

    DBfile = 'C:/DATA/MyDatabase.accdb'
    # this connection string is for Access 2007, 2010 or later .accdb files
    conn = pyodbc.connect(r'Driver={Microsoft Access Driver (*.mdb, *.accdb)};DBQ=' + DBfile)
    cursor = conn.cursor()

    # Build the SQL query string using the passed-in field list:
    SQL = "SELECT "
    for i in range(len(fieldlist)):
        SQL = SQL + "[" + fieldlist[i] + "]"
        if i < (len(fieldlist) - 1):
            SQL = SQL + ", "
    SQL = SQL + " FROM " + tablename

    # Support an optional WHERE clause:
    if wherefield != "" and wherevalue != "":
        SQL = SQL + " WHERE [" + wherefield + "] = '" + wherevalue + "';"

    results = []            # Create the results list object

    cursor.execute(SQL)     # Execute the query

    # (Optional) Get a list of the column names returned from the query:
    columns = [column[0] for column in cursor.description]
    results.append(columns)          # append the column names to the return list

    # Now add each row as a list of column data to the results list
    for row in cursor.fetchall():    # iterate over the cursor
        results.append(list(row))    # add the row as a list to the list of lists

    cursor.close()   # close the cursor
    conn.close()     # close the DB connection

    return results   # return the list of lists
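Parsing such a return value in the caller is then just a matter of looking up column positions in the header row; a sketch with hypothetical results:

```python
# hypothetical return value from read_DB_Records: header row, then data rows
results = [['city', 'area', 'street'],
           ['Oslo', 'Frogner', 'Bygdoy alle'],
           ['Bergen', 'Sandviken', 'Sjogaten']]

header, rows = results[0], results[1:]
city_index = header.index('city')        # find a column's position by name
cities = [row[city_index] for row in rows]
print(cities)  # ['Oslo', 'Bergen']
```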

I like @bryan's and @foo-stack's answers. If you are working with PostgreSQL and you are using psycopg2, you can use some goodies from psycopg2 to achieve the same thing by specifying the cursor_factory as DictCursor when creating your cursor from the connection, like this:

cur = conn.cursor( cursor_factory=psycopg2.extras.DictCursor )

So now you can execute your SQL query and you'll get a dictionary to fetch your results from, without the need to map them by hand:

cur.execute(sql_query)
results = cur.fetchall()

for row in results:
    print(row['row_no'])

Please note that you'll have to import psycopg2.extras for that to work.

For situations where the cursor is not available, for example when the rows have been returned by some function call or inner method, you can still create a dictionary representation by using row.cursor_description:

def row_to_dict(row):
    return dict(zip([t[0] for t in row.cursor_description], row))

I know it's old and I am just recapping what others already said, but I found this way neat, as it's also injection safe.

def to_dict(row):
    return dict(zip([t[0] for t in row.cursor_description], row))


def query(cursor, query, params=[], cursor_func=to_dict):
    cursor.execute(query, params)
    results = [cursor_func(row) for row in cursor.fetchall()]
    return results


quotes = query(cursor, "select * from currency where abbreviation like ?", ["USD"])

The steps are as follows:

  1. Import libs:

    from pandas import DataFrame
    import pyodbc

  2. Get your results from the local db:

    db_file = r'xxx.accdb'
    odbc_conn_str = 'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};DBQ=%s' % (db_file)

    conn = pyodbc.connect(odbc_conn_str)
    cur = conn.cursor()
    qry = cur.execute("SELECT * FROM tbl")
    columns = [column[0] for column in cur.description]

    results = []
    for row in cur.fetchall():
        results.append(dict(zip(columns, row)))
    df = DataFrame(results)
    df