Simple example of using multiprocessing Queue, Pool and Locking

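I tried to read the documentation at http://docs.python.org/dev/library/multiprocessing.html, but I'm still struggling with multiprocessing Queue, Pool and Locking. For now I was able to build the example below.

Regarding Queue and Pool, I'm not sure I understood the concept correctly, so please correct me if I'm wrong. What I'm trying to achieve is to process 2 requests at a time (the data list has 8 in this example). So what should I use? A Pool to create 2 processes that can handle two different queues (2 at most), or should I just use a Queue to process 2 inputs each time? The lock would be there to print the outputs correctly.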

import multiprocessing
import time


data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)


def mp_handler(var1):
    # Starts one process per input pair, so all eight run at once.
    for indata in var1:
        p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
        p.start()


def mp_worker(inputs, the_time):
    print(" Processs %s\tWaiting %s seconds" % (inputs, the_time))
    time.sleep(int(the_time))
    print(" Process %s\tDONE" % inputs)


if __name__ == '__main__':
    mp_handler(data)

Here is an example from my code (it uses a thread pool, but change the class name to ProcessPoolExecutor and you have a process pool):

from concurrent.futures import ThreadPoolExecutor


def execute_run(rp):
    ...  # do something


pool = ThreadPoolExecutor(6)
for mat in TESTED_MATERIAL:
    for en in TESTED_ENERGIES:
        for ecut in TESTED_E_CUT:
            rp = RunParams(
                simulations, DEST_DIR,
                PARTICLE, mat, 960, 0.125, ecut, en
            )
            pool.submit(execute_run, rp)
pool.shutdown(wait=True)

Basically:

  • pool = ThreadPoolExecutor(6) creates a pool of 6 threads.
  • The nested for loops add tasks to the pool.
  • pool.submit(execute_run, rp) adds a task to the pool; the first argument is the function to call in a thread/process, and the remaining arguments are passed to that function.
  • pool.shutdown(wait=True) waits until all tasks are done.
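
The steps above can be sketched as a small script that runs as written under Python 3. The body of execute_run and the sample parameters are placeholders for illustration, not the original simulation code. Leaving the with block calls shutdown(wait=True), which is how concurrent.futures executors wait for outstanding tasks.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def execute_run(label, value):
    """Hypothetical task: stands in for the real work done per run."""
    return "%s=%d" % (label, value * value)


def run_all(params, max_workers=6):
    """Submit one task per parameter pair and collect the results."""
    results = []
    # Leaving the with-block calls shutdown(wait=True), which blocks
    # until every submitted task has finished.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(execute_run, label, value)
                   for label, value in params]
        for future in as_completed(futures):
            results.append(future.result())
    # Completion order is not deterministic, so sort before returning.
    return sorted(results)


if __name__ == '__main__':
    print(run_all([('a', 2), ('b', 3), ('c', 4)]))
```

Swapping in ProcessPoolExecutor should work the same way here, provided the task function is defined at module level so it can be pickled.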

The best solution for your problem is to utilize a Pool. Using Queues and having a separate "queue feeding" functionality is probably overkill.

Here's a slightly rearranged version of your program, this time with only 2 processes corralled in a Pool. I believe it's the easiest way to go, with minimal changes to the original code:

import multiprocessing
import time


data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)


def mp_worker(args):
    # Unpack the single tuple argument that map() passes in.
    inputs, the_time = args
    print(" Processs %s\tWaiting %s seconds" % (inputs, the_time))
    time.sleep(int(the_time))
    print(" Process %s\tDONE" % inputs)


def mp_handler():
    p = multiprocessing.Pool(2)
    p.map(mp_worker, data)


if __name__ == '__main__':
    mp_handler()

Note that the mp_worker() function now accepts a single argument (a tuple of the two previous arguments) because map() passes each element of the input iterable, here a two-item sublist, as a single argument to your worker function.
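
If you are on Python 3.3 or later, Pool.starmap() is an alternative to the single-tuple argument: it unpacks each element into positional arguments, so the worker can keep its two-parameter signature. The sketch below assumes shortened sleep times and a returned status string, neither of which is in the original program.

```python
import multiprocessing
import time


def mp_worker(inputs, the_time):
    """Two-argument worker, as in the question."""
    time.sleep(float(the_time))
    return "Process %s\tDONE" % inputs


def mp_handler(data, processes=2):
    # starmap() unpacks each element of data into positional arguments,
    # so the worker keeps its original (inputs, the_time) signature.
    with multiprocessing.Pool(processes) as pool:
        return pool.starmap(mp_worker, data)


if __name__ == '__main__':
    # Shortened sleep times (an assumption) keep the demo quick.
    data = (['a', '0.2'], ['b', '0.4'], ['c', '0.1'], ['d', '0.3'])
    for line in mp_handler(data):
        print(line)
```

The results come back in input order, regardless of which task finished first.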

Output:

Processs a  Waiting 2 seconds
Processs b  Waiting 4 seconds
Process a   DONE
Processs c  Waiting 6 seconds
Process b   DONE
Processs d  Waiting 8 seconds
Process c   DONE
Processs e  Waiting 1 seconds
Process e   DONE
Processs f  Waiting 3 seconds
Process d   DONE
Processs g  Waiting 5 seconds
Process f   DONE
Processs h  Waiting 7 seconds
Process g   DONE
Process h   DONE

Edit as per @Thales comment below:

If you want "a lock for each pool limit" so that your processes run in tandem pairs, like this:

A waiting B waiting | A done , B done | C waiting , D waiting | C done, D done | ...

then change the handler function to launch pools (of 2 processes) for each pair of data:

def mp_handler():
    subdata = zip(data[0::2], data[1::2])
    for task1, task2 in subdata:
        p = multiprocessing.Pool(2)
        p.map(mp_worker, (task1, task2))
        # Release this pair's worker processes before starting the next pool.
        p.close()
        p.join()

Now your output is:

Processs a Waiting 2 seconds
Processs b Waiting 4 seconds
Process a  DONE
Process b  DONE
Processs c Waiting 6 seconds
Processs d Waiting 8 seconds
Process c  DONE
Process d  DONE
Processs e Waiting 1 seconds
Processs f Waiting 3 seconds
Process e  DONE
Process f  DONE
Processs g Waiting 5 seconds
Processs h Waiting 7 seconds
Process g  DONE
Process h  DONE
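
A variation on the pairwise handler, offered as a sketch rather than the answer's method: one pool can be reused for every pair, because map() blocks until both tasks of the current pair have finished. The helper name make_pairs, the shortened sleep times, and the returned labels are assumptions added for illustration.

```python
import multiprocessing
import time


def mp_worker(args):
    inputs, the_time = args
    print(" Process %s\tWaiting %s seconds" % (inputs, the_time))
    time.sleep(float(the_time))
    print(" Process %s\tDONE" % inputs)
    return inputs


def make_pairs(items):
    """Group items into consecutive, non-overlapping pairs."""
    return list(zip(items[0::2], items[1::2]))


def mp_handler(data):
    finished = []
    with multiprocessing.Pool(2) as pool:
        for pair in make_pairs(data):
            # map() blocks until both tasks of the pair are done,
            # so the next pair cannot start early.
            finished.extend(pool.map(mp_worker, pair))
    return finished


if __name__ == '__main__':
    data = (['a', '0.2'], ['b', '0.4'], ['c', '0.3'], ['d', '0.1'])
    mp_handler(data)
```

Note that zip() drops a trailing unpaired item, so an odd-length input would leave its last task unprocessed.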

This might not be 100% related to the question, but when I searched for an example of using multiprocessing with a queue, this page showed up first on Google.

This is a basic example class that you can instantiate, put items into its queue, and then wait until the queue is finished. That's all I needed.

from multiprocessing import JoinableQueue, Process


class Renderer:
    queue = None

    def __init__(self, nb_workers=2):
        self.queue = JoinableQueue()
        self.processes = [Process(target=self.upload) for i in range(nb_workers)]
        for p in self.processes:
            p.start()

    def render(self, item):
        self.queue.put(item)

    def upload(self):
        while True:
            item = self.queue.get()
            if item is None:
                # Mark the sentinel as handled so queue.join() can return.
                self.queue.task_done()
                break

            # process your item here

            self.queue.task_done()

    def terminate(self):
        """ wait until queue is empty and terminate processes """
        self.queue.join()
        for p in self.processes:
            p.terminate()


# item1 and item2 stand for whatever objects you want processed.
r = Renderer()
r.render(item1)
r.render(item2)
r.terminate()
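
The class above checks for a None item but stops its workers with terminate(). A minimal sketch of the sentinel-based alternative follows, assuming plain functions instead of a class, a second queue for results, and a placeholder "double the item" step; none of these appear in the original answer.

```python
from multiprocessing import JoinableQueue, Process, Queue


def worker(tasks, results):
    """Consume items until a None sentinel arrives."""
    while True:
        item = tasks.get()
        if item is None:
            tasks.task_done()  # account for the sentinel too
            break
        results.put(item * 2)  # placeholder for the real processing
        tasks.task_done()


def run(items, nb_workers=2):
    tasks, results = JoinableQueue(), Queue()
    workers = [Process(target=worker, args=(tasks, results))
               for _ in range(nb_workers)]
    for p in workers:
        p.start()
    for item in items:
        tasks.put(item)
    for _ in workers:
        tasks.put(None)  # one sentinel per worker
    tasks.join()  # returns once every put() has a matching task_done()
    # Drain the results before joining so no worker blocks on a full pipe.
    output = sorted(results.get() for _ in items)
    for p in workers:
        p.join()
    return output


if __name__ == '__main__':
    print(run([1, 2, 3, 4]))
```

With sentinels, each worker exits on its own after finishing its current item, so nothing is killed mid-task.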

Here is my personal go-to for this topic:

Gist here, (pull requests welcome!): https://gist.github.com/thorsummoner/b5b1dfcff7e7fdd334ec

import multiprocessing
import sys


THREADS = 3

# Used to prevent multiple worker processes from mixing their output
GLOBALLOCK = multiprocessing.Lock()


def func_worker(args):
    """This function will be called by each worker process.
    This function can not be a class method.
    """
    # Expand list of args into named args.
    str1, str2 = args
    del args

    # Work
    # ...

    # Serial-only Portion
    with GLOBALLOCK:
        print(str1)
        print(str2)


def main(argp=None):
    """Multiprocessing Spawn Example
    """
    # Create the number of worker processes you want
    pool = multiprocessing.Pool(THREADS)

    # Define two jobs, each with two args.
    func_args = [
        ('Hello', 'World',),
        ('Goodbye', 'World',),
    ]

    try:
        pool.map_async(func_worker, func_args).get()
    except KeyboardInterrupt:
        # Allow ^C to interrupt from any worker.
        sys.stdout.write('\033[0m')
        sys.stdout.write('User Interrupt\n')
    pool.close()


if __name__ == '__main__':
    main()
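
One caveat about a module-level lock: it is shared with the workers only when child processes are forked. Under the spawn start method (the default on Windows and macOS), each worker re-imports the module and creates its own separate lock. A sketch of one common workaround passes the lock through the Pool initializer; the names init_worker and _lock and the returned string are assumptions for illustration.

```python
import multiprocessing

_lock = None  # set in each worker process by init_worker()


def init_worker(lock):
    """Pool initializer: store the shared lock in a module-level global."""
    global _lock
    _lock = lock


def func_worker(args):
    str1, str2 = args
    # Hold the lock so the two lines from one job stay together.
    with _lock:
        print(str1)
        print(str2)
    return "%s %s" % (str1, str2)


def main():
    lock = multiprocessing.Lock()
    func_args = [('Hello', 'World'), ('Goodbye', 'World')]
    # initargs are handed to each worker at start-up, which is a
    # supported way to share a Lock with pool processes.
    with multiprocessing.Pool(3, initializer=init_worker,
                              initargs=(lock,)) as pool:
        return pool.map(func_worker, func_args)


if __name__ == '__main__':
    main()
```

Passing the lock as a regular map() argument would not work, since locks can only be shared with child processes at creation time.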

For everyone using editors like Komodo Edit (Windows 10), add sys.stdout.flush() to:

def mp_worker(args):
    inputs, the_time = args
    print(" Process %s\tWaiting %s seconds" % (inputs, the_time))
    time.sleep(int(the_time))
    print(" Process %s\tDONE" % inputs)
    sys.stdout.flush()

or as the first line of:

if __name__ == '__main__':
    sys.stdout.flush()

This lets you see what goes on while the script runs, instead of having to look at the black command-line box.
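
On Python 3, the same effect can be had per call with print(..., flush=True). A small sketch, assuming a helper named report that is not part of the answer above:

```python
def report(inputs, status):
    """Print one status line and flush it immediately.

    flush=True pushes the line out of the buffer right away, so it
    appears in the editor's output pane while the script is running.
    """
    print(" Process %s\t%s" % (inputs, status), flush=True)


if __name__ == '__main__':
    report('a', 'Waiting 2 seconds')
    report('a', 'DONE')
```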