Django multiprocessing and database connections

Background:

I'm working on a project that uses Django with a Postgres database. We're also using mod_wsgi, in case that matters, since some of my web searches have mentioned it. On web form submission, the Django view kicks off a job that will take a considerable amount of time (longer than the user would want to wait), so we kick off the job via a system call in the background. The job that is now running needs to be able to read from and write to the database. Because this job takes so long, we use multiprocessing to run parts of it in parallel.

The problem:

The top-level script has a database connection, and when it spawns off child processes, it seems that the parent's connection is available to the children. Then an exception is raised about how SET TRANSACTION ISOLATION LEVEL must be called before a query. Research indicated that this is caused by trying to use the same database connection in multiple processes. One thread I found suggested calling connection.close() at the start of the child process so that Django automatically creates a new connection when it needs one, and therefore each child process has a unique connection, i.e. not shared. This didn't work for me, as calling connection.close() in the child process caused the parent process to complain that the connection was lost.

Other findings:

Some things I read seemed to indicate you can't really do this, and that multiprocessing, mod_wsgi, and Django don't play well together. That just seems hard to believe, I guess.

Some suggested using Celery, which might be a long-term solution, but I can't get Celery installed at this time, pending some approval processes, so it's not an option right now.

Found several references on SO and elsewhere about persistent database connections, which I believe is a different problem.

Also found references to psycopg2.pool and pgpool, and something about a bouncer (pgbouncer). Admittedly, I didn't understand most of what I was reading on those, but they didn't jump out at me as being what I was looking for.

Current "workaround":

For now, I've reverted to just running things serially, and it works, but it's slower than I'd like.

Any suggestions as to how I can use multiprocessing to run in parallel? It seems like if the parent and the two children could all have independent connections to the database, things would be fine, but I can't seem to get that behavior.

Thanks, and sorry for the length!


(not a great solution, but a possible workaround)

If you can't use Celery, maybe you could implement your own queueing system: add tasks to a task table and have a regular cron job pick them off and process them (via a management command), along the lines of the sketch below.
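A minimal sketch of that idea, assuming a hypothetical Task model with status and payload fields and a run_job() function standing in for the actual work:

from django.core.management.base import BaseCommand

from myapp.models import Task   # hypothetical model with status/payload fields
from myapp.jobs import run_job  # hypothetical long-running job function


class Command(BaseCommand):
    help = "Process pending tasks (run this from cron)"

    def handle(self, *args, **options):
        for task in Task.objects.filter(status='pending'):
            task.status = 'running'
            task.save(update_fields=['status'])
            run_job(task.payload)  # the long-running work happens here
            task.status = 'done'
            task.save(update_fields=['status'])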

Multiprocessing copies connection objects between processes because it forks processes, and therefore copies all of the parent process's file descriptors. That said, a connection to the SQL server is just a file; you can see it on Linux under /proc//fd/.... Any open file is shared between forked processes. You can find more about forking here.
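As a quick Linux-only illustration of that (not part of the original answer), you can list a process's file descriptors before and after a fork and see the same entries, including the Postgres socket, in both processes:

import os

print('parent fds:', os.listdir('/proc/self/fd'))

pid = os.fork()
if pid == 0:
    # Child: the same descriptor numbers refer to the same open sockets/files as in the parent.
    print('child fds:', os.listdir('/proc/self/fd'))
    os._exit(0)
else:
    os.waitpid(pid, 0)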

My solution was to simply close the db connection just before launching the processes; each process then recreates the connection itself when it needs one (tested in Django 1.4):

from multiprocessing import Process
from django import db


def db_worker():
    some_parallel_code()  # placeholder for the parallel work; ORM calls here open a fresh connection

db.connections.close_all()  # close inherited connections just before forking
Process(target=db_worker, args=()).start()

Pgbouncer/pgpool is not related to threads in the multiprocessing sense. It's rather a solution for not closing the connection on each request, i.e. for speeding up connecting to Postgres while under high load.

Update:

To completely remove problems with the database connection, simply move all database-related logic into db_worker. I wanted to pass a QuerySet as an argument... A better idea is to simply pass a list of ids. See values_list('id', flat=True), and do not forget to turn it into a list (list(qs)) before passing it to db_worker. Thanks to that, we do not copy the model's database connection.

def db_worker(model_ids):
    # here you do Model.objects.filter(id__in=model_ids)
    obj = PartModelWorkerClass(model_ids)
    obj.run()




from multiprocessing import Process

model_ids = Model.objects.all().values_list('id', flat=True)
model_ids = list(model_ids)  # cast to list so the queryset is evaluated here
process_count = 5
delta = (len(model_ids) // process_count) + 1  # chunk size per process


# do all the db stuff here ...


# here you can close the db connections
from django import db
db.connections.close_all()


for it in range(process_count):
    Process(target=db_worker, args=(model_ids[it * delta:(it + 1) * delta],)).start()

Hey, I ran into this issue and was able to resolve it by doing the following (we are implementing a limited task system).

task.py

from django.db import connection


def as_task(fn):
    """ This is a decorator that handles task duties, like setting up loggers, reporting on status...etc """
    connection.close()  # this is where I kill the database connection, VERY IMPORTANT
    # This will force django to open a new unique connection, since on linux at least
    # connections do not fare well when forked
    # ...etc

ScheduledJob.py

import multiprocessing

from django.core.management import call_command
from django.db import connection


def run_task(request, job_id):
    """ Just a simple view that, when hit with a specific job id, kicks off said job """
    # your logic goes here (job_info and options come from that logic)
    # ...
    processor = multiprocessing.Queue()
    multiprocessing.Process(
        target=call_command,  # all of our tasks are set up as management commands in django
        args=[
            job_info.management_command,
        ],
        kwargs=dict(vars(options), web_processor=processor),
    ).start()

    result = processor.get(timeout=10)  # wait to get a response on a successful init
    # result is a tuple of [TRUE|FALSE, <ErrorMessage>]
    if not result[0]:
        raise Exception(result[1])
    else:
        # THE VERY VERY IMPORTANT PART HERE: notice that up to this point we haven't
        # touched the db again, but now we absolutely have to call connection.close()
        connection.close()
        # we do some database accessing here to get the most recently updated job id in the database

Honestly, to prevent race conditions (with multiple simultaneous users) it would be best to call connection.close() as quickly as possible after you fork the process. There may still be a chance that another request hits the database somewhere down the line before you have a chance to close the connection, though.

In all honesty, it would likely be safer and smarter to have your fork not call the command directly, but instead call a script at the operating-system level so that the spawned task runs in its own Django shell, along the lines of the sketch below.
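A hedged sketch of what that could look like, using subprocess to launch a manage.py command in its own process. The command name and project path are placeholders, and job_id is assumed to come from the view above:

import subprocess
import sys

subprocess.Popen(
    [sys.executable, 'manage.py', 'run_long_job', str(job_id)],  # hypothetical management command
    cwd='/path/to/project',  # placeholder project root containing manage.py
)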

When using multiple databases, you should close all connections.

from django import db
for connection_name in db.connections.databases:
    db.connections[connection_name].close()

EDIT

Please use the same approach that @lechup mentioned to close all connections (not sure since which Django version this method was added):

from django import db
db.connections.close_all()

You could give more resources to Postgres; on Debian/Ubuntu you can edit:

nano /etc/postgresql/9.4/main/postgresql.conf

replacing 9.4 with your Postgres version.

Here are some useful lines to update, shown with example values; the names speak for themselves:

max_connections = 100
shared_buffers = 3000MB
temp_buffers = 800MB
effective_io_concurrency = 300
max_worker_processes = 80

Be careful not to boost these parameters too much, as it might lead to errors with Postgres trying to take more resources than are available. The examples above run fine on a Debian machine with 8 GB of RAM and 4 cores.

For Python 3 and Django 1.9 this is what worked for me:

import multiprocessing

import django
import django.db

django.setup()  # Must call setup


def db_worker():
    for name, info in django.db.connections.databases.items():  # Close the inherited DB connections
        django.db.connections[name].close()
    # Execute parallel code here


if __name__ == '__main__':
    multiprocessing.Process(target=db_worker).start()

Note that without the django.setup() call I could not get this to work. I am guessing something needs to be initialized again for multiprocessing.

If all you need is I/O parallelism and not processing parallelism, you can avoid this problem by switching your processes to threads. Replace

from multiprocessing import Process

with

from threading import Thread

The Thread object has the same interface as Process (see the sketch below).
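A small sketch of the swap, with placeholder chunks of model ids; the database work releases the GIL while waiting on Postgres, so threads give real concurrency for query-heavy jobs:

from threading import Thread


def db_worker(ids):
    ...  # ORM queries for this chunk of ids go here


id_chunks = [[1, 2], [3, 4]]  # placeholder chunks of model ids
threads = [Thread(target=db_worker, args=(chunk,)) for chunk in id_chunks]
for t in threads:
    t.start()
for t in threads:
    t.join()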

I had "closed connection" issues when running Django test cases sequentially. In addition to the tests, there is also another process intentionally modifying the database during test execution. This process is started in each test case setUp().

A simple fix was to inherit my test classes from TransactionTestCase instead of TestCase. This makes sure the data is actually written to the database, so the other process has an up-to-date view of the data (see the sketch below).
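Roughly, with hypothetical test and setup bodies:

from django.test import TransactionTestCase


class JobRunnerTests(TransactionTestCase):  # instead of TestCase
    def setUp(self):
        ...  # start the external process that modifies the database

    def test_job(self):
        ...  # data written here is committed, so the other process can see it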

If you're also using connection pooling, the following worked for us, forcibly closing the connections after the fork. Closing them beforehand did not seem to help.

from django.db import connections
from django.db.utils import DEFAULT_DB_ALIAS


connections[DEFAULT_DB_ALIAS].dispose()

Override the Thread class and close all DB connections at the end of the thread. The code below works for me:

from threading import Thread

from django.db import connections


class MyThread(Thread):
    def run(self):
        super().run()
        connections.close_all()  # close this thread's DB connections once the work is done


def myasync(function):
    def decorator(*args, **kwargs):
        t = MyThread(target=function, args=args, kwargs=kwargs)
        t.daemon = True
        t.start()

    return decorator

When you need to call a function asynchronously:

@myasync
def async_function():
...

Another way around your issue is to initialise a new connection to the database inside the forked process using:

from django.db import connection
connection.connect()
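For example, at the start of a forked worker you might drop the inherited connection and open a fresh one before touching the ORM (a sketch, with the worker body left as a placeholder):

from django.db import connection


def forked_worker(model_ids):
    connection.close()    # discard the connection inherited from the parent
    connection.connect()  # open a fresh connection owned by this process
    ...                   # ORM work for this chunk goes here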

One possibility is to use multiprocessing's 'spawn' child process creation method, which will not copy Django's DB connection details to the child processes. The child processes need to bootstrap from scratch, but are free to create/close their own Django DB connections.

In calling code:

import multiprocessing
from myworker import work_one_item  # <-- Your worker method

...

# Uses connection A
list_of_items = django_db_call_one()

# 'spawn' starts new python processes
with multiprocessing.get_context('spawn').Pool() as pool:
    # work_one_item will create its own DB connection
    parallel_results = pool.map(work_one_item, list_of_items)

# Continues to use connection A
another_db_call(parallel_results)

In myworker.py:

import django
django.setup()  # <-- needed if you'll make DB calls in the worker

from myapp.models import MyDjangoModel  # placeholder import for your model


def work_one_item(item):
    try:
        # This will create a new DB connection
        return len(MyDjangoModel.objects.all())

    except Exception as ex:
        return ex

Note that if you're running the calling code inside a TestCase, mocks will not be propagated to the child processes (you will need to re-apply them).