如何使用 pyodbc 和 MS-Access 查看 Python cursor.execute 中真正的 SQL 查询

我使用 Python 中的以下代码(使用 pyodbc 作为 MS-Access 基础)。

cursor.execute("select a from tbl where b=? and c=?", (x, y))

没关系,但是出于维护的目的,我需要知道发送到数据库的完整和准确的 SQL 字符串。
有可能吗,怎么可能?

121261 次浏览

编写 sql 字符串,然后执行它:

sql='''select a
from tbl
where b=?
and c=? '''


cursor.execute(sql, x, y)
print 'just executed:',(sql, x,y)

现在您可以对 SQL 语句进行任何操作。

根据您使用的驱动程序,这可能是可能的,也可能是不可能的。在一些数据库中,参数(?s)被简单地替换,正如 user589983的回答所暗示的那样(尽管驱动程序将不得不做一些事情,比如在这些字符串中引用字符串和转义引号,为了得到一个可执行的语句)。

其他驱动程序将要求数据库编译(“准备”)语句,然后要求它使用给定的值执行准备好的语句。通过这种方式,使用准备好的或参数化的语句有助于避免 SQL 注入——在语句执行时,数据库“知道”您希望运行的 SQL 的哪一部分,以及在该语句中使用的值的哪一部分。

通过快速浏览 PyODBC 文档来判断,似乎不可能执行实际的 SQL,但是我可能错了。

不同的司机有所不同,下面是两个例子:

import MySQLdb
mc = MySQLdb.connect()
r = mc.cursor()
r.execute('select %s, %s', ("foo", 2))
r._executed
"select 'foo', 2"


import psycopg2
pc = psycopg2.connect()
r = pc.cursor()
r.execute('select %s, %s', ('foo', 2))
r.query
"select E'foo', 2"

答案是否定的。 我把我的问题发布在项目的主页 Google Code (和 Google Group)上,答案是:

L...@deller. id.au: cursor.mogrify 返回查询字符串 Http://code.google.com/p/pyodbc/issues/detail?id=163

这里有一个到 的 pyscopg 文档 “ mogrify”游标方法 记者提到: Http://initd.org/psycopg/docs/cursor.html#cursor.mogrify

Pyodbc 不执行任何此类操作 SQL 的翻译: 它通过 参数化 SQL 直接到 ODBC 驱动程序 所涉及的处理过程就是翻译 参数从 Python 对象到 C ODBC API 支持的类型。

SQL 上的某些转换可能是 在 ODBC 驱动程序中执行 被发送到服务器(如微软 SQLNativeClient 执行此操作) ,但这些 变化是隐藏的 毕德巴克。

因此,我认为它不是 提供一个迁移函数是可行的 在 Pyodbc。

为了调试的目的,我创建了一个检查函数,只是替换?与查询值... 这不是高科技:)但它的工作!校对: D

def check_sql_string(sql, values):
unique = "%PARAMETER%"
sql = sql.replace("?", unique)
for v in values: sql = sql.replace(unique, repr(v), 1)
return sql


query="""SELECT * FROM dbo.MA_ItemsMonthlyBalances
WHERE Item = ? AND Storage = ? AND FiscalYear = ? AND BalanceYear = ? AND Balance = ? AND BalanceMonth = ?"""
values = (1,2,"asdasd",12331, "aas)",1)


print(check_sql_string(query,values))

结果是:

选择 * FROM dbo.MA _ ItemsMonthlyBalance 其中 Item = 1 AND Storage = 2 AND FiscalYear = ‘ asdasd’AND BalanceYear = 12331 AND Balance = ‘ aas’) AND BalanceMonth = 1

有了这个,你可以记录或者做任何你想做的事情:

rowcount = self.cur.execute(query,values).rowcount
logger.info(check_sql_string(query,values))

如果需要,只需在函数中添加一些异常捕获。

之后我会检查 cursor. _ last _ execute,但是如果你想在不改变每次执行的情况下实时打印出来,可以试试这个补丁:

def log_queries(cur):
def _query(q):
print q # could also use logging
return cur._do_query(q)
cur._query = _query


conn = MySQLdb.connect( read_default_file='~/.my.cnf' )
cur = conn.cursor()
log_queries(cur)
cur.execute('SELECT %s, %s, %s', ('hello','there','world'))

它非常依赖于 MySQLdb (并且可能在以后的版本中中断)。它之所以能够工作,是因为 cur. _ query 目前只是简单地调用 calls._ do _ query 并返回其结果。

可以使用 print cursor._last_executed获取最后执行的查询。

阅读 这个的答案,你也可以使用 print cursor.mogrify(query,list)来查看执行之前或之后的完整查询。

我使用 金线鲨查看 pyodbc 中的实际 SQL 字符串。如果使用不受保护的服务器连接进行开发,可能会有所帮助。

因为 pyodbc 无法在执行查询之前查看它。您可以手动预填充查询,以了解它最终的样子。它不会作为实际的查询工作,但是它帮助我查明在需要40个以上参数的查询中是否存在任何错误。

query = """select * from [table_name] where a = ? and b = ?"""


parameter_list = ['dog', 'cat'] # List of parameters, a = 'dog', b = 'cat'.


query_list = list(query) # Split query string into individual characters.


# Loop through list and populate the question marks.
for i in range(len(parameter_list)):
for idx, val in enumerate(query_list):
if val == '?':
query_list[idx] = str(parameter_list[i])
break


# Rejoin the query string.
query_populate = ''.join(query_list)


#### Result ####
"""select * from [table_name] where a = dog and b = cat"""