如何获得一个函数的返回值传递给multiprocessing.Process?

在下面的示例代码中,我想获取函数worker的返回值。我该怎么做呢?这个值存储在哪里?

示例代码:

import multiprocessing


def worker(procnum):
'''worker function'''
print str(procnum) + ' represent!'
return procnum




if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()


for proc in jobs:
proc.join()
print jobs

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

我似乎找不到存储在jobs中的对象的相关属性。

337971 次浏览

使用共享变量进行通信。比如这样:

import multiprocessing




def worker(procnum, return_dict):
"""worker function"""
print(str(procnum) + " represent!")
return_dict[procnum] = procnum




if __name__ == "__main__":
manager = multiprocessing.Manager()
return_dict = manager.dict()
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i, return_dict))
jobs.append(p)
p.start()


for proc in jobs:
proc.join()
print(return_dict.values())

我认为@sega_sai建议的方法更好。但它确实需要一个代码示例,所以如下:

import multiprocessing
from os import getpid


def worker(procnum):
print('I am number %d in process %d' % (procnum, getpid()))
return getpid()


if __name__ == '__main__':
pool = multiprocessing.Pool(processes = 3)
print(pool.map(worker, range(5)))

它将打印返回值:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

如果你熟悉map (Python 2内置),这应该不是太大的挑战。否则,看看sega_Sai的链接

注意,只需要很少的代码。(还要注意如何重用流程)。

你可以使用内置的exit来设置进程的退出码。它可以从进程的exitcode属性中获得:

import multiprocessing


def worker(procnum):
print str(procnum) + ' represent!'
exit(procnum)


if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()


result = []
for proc in jobs:
proc.join()
result.append(proc.exitcode)
print result

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]

对于正在寻找如何使用QueueProcess中获取值的任何人:

import multiprocessing


ret = {'foo': False}


def worker(queue):
ret = queue.get()
ret['foo'] = True
queue.put(ret)


if __name__ == '__main__':
queue = multiprocessing.Queue()
queue.put(ret)
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
p.join()
print(queue.get())  # Prints {"foo": True}

注意,在Windows或Jupyter Notebook中,使用multithreading你必须将其保存为文件并执行该文件。如果你在命令提示符中这样做,你会看到这样的错误:

 AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>

这个例子展示了如何使用多处理。管实例列表从任意数量的进程中返回字符串:

import multiprocessing


def worker(procnum, send_end):
'''worker function'''
result = str(procnum) + ' represent!'
print result
send_end.send(result)


def main():
jobs = []
pipe_list = []
for i in range(5):
recv_end, send_end = multiprocessing.Pipe(False)
p = multiprocessing.Process(target=worker, args=(i, send_end))
jobs.append(p)
pipe_list.append(recv_end)
p.start()


for proc in jobs:
proc.join()
result_list = [x.recv() for x in pipe_list]
print result_list


if __name__ == '__main__':
main()

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

此解决方案使用的资源比多处理。队列使用的资源少

  • 一个管道
  • 至少一个锁
  • 一个缓冲区
  • 一个线程

多处理。SimpleQueue,它使用

  • 一个管道
  • 至少一个锁

查看这些类型的源代码是非常有指导意义的。

我修改了vartec的答案,因为我需要从函数中获得错误代码。(由于vertec ! !这是一个很棒的技巧)

这也可以用manager.list来完成,但我认为最好是将它保存在字典中,并在其中存储一个列表。这样,我们就可以保留函数和结果,因为我们不能确定列表将被填充的顺序。

from multiprocessing import Process
import time
import datetime
import multiprocessing




def func1(fn, m_list):
print 'func1: starting'
time.sleep(1)
m_list[fn] = "this is the first function"
print 'func1: finishing'
# return "func1"  # no need for return since Multiprocess doesnt return it =(


def func2(fn, m_list):
print 'func2: starting'
time.sleep(3)
m_list[fn] = "this is function 2"
print 'func2: finishing'
# return "func2"


def func3(fn, m_list):
print 'func3: starting'
time.sleep(9)
# if fail wont join the rest because it never populate the dict
# or do a try/except to get something in return.
raise ValueError("failed here")
# if we want to get the error in the manager dict we can catch the error
try:
raise ValueError("failed here")
m_list[fn] = "this is third"
except:
m_list[fn] = "this is third and it fail horrible"
# print 'func3: finishing'
# return "func3"




def runInParallel(*fns):  # * is to accept any input in list
start_time = datetime.datetime.now()
proc = []
manager = multiprocessing.Manager()
m_list = manager.dict()
for fn in fns:
# print fn
# print dir(fn)
p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
p.start()
proc.append(p)
for p in proc:
p.join()  # 5 is the time out


print datetime.datetime.now() - start_time
return m_list, proc


if __name__ == '__main__':
manager, proc = runInParallel(func1, func2, func3)
# print dir(proc[0])
# print proc[0]._name
# print proc[0].name
# print proc[0].exitcode


