Python sqlite3 and concurrency

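I have a Python program that uses the "threading" module. Once every second, my program starts a new thread that fetches some data from the web and stores it on my hard drive. I would like to use sqlite3 to store these results, but I can't get it to work. The issue seems to be with the following line: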

conn = sqlite3.connect("mydatabase.db")
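  • If I put this line of code inside each thread, I get an OperationalError telling me that the database file is locked. I guess this means that another thread has mydatabase.db open through a sqlite3 connection and has locked it.
  • If I put this line of code in the main program and pass the connection object (conn) to each thread, I get a ProgrammingError saying that SQLite objects created in a thread can only be used in that same thread.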

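Previously I was storing all my results in CSV files and did not have any of these file-locking issues. Hopefully this will be possible with sqlite. Any ideas?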

You can use the producer-consumer pattern. For example, you can create a queue that is shared between threads. Each thread that fetches data from the web enqueues that data in the shared queue. A separate thread that owns the database connection dequeues the data and writes it to the database.
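A minimal sketch of that pattern, using only the standard library. The `results` table, the `fetcher` placeholder, and the `_STOP` sentinel are illustrative assumptions, not part of the original program; the real download code would replace the placeholder body.

```python
import queue
import sqlite3
import threading

_STOP = object()  # sentinel telling the writer thread to shut down


def writer(db_path, q):
    """Sole owner of the connection: drains the queue and stores each row."""
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE IF NOT EXISTS results (url TEXT, body TEXT)")
    conn.commit()
    while True:
        item = q.get()
        if item is _STOP:
            break
        conn.execute("INSERT INTO results VALUES (?, ?)", item)
        conn.commit()
    conn.close()


def fetcher(url, q):
    """Stand-in for the real download; touches only the queue, never the DB."""
    body = "payload for " + url  # hypothetical placeholder for fetched data
    q.put((url, body))


def run(db_path, urls):
    q = queue.Queue()
    w = threading.Thread(target=writer, args=(db_path, q))
    w.start()
    workers = [threading.Thread(target=fetcher, args=(u, q)) for u in urls]
    for t in workers:
        t.start()
    for t in workers:
        t.join()
    q.put(_STOP)  # all producers are done, so let the writer exit
    w.join()
```

Because only the writer thread ever creates or uses the connection, neither the "database is locked" error nor the same-thread ProgrammingError should arise from this arrangement.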

I like Evgeny's answer - Queues are generally the best way to implement inter-thread communication. For completeness, here are some other options:

  • Close the DB connection when the spawned threads have finished using it. This would fix your OperationalError, but opening and closing connections like this is generally a No-No, due to performance overhead.
  • Don't use child threads. If the once-per-second task is reasonably lightweight, you could get away with doing the fetch and store, then sleeping until the right moment. This is undesirable as fetch and store operations could take >1sec, and you lose the benefit of multiplexed resources you have with a multi-threaded approach.
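The first option above could look roughly like this: each thread opens its own short-lived connection, commits, and closes it straight away. This is only a sketch; `store_result` and the `results` table are hypothetical names, and the 5-second timeout is an arbitrary choice.

```python
import sqlite3
from contextlib import closing


def store_result(db_path, url, body):
    """Open a connection, write one row, commit, and close again."""
    # closing() guarantees conn.close(); `with conn` commits on success
    # and rolls back if an exception is raised.
    with closing(sqlite3.connect(db_path, timeout=5)) as conn:
        with conn:
            conn.execute(
                "CREATE TABLE IF NOT EXISTS results (url TEXT, body TEXT)"
            )
            conn.execute("INSERT INTO results VALUES (?, ?)", (url, body))
```

Each call pays the cost of opening and closing a connection, which is the performance overhead the answer warns about.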

Switch to multiprocessing. It scales well, can make use of multiple cores and multiple CPUs, and its interface closely mirrors that of Python's threading module.

Or, as Ali suggested, just use SQLAlchemy's thread pooling mechanism. It will handle everything for you automatically and has many extra features, just to quote some of them:

  1. SQLAlchemy includes dialects for SQLite, Postgres, MySQL, Oracle, MS-SQL, Firebird, MaxDB, MS Access, Sybase and Informix; IBM has also released a DB2 driver. So you don't have to rewrite your application if you decide to move away from SQLite.
  2. The Unit Of Work system, a central part of SQLAlchemy's Object Relational Mapper (ORM), organizes pending create/insert/update/delete operations into queues and flushes them all in one batch. To accomplish this it performs a topological "dependency sort" of all modified items in the queue so as to honor foreign key constraints, and groups redundant statements together where they can sometimes be batched even further. This produces the maximum efficiency and transaction safety, and minimizes chances of deadlocks.

Or if you are lazy, like me, you can use SQLAlchemy. It will handle the threading for you (using thread-local storage and some connection pooling), and the way it does so is even configurable.

As an added bonus, if/when you realise/decide that using SQLite for any concurrent application is going to be a disaster, you won't have to change your code to use MySQL, or Postgres, or anything else. You can just switch over.

You need to design the concurrency for your program. SQLite has clear limitations and you need to obey them, see the FAQ (also the following question).

You shouldn't be using threads at all for this. This is a trivial task for twisted and that would likely take you significantly further anyway.

Use only one thread, and have the completion of the request trigger an event to do the write.

twisted will take care of the scheduling, callbacks, etc... for you. It'll hand you the entire result as a string, or you can run it through a stream-processor (I have a twitter API and a friendfeed API that both fire off events to callers as results are still being downloaded).

Depending on what you're doing with your data, you could just dump the full result into sqlite as it's complete, cook it and dump it, or cook it while it's being read and dump it at the end.

I have a very simple application that does something close to what you're wanting on github. I call it pfetch (parallel fetch). It grabs various pages on a schedule, streams the results to a file, and optionally runs a script upon successful completion of each one. It also does some fancy stuff like conditional GETs, but still could be a good base for whatever you're doing.
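To illustrate the single-threaded, event-driven idea without depending on twisted, here is a rough sketch that swaps in the standard library's asyncio instead. It is not the pfetch code and not twisted's API; `fetch` is a hypothetical stand-in for a non-blocking download, and the `results` table is an assumption.

```python
import asyncio
import sqlite3


async def fetch(url):
    """Hypothetical stand-in for a non-blocking download."""
    await asyncio.sleep(0.01)
    return url, "payload for " + url


async def fetch_and_store(conn, urls):
    # All coroutines run on one thread, so one connection is safe to share.
    for finished in asyncio.as_completed([fetch(u) for u in urls]):
        url, body = await finished
        # The write happens as soon as each request completes.
        conn.execute("INSERT INTO results VALUES (?, ?)", (url, body))
        conn.commit()


def main(urls):
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE results (url TEXT, body TEXT)")
    asyncio.run(fetch_and_store(conn, urls))
    count = conn.execute("SELECT COUNT(*) FROM results").fetchone()[0]
    conn.close()
    return count
```

The downloads overlap in time, but every database call is made from the same thread, so sqlite3's same-thread check is never triggered.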

Scrapy seems like a potential answer to my question. Its home page describes my exact task. (Though I'm not sure how stable the code is yet.)

I would take a look at the y_serial Python module for data persistence (http://yserial.sourceforge.net), which handles deadlock issues surrounding a single SQLite database. If demand on concurrency gets heavy, one can easily set up the class Farm of many databases to diffuse the load over stochastic time.

Hope this helps your project... it should be simple enough to implement in 10 minutes.

The following was found on mail.python.org.pipermail.1239789:

I have found the solution. I don't know why the Python documentation does not have a single word about this option. We have to add a new keyword argument to the connect function, and then we will be able to create cursors from the connection in different threads. So use:

sqlite.connect(":memory:", check_same_thread = False)

works out perfectly for me. Of course from now on I need to take care of safe multithreading access to the db. Anyway thx all for trying to help.
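One possible way to take care of that "safe multithreading access" is to guard the shared connection with a lock. The following is a sketch under that assumption; `conn_lock`, `worker`, and the `results` table are illustrative names that do not appear in the original post.

```python
import sqlite3
import threading

# One connection shared by all threads; the lock is our own addition and
# serializes every use of the connection.
conn = sqlite3.connect(":memory:", check_same_thread=False)
conn_lock = threading.Lock()

with conn_lock:
    conn.execute("CREATE TABLE results (worker INTEGER, value INTEGER)")
    conn.commit()


def worker(worker_id, n):
    for value in range(n):
        with conn_lock:  # only one thread touches the connection at a time
            conn.execute("INSERT INTO results VALUES (?, ?)", (worker_id, value))
            conn.commit()


threads = [threading.Thread(target=worker, args=(i, 50)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

with conn_lock:
    total = conn.execute("SELECT COUNT(*) FROM results").fetchone()[0]
```

With `check_same_thread=False` the module no longer checks which thread uses the connection, so the locking discipline is entirely the caller's responsibility.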

Contrary to popular belief, newer versions of sqlite3 do support access from multiple threads.

This can be enabled via optional keyword argument check_same_thread:

sqlite3.connect(":memory:", check_same_thread=False)

The most likely reason you get errors with locked databases is that you must issue

conn.commit()

after finishing a database operation. If you do not, your database will be write-locked and stay that way. Other threads that are waiting to write will time out after a while (the default timeout is 5 seconds; see http://docs.python.org/2/library/sqlite3.html#sqlite3.connect for details).
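The effect of a missing commit can be reproduced with two connections to the same file. This is a small demonstration sketch, assuming sqlite3's default transaction handling; the temporary path, the table `t`, and the deliberately short 0.2-second timeout are choices made for the demo only.

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "demo.db")

first = sqlite3.connect(db_path)
first.execute("CREATE TABLE t (x INTEGER)")
first.commit()

# sqlite3 opens a transaction implicitly before the INSERT, so the write
# lock is held until commit() is called.
first.execute("INSERT INTO t VALUES (1)")

second = sqlite3.connect(db_path, timeout=0.2)  # short timeout for the demo
try:
    second.execute("INSERT INTO t VALUES (2)")
    blocked = False
except sqlite3.OperationalError:
    blocked = True     # the second writer gave up waiting for the lock
    second.rollback()  # discard the transaction that could not write

first.commit()  # releases the write lock
second.execute("INSERT INTO t VALUES (2)")
second.commit()
rows = second.execute("SELECT COUNT(*) FROM t").fetchone()[0]
first.close()
second.close()
```

Once the first connection commits, the second connection's insert goes through without any change to its code.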

An example of a correct and concurrent insertion would be this:

import threading, sqlite3

class InsertionThread(threading.Thread):

    def __init__(self, number):
        super(InsertionThread, self).__init__()
        self.number = number

    def run(self):
        conn = sqlite3.connect('yourdb.db', timeout=5)
        conn.execute('CREATE TABLE IF NOT EXISTS threadcount (threadnum, count);')
        conn.commit()

        for i in range(1000):
            conn.execute("INSERT INTO threadcount VALUES (?, ?);", (self.number, i))
            conn.commit()

# create as many of these as you wish
# but be careful to set the timeout value appropriately: thread switching in
# python takes some time
for i in range(2):
    t = InsertionThread(i)
    t.start()

If you like SQLite, or have other tools that work with SQLite databases, or want to replace CSV files with SQLite db files, or must do something rare like inter-platform IPC, then SQLite is a great tool and very fitting for the purpose. Don't let yourself be pressured into using a different solution if it doesn't feel right!

I could not find any benchmarks in any of the above answers so I wrote a test to benchmark everything.

I tried 3 approaches:

  1. Reading and writing sequentially from the SQLite database
  2. Using a ThreadPoolExecutor to read/write
  3. Using a ProcessPoolExecutor to read/write

The results and takeaways from the benchmark are as follows:

  1. Sequential reads/sequential writes work the best
  2. If you must process in parallel, use the ProcessPoolExecutor to read in parallel
  3. Do not perform any writes either using the ThreadPoolExecutor or using the ProcessPoolExecutor as you will run into database locked errors and you will have to retry inserting the chunk again
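One way to act on these takeaways is to parallelize only the fetching and keep every write in a single thread. The sketch below is not the benchmark code from the answer; `fetch`, `fetch_then_store`, and the `squares` table are hypothetical names used for illustration.

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor


def fetch(item):
    """Hypothetical stand-in for the slow, parallelisable part of the job."""
    return (item, item * item)


def fetch_then_store(conn, items, max_workers=4):
    # Parallel part: only the fetching runs in worker threads.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        rows = list(pool.map(fetch, items))
    # Sequential part: the calling thread performs all writes in one
    # transaction, committed when the `with conn` block exits.
    with conn:
        conn.execute("CREATE TABLE IF NOT EXISTS squares (n INTEGER, sq INTEGER)")
        conn.executemany("INSERT INTO squares VALUES (?, ?)", rows)
    return len(rows)
```

Since no worker thread ever touches the connection, there is nothing to retry after a "database is locked" error.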

You can find the code and complete solution for the benchmarks in my SO answer here. Hope that helps!

You need to call session.close() after every database transaction, so that each cursor is used only within the thread that created it. Sharing the same cursor across multiple threads is what causes this error.