Show the progress of a Python multiprocessing pool imap_unordered call?

I have a script that successfully runs a set of multiprocessing Pool tasks with an imap_unordered() call:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

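However, my num_tasks is around 250,000, so the join() locks the main thread for about 10 seconds. I'd like to echo progress to the command line incrementally to show that the main process isn't locked. Something like: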

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
    remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
    if (remaining == 0): break # Jump out of while loop
    print("Waiting for", remaining, "tasks to complete...")
    time.sleep(2)

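Is there a method on the result object or the pool itself that indicates the number of tasks remaining? I tried using a multiprocessing.Value object as a counter (do_work calls counter.value += 1 after doing its task), but the counter only reaches about 85% of the total before it stops incrementing.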
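One likely cause of the stalled counter: `counter.value += 1` is a read-modify-write and is not atomic, so concurrent workers can lose updates unless the increment happens while holding the lock returned by `counter.get_lock()`. A minimal sketch, assuming the counter is handed to each worker through a pool initializer; `init_worker`, `count_completed`, and the body of `do_work` are illustrative names, not part of the original script:

```python
import multiprocessing

_counter = None  # set in every worker by init_worker (hypothetical helper)


def init_worker(shared_counter):
    """Store the shared counter in a global so do_work can reach it."""
    global _counter
    _counter = shared_counter


def do_work(x):
    result = x * x  # placeholder for the real task
    # Hold the Value's lock so the read-modify-write cannot interleave.
    with _counter.get_lock():
        _counter.value += 1
    return result


def count_completed(pool_factory, num_tasks):
    """Run num_tasks jobs and return the final value of the shared counter."""
    shared = multiprocessing.Value("i", 0)
    with pool_factory(processes=2, initializer=init_worker,
                      initargs=(shared,)) as pool:
        for _ in pool.imap_unordered(do_work, range(num_tasks)):
            pass  # drain the iterator so every task finishes
    return shared.value


if __name__ == "__main__":
    print(count_completed(multiprocessing.Pool, 1000))
```

While the pool is running, the main process can read `shared.value` at any time to report progress.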


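With some more digging I found an answer myself: looking at the __dict__ of the imap_unordered result object, I found that it has an _index attribute that increments with each completed task. So this works for logging, wrapped in a while loop: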

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
    completed = rs._index
    if (completed == num_tasks): break
    print "Waiting for", num_tasks-completed, "tasks to complete..."
    time.sleep(2)

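However, I did find that swapping imap_unordered for map_async resulted in faster execution, though the result object is a bit different. The result object from map_async has a _number_left attribute and a ready() method instead. Note that _number_left counts the remaining chunks of tasks rather than individual tasks: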

p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
    if (rs.ready()): break
    remaining = rs._number_left
    print "Waiting for", remaining, "tasks to complete..."
    time.sleep(0.5)
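The speed difference is probably down to chunking: map_async batches the iterable into chunks by default, while imap_unordered defaults to `chunksize=1`, which costs one inter-process round trip per task. A sketch, assuming a trivial `do_work` and a hypothetical helper `run_chunked`, that passes `chunksize` to imap_unordered and still counts finished tasks through the public API only:

```python
import multiprocessing
import sys


def do_work(x):
    return x * x  # placeholder for the real task


def run_chunked(pool, num_tasks, chunksize=100, report_every=1000):
    """Run do_work over range(num_tasks), reporting progress on stderr."""
    results = []
    tasks = pool.imap_unordered(do_work, range(num_tasks), chunksize=chunksize)
    for completed, result in enumerate(tasks, 1):
        results.append(result)
        # Throttle the output so that printing does not dominate the runtime.
        if completed % report_every == 0 or completed == num_tasks:
            sys.stderr.write("\r{}/{} tasks done".format(completed, num_tasks))
    sys.stderr.write("\n")
    return results


if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        run_chunked(pool, 250000)
```

The values of `chunksize` and `report_every` here are arbitrary; larger chunks reduce overhead but make the progress count advance in coarser steps.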

There is no need to access private attributes of the result set:

from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))

I know that this is a rather old question, but here is what I'm doing when I want to track the progression of a pool of tasks in python.

from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep


def my_function(letter):
    sleep(2)
    return letter+letter


dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)

results = []

pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()

r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]

while len(results) != len(dummy_args):
    pbar.update(len(results))
    sleep(0.5)
pbar.finish()

print results

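Basically, you use apply_async with a callback (in this case, it appends the returned value to a list), so you don't have to wait before doing something else. Then, within a while loop, you check the progress of the work. In this case, I added a widget to make it look nicer.

Output: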

4 of 4
['AA', 'BB', 'CC', 'DD']

Hope that helps.
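Note that the callback appends results in completion order, which need not match the order of dummy_args. A dependency-free sketch, assuming the same `my_function` with a shorter sleep and a hypothetical helper `run_tracked`, that counts completions in the callback and collects the return values in submission order through `AsyncResult.get()`:

```python
import multiprocessing as mp
from time import sleep


def my_function(letter):
    sleep(0.2)
    return letter + letter


def run_tracked(pool, args, poll_interval=0.1):
    """Submit one task per argument, print progress, return ordered results."""
    finished = []  # grows by one per successful task; only its length is used
    pending = [pool.apply_async(my_function, (x,), callback=finished.append)
               for x in args]
    # Poll ready() rather than the callback count, so a task that raises
    # (and therefore never fires the callback) cannot hang the loop.
    while not all(r.ready() for r in pending):
        print("{} of {}".format(len(finished), len(args)))
        sleep(poll_interval)
    print("{} of {}".format(len(finished), len(args)))
    # get() returns values in submission order and re-raises worker errors.
    return [r.get() for r in pending]


if __name__ == "__main__":
    with mp.Pool(processes=2) as pool:
        print(run_tracked(pool, ["A", "B", "C", "D"]))
```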

I created a custom class to print the progress:

from multiprocessing import Pool, cpu_count


class ParallelSim(object):
    def __init__(self, processes=cpu_count()):
        self.pool = Pool(processes=processes)
        self.total_processes = 0
        self.completed_processes = 0
        self.results = []

    def add(self, func, args):
        self.pool.apply_async(func=func, args=args, callback=self.complete)
        self.total_processes += 1

    def complete(self, result):
        self.results.extend(result)
        self.completed_processes += 1
        print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))

    def run(self):
        self.pool.close()
        self.pool.join()

    def get_results(self):
        return self.results

My personal favorite: it gives you a nice little progress bar and a completion ETA while things run and commit in parallel.

from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass

I found that the work was already done by the time I tried to check its progress. This is what worked for me using tqdm.

pip install tqdm

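from multiprocessing import Pool
from tqdm import tqdm


def do_work(x):
    # do something with x
    return x


if __name__ == '__main__':
    tasks = range(5)
    pool = Pool()  # created after do_work is defined so workers can find it
    pbar = tqdm(total=len(tasks))

    # Advance the bar in the parent process: an update made inside a worker
    # only changes that worker's own copy of pbar.
    for _ in pool.imap_unordered(do_work, tasks):
        pbar.update(1)
    pool.close()
    pool.join()
    pbar.close()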

This should work with all flavors of multiprocessing, whether they block or not.

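Try this simple Queue-based approach, which can also be used with pooling. Be aware that printing anything after the progress bar has started will cause it to be moved, at least for this particular progress bar. (PyPI's progress 1.5)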

import multiprocessing
import time
from progress.bar import Bar


def status_bar( queue_stat, n_groups, n ):

    bar = Bar('progress', max = n)

    finished = 0
    while finished < n_groups:

        # wait for the next status message from a worker
        while queue_stat.empty():
            time.sleep(0.01)

        gotten = queue_stat.get()
        if gotten == 'finished':
            finished += 1
        else:
            bar.next()
    bar.finish()


def process_data( queue_data, queue_stat, group):

    for i in group:

        # ... do stuff resulting in new_data

        queue_stat.put(1)  # one item done

    queue_stat.put('finished')  # this group is done
    queue_data.put(new_data)


def multiprocess():

    new_data = []
    processes = []

    groups = [[1,2,3],[4,5,6],[7,8,9]]
    combined = sum(groups,[])

    queue_data = multiprocessing.Queue()
    queue_stat = multiprocessing.Queue()

    for i, group in enumerate(groups):

        if i == 0:
            # start the progress bar process once, before the first worker
            p = multiprocessing.Process(target = status_bar,
                args=(queue_stat,len(groups),len(combined)))
            processes.append(p)
            p.start()

        p = multiprocessing.Process(target = process_data,
            args=(queue_data, queue_stat, group))
        processes.append(p)
        p.start()

    for i in range(len(groups)):
        data = queue_data.get()
        new_data += data

    for p in processes:
        p.join()
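To use the same idea with a Pool, keep in mind that a plain multiprocessing.Queue cannot be passed to pool workers as a task argument, whereas a queue created by a Manager can. A rough sketch without the progress package, assuming hypothetical helpers `process_item` and `run_with_queue` and a trivial doubling task:

```python
import multiprocessing
import queue


def process_item(args):
    item, queue_stat = args
    result = item * 2  # placeholder for the real work
    queue_stat.put(1)  # report one finished item
    return result


def run_with_queue(pool, queue_stat, items):
    """Map process_item over items, printing progress read from queue_stat."""
    items = list(items)
    async_result = pool.map_async(
        process_item, [(item, queue_stat) for item in items])
    done = 0
    while done < len(items):
        try:
            queue_stat.get(timeout=0.5)
        except queue.Empty:
            if async_result.ready():
                break  # finished or failed; get() below re-raises errors
            continue
        done += 1
        print("\r{}/{} done".format(done, len(items)), end="", flush=True)
    print()
    return async_result.get()


if __name__ == "__main__":
    with multiprocessing.Manager() as manager, multiprocessing.Pool(2) as pool:
        print(run_with_queue(pool, manager.Queue(), range(10)))
```

The status queue is passed in as a parameter so that the caller decides which kind of queue to use; with a process pool it should be the Manager proxy shown in the `__main__` block.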

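As suggested by Tim, you can use tqdm and imap to solve this problem. I just stumbled upon this question and tweaked the imap_unordered solution so that I can access the results of the mapping. Here's how it works: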

from multiprocessing import Pool
import tqdm

pool = Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=num_tasks))

If you don't care about the values returned from your jobs, you don't need to assign the list to any variable.

A simple solution with Pool.apply_async():

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep


def work(x):
    sleep(0.2)
    return x**2


n = 10

with Pool(4) as p, tqdm(total=n) as pbar:
    res = [p.apply_async(
        work, args=(i,), callback=lambda _: pbar.update(1)) for i in range(n)]
    results = [r.get() for r in res]
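One caveat with this pattern: `callback` only fires for tasks that succeed, so a task that raises leaves the bar short of n. In Python 3, apply_async also accepts an `error_callback`. A sketch without tqdm, assuming a hypothetical `work` that fails for one input and a helper `run_counting`, that records both outcomes:

```python
from multiprocessing import Pool


def work(x):
    if x == 3:
        raise ValueError("bad input: {}".format(x))  # simulated failure
    return x ** 2


def run_counting(pool, n):
    """Submit n tasks and return (successful results, raised exceptions)."""
    succeeded, failed = [], []
    handles = [pool.apply_async(work, args=(i,),
                                callback=succeeded.append,
                                error_callback=failed.append)
               for i in range(n)]
    for handle in handles:
        handle.wait()  # returns once the task has succeeded or failed
    return succeeded, failed


if __name__ == "__main__":
    with Pool(4) as p:
        ok, bad = run_counting(p, 10)
    print("{} ok, {} failed".format(len(ok), len(bad)))
```

With tqdm, both callbacks could call pbar.update(1) so the bar still reaches its total when some tasks fail.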

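After doing some research, I wrote a small module called parallelbar. It lets you display both the overall progress of the pool and the progress of each core separately. It is easy to use and has a good description.

For example: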

from parallelbar import progress_map
from parallelbar.tools import cpu_bench


if __name__=='__main__':
    # create list of task
    tasks = [1_000_000 + i for i in range(100)]
    progress_map(cpu_bench, tasks)

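Some of the answers work with a progress bar, but I couldn't get the results back from the pool.

I used tqdm to create the progress bar. You can install it with pip install tqdm.

The simple code below works well with a progress bar, and you can get the results too: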

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep

tasks = range(5)
result = []


def do_work(x):
    # do something with x and return the result
    sleep(2)
    return x + 2


if __name__ == '__main__':
    pbar = tqdm(total=len(tasks))

    with Pool(2) as p:
        for i in p.imap_unordered(do_work, tasks):
            result.append(i)
            pbar.update(1)  # one task finished (i is the result, not a count)

    pbar.close()
    print(result)

Quick Start

Using tqdm and multiprocessing.Pool

Install

pip install tqdm

Example

import time
import threading
from multiprocessing import Pool

from tqdm import tqdm


def do_work(x):
    time.sleep(x)
    return x


def progress():
    time.sleep(3)  # Check progress after 3 seconds
    print(f'total: {pbar.total} finish:{pbar.n}')


tasks = range(10)
pbar = tqdm(total=len(tasks))

if __name__ == '__main__':
    thread = threading.Thread(target=progress)
    thread.start()
    results = []
    with Pool(processes=5) as pool:
        for result in pool.imap_unordered(do_work, tasks):
            results.append(result)
            pbar.update(1)
    print(results)





Flask

Install

pip install flask

main.py

import time
from multiprocessing import Pool

from tqdm import tqdm
from flask import Flask, make_response, jsonify

app = Flask(__name__)


def do_work(x):
    time.sleep(x)
    return x


total = 5  # num of tasks
tasks = range(total)
pbar = tqdm(total=len(tasks))


@app.route('/run/')
def run():
    results = []
    with Pool(processes=2) as pool:
        for _result in pool.imap_unordered(do_work, tasks):
            results.append(_result)
            if pbar.n >= total:
                pbar.n = 0  # reset
            pbar.update(1)
    response = make_response(jsonify(dict(results=results)))
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', '*')
    response.headers.add('Access-Control-Allow-Methods', '*')
    return response


@app.route('/progress/')
def progress():
    response = make_response(jsonify(dict(n=pbar.n, total=pbar.total)))
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', '*')
    response.headers.add('Access-Control-Allow-Methods', '*')
    return response

Run (on Windows, for example)

set FLASK_APP=main
flask run

API list

Run the task: http://127.0.0.1:5000/run/
Show progress: http://127.0.0.1:5000/progress/

test.html

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Progress Bar</title>
<script src="https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js"></script>
<script src="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/js/bootstrap.min.js"></script>
<link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="stylesheet">
</head>
<body>
<button id="run">Run the task</button>
<br><br>
<div class="progress">
<div class="progress-bar" role="progressbar" aria-valuenow="1" aria-valuemin="0" aria-valuemax="100"
style="width: 10%">0.00%
</div>
</div>
</body>
<script>
function set_progress_rate(n, total) {
//Set the rate of progress bar
var rate = (n / total * 100).toFixed(2);
if (n > 0) {
$(".progress-bar").attr("aria-valuenow", n);
$(".progress-bar").attr("aria-valuemax", total);
$(".progress-bar").text(rate + "%");
$(".progress-bar").css("width", rate + "%");
}
}


$("#run").click(function () {
//Run the task
$.ajax({
url: "http://127.0.0.1:5000/run/",
type: "GET",
success: function (response) {
set_progress_rate(100, 100);
console.log('Results:' + response['results']);
}
});
});
setInterval(function () {
//Show progress every 1 second
$.ajax({
url: "http://127.0.0.1:5000/progress/",
type: "GET",
success: function (response) {
console.log(response);
var n = response["n"];
var total = response["total"];
set_progress_rate(n, total);
}
});
}, 1000);
</script>
</html>