# here you can check what did fail
for i in proc:
print i.name, i.exitcode  # name was set up in the Process line 53


# here will only show the function that worked and where able to populate the
# manager dict
for i, j in manager.items():
print dir(i)  # things you can do to the function
print i, j

出于某种原因,我在任何地方都找不到如何使用Queue实现这一点的一般示例(即使Python的文档示例也不会生成多个进程),所以在尝试了大约10次后,我得到了以下结果:

def add_helper(queue, arg1, arg2): # the func called in child processes
ret = arg1 + arg2
queue.put(ret)


def multi_add(): # spawns child processes
q = Queue()
processes = []
rets = []
for _ in range(0, 100):
p = Process(target=add_helper, args=(q, 1, 2))
processes.append(p)
p.start()
for p in processes:
ret = q.get() # will block
rets.append(ret)
for p in processes:
p.join()
return rets

Queue是一个阻塞的、线程安全的队列,你可以用它来存储子进程的返回值。因此,您必须将队列传递给每个进程。这里不太明显的一点是,你必须在join Processes之前从队列中get(),否则队列就会填充并阻塞所有内容。

更新用于那些面向对象的对象(在Python 3.4中测试):

from multiprocessing import Process, Queue


class Multiprocessor():


def __init__(self):
self.processes = []
self.queue = Queue()


@staticmethod
def _wrapper(func, queue, args, kwargs):
ret = func(*args, **kwargs)
queue.put(ret)


def run(self, func, *args, **kwargs):
args2 = [func, self.queue, args, kwargs]
p = Process(target=self._wrapper, args=args2)
self.processes.append(p)
p.start()


def wait(self):
rets = []
for p in self.processes:
ret = self.queue.get()
rets.append(ret)
for p in self.processes:
p.join()
return rets


# tester
if __name__ == "__main__":
mp = Multiprocessor()
num_proc = 64
for _ in range(num_proc): # queue up multiple tasks running `sum`
mp.run(sum, [1, 2, 3, 4, 5])
ret = mp.wait() # get all results
print(ret)
assert len(ret) == num_proc and all(r == 15 for r in ret)

一个简单的解决方案:

import multiprocessing


output=[]
data = range(0,10)


def f(x):
return x**2


def handler():
p = multiprocessing.Pool(64)
r=p.map(f, data)
return r


if __name__ == '__main__':
output.append(handler())


print(output[0])

输出:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

卵石包有一个很好的抽象,利用了multiprocessing.Pipe,这使得它非常简单:

from pebble import concurrent


@concurrent.process
def function(arg, kwarg=0):
return arg + kwarg


future = function(1, kwarg=1)


print(future.result())

示例来自:https://pythonhosted.org/Pebble/#concurrent-decorators

如果你正在使用Python 3,你可以使用concurrent.futures.ProcessPoolExecutor作为一个方便的抽象:

from concurrent.futures import ProcessPoolExecutor


def worker(procnum):
'''worker function'''
print(str(procnum) + ' represent!')
return procnum




if __name__ == '__main__':
with ProcessPoolExecutor() as executor:
print(list(executor.map(worker, range(5))))

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]

我想我应该简化上面复制的最简单的例子,在Py3.6上为我工作。最简单的是multiprocessing.Pool:

import multiprocessing
import time


def worker(x):
time.sleep(1)
return x


pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))

你可以用Pool(processes=5)来设置池中进程的数量。但是,它默认为CPU计数,因此对于CPU受限的任务,请将其保留为空。(I/ o绑定任务通常适合线程,因为线程大部分都在等待,所以可以共享一个CPU核心。)

(注意,工作方法不能嵌套在方法中。我最初在调用pool.map的方法中定义了我的工作者方法,以保持它完全自包含,但随后进程无法导入它,并抛出“AttributeError: Can't pickle local object outer_method..inner_method”。更多的在这里。它可以在类内部。)

(感谢最初的问题指定打印'represent!'而不是time.sleep(),但如果没有它,我认为一些代码是并发运行的。)


Py3的ProcessPoolExecutor也是两行(.map返回一个生成器,所以你需要list()):

from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
print(list(executor.map(worker, range(10))))

使用普通的Processes:

import multiprocessing
import time


def worker(x, queue):
time.sleep(1)
queue.put(x)


queue = multiprocessing.SimpleQueue()
tasks = range(10)


for task in tasks:
multiprocessing.Process(target=worker, args=(task, queue,)).start()


for _ in tasks:
print(queue.get())

如果你只需要putget,请使用SimpleQueue。第一个循环启动所有进程,然后第二个循环进行阻塞queue.get调用。我不认为有任何理由调用p.join()

你可以使用< >强ProcessPoolExecutor < / >强从函数中获取返回值,如下所示:

from concurrent.futures import ProcessPoolExecutor


def test(num1, num2):
return num1 + num2


with ProcessPoolExecutor() as executor:
feature = executor.submit(test, 2, 3)
print(feature.result()) # 5