Use tqdm with concurrent.futures?

I have a multithreaded function that I would like a status bar for using tqdm. Is there an easy way to show a status bar with ThreadPoolExecutor? It is the parallelization part that is confusing me.

import concurrent.futures


def f(x):
    return x**2


my_iter = range(1000000)


def run(f, my_iter):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = list(executor.map(f, my_iter))
    return results


run(f, my_iter)  # wrap tqdm around this function?

You can wrap tqdm around executor.map to track the progress, as follows:

list(tqdm(executor.map(f, my_iter), total=len(my_iter)))

Here is your example:

import time
import concurrent.futures
from tqdm import tqdm


def f(x):
    time.sleep(0.001)  # to visualize the progress
    return x**2


def run(f, my_iter):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = list(tqdm(executor.map(f, my_iter), total=len(my_iter)))
    return results


my_iter = range(100000)
run(f, my_iter)

And the result is like this:

16%|██▏           | 15707/100000 [00:00<00:02, 31312.54it/s]
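
As a side note, tqdm also ships a convenience wrapper for this pattern, thread_map in tqdm.contrib.concurrent, which does roughly the same thing as the list(tqdm(executor.map(...))) line above. The following is only a minimal sketch, assuming a reasonably recent tqdm release that includes that module; the run signature with extra tqdm keyword arguments and the max_workers value are my own choices, not part of the original example.

```python
import time

from tqdm.contrib.concurrent import thread_map


def f(x):
    time.sleep(0.001)  # to visualize the progress
    return x ** 2


def run(f, my_iter, **tqdm_kwargs):
    # thread_map runs f over my_iter in a ThreadPoolExecutor, shows a tqdm
    # bar, and returns the results as a list in input order.
    # max_workers=10 is an arbitrary choice for this sketch.
    return thread_map(f, my_iter, max_workers=10, **tqdm_kwargs)


if __name__ == "__main__":
    results = run(f, range(1000))
    print(results[:5])
```

Like executor.map, this returns results in the order of the inputs, so the bar advances in that order as well.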

The shortest way, I think:

from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

with ThreadPoolExecutor(max_workers=20) as executor:
    results = list(tqdm(executor.map(myfunc, range(len(my_array))), total=len(my_array)))

The problem with the accepted answer is that ThreadPoolExecutor.map must yield results in the order in which the inputs were submitted, not in the order in which they become available. So if the first invocation of myfunc happens to be, for example, the last one to complete, the progress bar will go from 0% to 100% all at once, and only when all of the calls have completed. It is much better to use ThreadPoolExecutor.submit with as_completed:

import time
import concurrent.futures
from tqdm import tqdm


def f(x):
    time.sleep(0.001)  # to visualize the progress
    return x**2


def run(f, my_iter):
    l = len(my_iter)
    with tqdm(total=l) as pbar:
        # let's give it some more threads:
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = {executor.submit(f, arg): arg for arg in my_iter}
            results = {}
            for future in concurrent.futures.as_completed(futures):
                arg = futures[future]
                results[arg] = future.result()
                pbar.update(1)
    print(321, results[321])


my_iter = range(100000)
run(f, my_iter)

Prints:

321 103041

This is just the general idea. Depending on the type of my_iter, it may not be possible to apply the len function to it directly without first converting it into a list. The main point is to use submit with as_completed.
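
For the case where my_iter has no len (a generator, say), one possible variation is to submit everything first and use the number of futures as the total. The sketch below is only an illustration under that assumption; the name run_in_order, the max_workers default, and the extra tqdm keyword arguments are hypothetical additions of mine. It also returns the results as a list in input order instead of a dict keyed by argument.

```python
import concurrent.futures
import time

from tqdm import tqdm


def f(x):
    time.sleep(0.001)  # to visualize the progress
    return x ** 2


def run_in_order(f, my_iter, max_workers=10, **tqdm_kwargs):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submitting consumes my_iter, so len(futures) works even for generators.
        futures = [executor.submit(f, arg) for arg in my_iter]
        with tqdm(total=len(futures), **tqdm_kwargs) as pbar:
            # Advance the bar as each call finishes, in completion order.
            for _ in concurrent.futures.as_completed(futures):
                pbar.update(1)
        # The futures list is in submission order, so the results are too.
        return [future.result() for future in futures]


if __name__ == "__main__":
    squares = run_in_order(f, (i for i in range(1000)))
    print(squares[:5])
```

Note that this holds one future per input in memory at once, which is the same trade-off as converting my_iter into a list.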

I tried the example, but the progress bar still failed for me. I then found this post, which offers a short and useful approach:

import concurrent.futures
from tqdm import tqdm


def tqdm_parallel_map(fn, *iterables):
    """Use tqdm to show progress."""
    executor = concurrent.futures.ProcessPoolExecutor()
    futures_list = []
    for iterable in iterables:
        futures_list += [executor.submit(fn, i) for i in iterable]
    for f in tqdm(concurrent.futures.as_completed(futures_list), total=len(futures_list)):
        yield f.result()


def multi_cpu_dispatcher_process_tqdm(data_list, single_job_fn):
    """Multi-CPU dispatcher."""
    output = []
    for result in tqdm_parallel_map(single_job_fn, data_list):
        output += result  # extends output, so each job must return an iterable (e.g. a list)
    return output