Multiprocessing: use tqdm to display a progress bar

To make my code more "Pythonic" and faster, I use multiprocessing and a map function to send it a) the function and b) the range of iterations.

The straightforward solution (i.e. calling tqdm directly on the range, tqdm.tqdm(range(0, 30))) does not work with multiprocessing (as shown in the code below).

The progress bar is displayed from 0 to 100% (when Python reads the code?) but it does not indicate the actual progress of the map function.

How can I display a progress bar that indicates at which step the "map" function actually is?

from multiprocessing import Pool
import tqdm
import time


def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square


if __name__ == '__main__':
    p = Pool(2)
    r = p.map(_foo, tqdm.tqdm(range(0, 30)))
    p.close()
    p.join()

Any help or suggestions are welcome.


Solution found. Be careful! Due to multiprocessing, the estimated times (iterations per loop, total time, etc.) can be unstable, but the progress bar works perfectly.

Note: the context manager for Pool is only available in Python 3.3+.

from multiprocessing import Pool
import time
from tqdm import tqdm


def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square


if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for _ in p.imap_unordered(_foo, range(0, max_)):
                pbar.update()
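
If you are stuck on a pre-3.3 interpreter where Pool is not usable as a context manager, here is a minimal sketch of the same solution with explicit cleanup instead:

from multiprocessing import Pool
import time
from tqdm import tqdm


def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square


if __name__ == '__main__':
    max_ = 30
    p = Pool(processes=2)
    try:
        with tqdm(total=max_) as pbar:
            for _ in p.imap_unordered(_foo, range(0, max_)):
                pbar.update()
    finally:
        p.close()  # stop accepting new work
        p.join()   # wait for the workers to exit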

Use imap instead of map; it returns an iterator over the processed values.

from multiprocessing import Pool
import tqdm
import time


def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square


if __name__ == '__main__':
    with Pool(2) as p:
        r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))

Based on Xavi Martínez's answer, I wrote the function imap_unordered_bar. It can be used in the same way as imap_unordered, the only difference being that a progress bar is shown.

from multiprocessing import Pool
import time
from tqdm import tqdm


def imap_unordered_bar(func, args, n_processes=2):
    p = Pool(n_processes)
    res_list = []
    with tqdm(total=len(args)) as pbar:
        for res in p.imap_unordered(func, args):
            pbar.update()
            res_list.append(res)
    p.close()
    p.join()
    return res_list


def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square


if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))

You can use p_tqdm instead:

https://github.com/swansonk14/p_tqdm

from p_tqdm import p_map
import time


def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square


if __name__ == '__main__':
    r = p_map(_foo, list(range(0, 30)))

This approach is simple and it works.

from multiprocessing.pool import ThreadPool
import time
from tqdm import tqdm


def job():
    time.sleep(1)
    pbar.update()


pool = ThreadPool(5)
with tqdm(total=100) as pbar:
    for i in range(100):
        pool.apply_async(job)
    pool.close()
    pool.join()

import multiprocessing as mp
import tqdm


iterable = ...
num_cpu = mp.cpu_count() - 2  # don't use all CPUs


def func(x):
    # your logic
    ...


if __name__ == '__main__':
    with mp.Pool(num_cpu) as p:
        list(tqdm.tqdm(p.imap(func, iterable), total=len(iterable)))

Sorry for being late, but if all you need is a concurrent map, I added this functionality in tqdm>=4.42.0:

from tqdm.contrib.concurrent import process_map  # or thread_map
import time


def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square


if __name__ == '__main__':
    r = process_map(_foo, range(0, 30), max_workers=2)

References: https://tqdm.github.io/docs/contrib.concurrent/ and https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py

It supports max_workers and chunksize, and you can also easily switch from process_map to thread_map.
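
For instance, a minimal sketch exercising both parameters, reusing the _foo from the example above (the chunksize value here is arbitrary):

from tqdm.contrib.concurrent import process_map, thread_map
import time


def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square


if __name__ == '__main__':
    # chunksize batches several items per task, which helps for cheap functions
    r = process_map(_foo, range(0, 30), max_workers=2, chunksize=3)
    # switching to threads is the same call under the other name
    r = thread_map(_foo, range(0, 30), max_workers=2)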

Here is my take for when you need to get results back from parallel executing functions. This function does a few things (there is another post of mine that explains it further), but the key point is that there is a tasks-pending queue and a tasks-completed queue. As workers finish each task from the pending queue, they add the result to the completed queue, and you can wrap the check of the completed queue with the tqdm progress bar. I am not putting the implementation of the do_work() function here, since the message is simply to monitor the completed queue and update the progress bar every time a result comes in (a sketch of what such a worker might look like follows the function below).

import multiprocessing as mp
import pickle

import psutil
from tqdm import tqdm


SENTINEL = 'STOP'  # assumed sentinel value; any unique constant works


def par_proc(job_list, num_cpus=None, verbose=False):
    # Get the number of cores
    if not num_cpus:
        num_cpus = psutil.cpu_count(logical=False)

    print('* Parallel processing')
    print('* Running on {} cores'.format(num_cpus))

    # Set up the queues for sending and receiving data to/from the workers
    tasks_pending = mp.Queue()
    tasks_completed = mp.Queue()

    # Gather processes and results here
    processes = []
    results = []

    # Count tasks
    num_tasks = 0

    # Add the tasks to the queue
    for job in job_list:
        for task in job['tasks']:
            expanded_job = {}
            num_tasks = num_tasks + 1
            expanded_job.update({'func': pickle.dumps(job['func'])})
            expanded_job.update({'task': task})
            tasks_pending.put(expanded_job)

    # Set the number of workers here
    num_workers = min(num_cpus, num_tasks)

    # We need as many sentinels as there are worker processes so that ALL
    # processes exit when there is no more work left to be done.
    for c in range(num_workers):
        tasks_pending.put(SENTINEL)

    print('* Number of tasks: {}'.format(num_tasks))

    # Set up and start the workers
    for c in range(num_workers):
        p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
        p.name = 'worker' + str(c)
        processes.append(p)
        p.start()

    # Gather the results
    completed_tasks_counter = 0

    with tqdm(total=num_tasks) as bar:
        while completed_tasks_counter < num_tasks:
            results.append(tasks_completed.get())
            completed_tasks_counter = completed_tasks_counter + 1
            bar.update()  # advance the bar by one finished task

    for p in processes:
        p.join()

    return results
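
For illustration only, here is a minimal worker matching the queue protocol above; the original do_work() is not shown, so this is an assumed implementation:

def do_work(tasks_pending, tasks_completed, verbose=False):
    # Hypothetical worker: pull jobs until the sentinel arrives, run the
    # unpickled function on its task and push the result back.
    while True:
        job = tasks_pending.get()
        if job == SENTINEL:
            break
        func = pickle.loads(job['func'])
        tasks_completed.put(func(job['task']))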

For a progress bar with apply_async, we can use the following code, as suggested in:

https://github.com/tqdm/tqdm/issues/484

import time
import random
from multiprocessing import Pool
from tqdm import tqdm


def myfunc(a):
    time.sleep(random.random())
    return a ** 2


if __name__ == '__main__':
    pool = Pool(2)
    pbar = tqdm(total=100)

    def update(*a):
        pbar.update()

    for i in range(pbar.total):
        pool.apply_async(myfunc, args=(i,), callback=update)
    pool.close()
    pool.join()
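
One caveat with this pattern: if myfunc raises, the success callback never fires and the bar stalls short of 100%. apply_async also accepts an error_callback, so here is a sketch that counts failures too (the deliberate ValueError is only there to demonstrate it):

import time
import random
from multiprocessing import Pool
from tqdm import tqdm


def myfunc(a):
    if a == 7:
        raise ValueError('simulated failure')
    time.sleep(random.random())
    return a ** 2


if __name__ == '__main__':
    pool = Pool(2)
    pbar = tqdm(total=100)

    def update(*a):
        pbar.update()  # called on success and, via error_callback, on failure

    for i in range(pbar.total):
        pool.apply_async(myfunc, args=(i,), callback=update,
                         error_callback=update)
    pool.close()
    pool.join()
    pbar.close()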

Based on the answer of "user17242583", I created the following function. It should be as fast as Pool.map and the results are always ordered. Plus, you can pass as many arguments to your function as you want, not just a single iterable.

from multiprocessing import Pool
from functools import partial
from tqdm import tqdm


def imap_tqdm(function, iterable, processes, chunksize=1, desc=None, disable=False, **kwargs):
    """
    Run a function in parallel with a tqdm progress bar and an arbitrary number of arguments.
    Results are always ordered and the performance should be the same as of Pool.map.
    :param function: The function that should be parallelized.
    :param iterable: The iterable passed to the function.
    :param processes: The number of processes used for the parallelization.
    :param chunksize: The iterable is chopped into chunks of this size, and each chunk is submitted to the process pool as a separate task.
    :param desc: The description displayed by tqdm in the progress bar.
    :param disable: Disables the tqdm progress bar.
    :param kwargs: Any additional arguments that should be passed to the function.
    """
    if kwargs:
        function_wrapper = partial(_wrapper, function=function, **kwargs)
    else:
        function_wrapper = partial(_wrapper, function=function)

    results = [None] * len(iterable)
    with Pool(processes=processes) as p:
        with tqdm(desc=desc, total=len(iterable), disable=disable) as pbar:
            for i, result in p.imap_unordered(function_wrapper, enumerate(iterable), chunksize=chunksize):
                results[i] = result
                pbar.update()
    return results


def _wrapper(enum_iterable, function, **kwargs):
    i = enum_iterable[0]
    result = function(enum_iterable[1], **kwargs)
    return i, result
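
A hypothetical usage sketch, assuming imap_tqdm and _wrapper live in the same module; the extra offset keyword is forwarded to the worker via **kwargs:

import time


def _foo(my_number, offset=0):
    time.sleep(1)
    return my_number * my_number + offset


if __name__ == '__main__':
    # Results come back in input order despite imap_unordered internally.
    squares = imap_tqdm(_foo, list(range(30)), processes=2, desc='squares', offset=1)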