Using multiprocessing: maximum number of simultaneous processes

I have this Python code:

from multiprocessing import Process


def f(name):
    print 'hello', name


if __name__ == '__main__':
    for i in range(0, MAX_PROCESSES):
        p = Process(target=f, args=(i,))
        p.start()

It runs fine. However, MAX_PROCESSES is variable and can be any value from 1 to 512. Since I'm only running this code on a machine with 8 cores, I need to find out if it is possible to limit the number of processes allowed to run at the same time. I've looked into multiprocessing.Queue, but it doesn't look like what I need, or perhaps I'm interpreting the docs incorrectly.

Is there a way to limit the number of simultaneous multiprocessing.Process instances?


It might be most sensible to use multiprocessing.Pool which produces a pool of worker processes based on the max number of cores available on your system, and then basically feeds tasks in as the cores become available.

The example from the standard docs (http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers) shows that you can also set the number of worker processes manually:

from multiprocessing import Pool


def f(x):
    return x * x


if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

And it's also handy to know that there is the multiprocessing.cpu_count() function to count the number of cores on a given system, if you need that in your code.
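For instance, a minimal sketch (the square worker and the n_cores name are just placeholders for illustration) that sizes the pool from cpu_count() could look like this:

import multiprocessing


def square(x):
    # placeholder worker function, just for illustration
    return x * x


if __name__ == '__main__':
    n_cores = multiprocessing.cpu_count()          # number of cores on this machine
    pool = multiprocessing.Pool(processes=n_cores)
    print(pool.map(square, range(10)))             # [0, 1, 4, ..., 81]
    pool.close()
    pool.join()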

Edit: Here's some draft code that seems to work for your specific case:

import multiprocessing


def f(name):
    print 'hello', name


if __name__ == '__main__':
    pool = multiprocessing.Pool()  # use all available cores, otherwise specify the number you want as an argument
    for i in xrange(0, 512):
        pool.apply_async(f, args=(i,))
    pool.close()
    pool.join()
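For this specific case, the same limit could presumably also be expressed with pool.map, which blocks until every call has finished (a sketch, reusing f from above):

import multiprocessing


def f(name):
    print 'hello', name


if __name__ == '__main__':
    pool = multiprocessing.Pool()   # again, defaults to the number of available cores
    pool.map(f, xrange(512))        # blocks until all 512 calls have completed
    pool.close()
    pool.join()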

More generally, this could also look like this:

import multiprocessing


def chunks(l, n):
    # yield successive n-sized chunks from l
    for i in range(0, len(l), n):
        yield l[i:i + n]


numberOfThreads = 4  # despite the name, this is the number of simultaneous processes


if __name__ == '__main__':
    # f and params are assumed to be defined elsewhere
    jobs = []
    for i, param in enumerate(params):
        p = multiprocessing.Process(target=f, args=(i, param))
        jobs.append(p)
    for i in chunks(jobs, numberOfThreads):
        for j in i:
            j.start()
        for j in i:
            j.join()

Of course, this approach is rather crude (since it waits for every process in a chunk to finish before continuing with the next chunk). Still, it works well when the function calls have approximately equal run times.
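To see the effect, here is a small sketch (the slow worker and its sleep times are invented for illustration) that times two chunks of uneven tasks; each chunk takes roughly as long as its slowest member:

import multiprocessing
import time


def slow(i):
    # hypothetical worker with uneven run times: every 4th task sleeps longer
    time.sleep(3 if i % 4 == 0 else 1)


if __name__ == '__main__':
    jobs = [multiprocessing.Process(target=slow, args=(i,)) for i in range(8)]
    for chunk in (jobs[:4], jobs[4:]):
        start = time.time()
        for j in chunk:
            j.start()
        for j in chunk:
            j.join()
        # prints roughly 3s per chunk, even though three of the four tasks only take 1s
        print('chunk finished in {:.1f}s'.format(time.time() - start))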

I think a Semaphore is what you are looking for; it will block the main process once it has been counted down to 0. Sample code:

from multiprocessing import Process
from multiprocessing import Semaphore
import time


def f(name, sema):
    print('process {} starting doing business'.format(name))
    # simulate a time-consuming task by sleeping
    time.sleep(5)
    # `release` will add 1 to `sema`, allowing other
    # processes blocked on it to continue
    sema.release()


if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        # once 20 processes are running, the following `acquire` call
        # will block the main process since `sema` has been reduced
        # to 0. This loop will continue only after one or more
        # previously created processes complete.
        sema.acquire()
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        p.start()


    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()

The following code is more structured since it acquires and releases sema in the same function. However, it will consume too many resources if total_task_num is very large:

from multiprocessing import Process
from multiprocessing import Semaphore
import time


def f(name, sema):
    print('process {} starting doing business'.format(name))
    # `sema` is acquired and released in the same
    # block of code here, making the code more readable,
    # but this may lead to problems.
    sema.acquire()
    time.sleep(5)
    sema.release()


if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        # the following line won't block once 20 processes have been
        # created and are running; instead it will carry on until
        # all 1000 processes are created.
        p.start()


    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()

The above code creates total_task_num processes, but only concurrency of them will be running at any given time; the others are blocked, still consuming precious system resources.