Pass multiple arguments to concurrent.futures.Executor.map?

concurrent.futures.Executor.map takes a variable number of iterables and calls the given function with one item from each.

The following doesn't work, because each generated tuple is given to map as a separate argument:

args = ((a, b) for (a, b) in c)
for result in executor.map(f, *args):
    pass
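To see what actually goes wrong, here is a small sketch (the names `pairs` and `show` are hypothetical) that records the arguments each call receives. Unpacking with `*` turns every tuple into its own iterable, and `map` then zips across those iterables, so the pairs come out transposed:

```python
from concurrent.futures import ThreadPoolExecutor


def show(*xs):
    # Return whatever positional arguments this call received
    return xs


pairs = [(1, 10), (2, 20), (3, 30)]

with ThreadPoolExecutor() as executor:
    # *pairs passes three iterables: (1, 10), (2, 20), (3, 30)
    # map zips them, so show() is called as show(1, 2, 3) and show(10, 20, 30)
    transposed = list(executor.map(show, *pairs))

print(transposed)  # [(1, 2, 3), (10, 20, 30)]
```

With a two-parameter `f`, those three-argument calls raise a `TypeError` instead.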

Without the generator, the arguments map needs might look like this:

executor.map(
    f,
    (i[0] for i in args),
    (i[1] for i in args),
    ...,
    (i[N] for i in args),
)

One argument that is repeated, one argument in c

from itertools import repeat
for result in executor.map(f, repeat(a), c):
    pass
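A runnable sketch of the repeated-argument case, with a hypothetical `f` that multiplies its two arguments. `repeat(a)` never ends on its own, so this relies on `map` stopping at the shortest iterable (here `c`):

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat


def f(a, b):
    return a * b


a = 10          # the argument repeated on every call
c = [1, 2, 3]   # the argument that varies per call

with ThreadPoolExecutor() as executor:
    # repeat(a) is infinite; map stops when c runs out
    results = list(executor.map(f, repeat(a), c))

print(results)  # [10, 20, 30]
```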

Need to unpack items of c, and can unpack c

# Python 3: the built-in zip replaces itertools.izip
for result in executor.map(f, *zip(*c)):
    pass
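A small sketch with a hypothetical two-argument `f`. Note that `zip(*c)` has to unpack all of `c` up front, which is why this only applies when `c` can be unpacked:

```python
from concurrent.futures import ThreadPoolExecutor


def f(x, y):
    return x + y


c = [(1, 2), (3, 4), (5, 6)]

with ThreadPoolExecutor() as executor:
    # zip(*c) transposes c into (1, 3, 5) and (2, 4, 6): one iterable per parameter
    results = list(executor.map(f, *zip(*c)))

print(results)  # [3, 7, 11]
```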

Need to unpack items of c, can't unpack c

  1. Change f to take a single argument and unpack the argument in the function.
  2. If each item in c has a variable number of members, or you're calling f only a few times:

    executor.map(lambda args, f=f: f(*args), c)
    

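    It defines a new function that unpacks each item from c and calls f. Using a default argument for f in the lambda makes f local inside the lambda and so reduces lookup time. Note that a lambda cannot be pickled, so this works with ThreadPoolExecutor but not with ProcessPoolExecutor.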

  3. If you've got a fixed number of arguments, and you need to call f a lot of times:

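    from collections import deque

    def itemtee(iterable, n=2):
        def gen(it=iter(iterable), items=deque(), next=next):
            popleft = items.popleft
            extend = items.extend
            while True:
                if not items:
                    try:
                        extend(next(it))
                    except StopIteration:
                        # Python 3.7+ (PEP 479): end the generator explicitly
                        return
                yield popleft()
        # n references to ONE generator, so zip() pulls consecutive values
        return [gen()] * n


    executor.map(f, *itemtee(c, n))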
    

Where n is the number of arguments to f. This is adapted from itertools.tee.
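Option 1 needs no helper at all. A minimal sketch, assuming you are free to change `f`'s signature (the body here is hypothetical):

```python
from concurrent.futures import ThreadPoolExecutor


def f(args):
    # f takes one tuple and unpacks it itself
    a, b = args
    return a - b


c = [(10, 1), (20, 2), (30, 3)]

with ThreadPoolExecutor() as executor:
    results = list(executor.map(f, c))

print(results)  # [9, 18, 27]
```

Because `f` is an ordinary module-level function, this variant also works with `ProcessPoolExecutor`, where a lambda could not be pickled.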

You need to remove the * on the map call:

args = ((a, b) for b in c)
for result in executor.map(f, args):
    pass

This calls f once per item of c (args is a generator, so it has no len()), and f must accept a single parameter: the tuple.

If you want f to accept two parameters you can use a lambda call like:

args = ((a, b) for b in c)
for result in executor.map(lambda p: f(*p), args):   # (*p) does the unpacking part
    pass

You can use currying to create a new function via functools.partial:

from concurrent.futures import ThreadPoolExecutor
from functools import partial


def some_func(param1, param2):
    # some code
    ...


# currying some_func: the repeated argument 'a' is fixed as param1
func = partial(some_func, a)
with ThreadPoolExecutor() as executor:
    for result in executor.map(func, list_of_args):
        ...

If you need to fix more than one parameter, you can pass them all to partial:

func = partial(some_func, a, b, c)
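A runnable sketch with a hypothetical three-parameter `some_func`. Positional arguments given to `partial` fill parameters from the left; fixing them by keyword instead lets the varying value go first:

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def some_func(param1, param2, param3):
    # hypothetical body: combine the three parameters
    return f"{param1}-{param2}-{param3}"


# Fix the first two positional parameters; only param3 varies per call
func = partial(some_func, "a", "b")

with ThreadPoolExecutor() as executor:
    results = list(executor.map(func, [1, 2, 3]))
    # Fix param2 and param3 by keyword; the mapped value fills param1
    by_keyword = list(executor.map(partial(some_func, param2="y", param3="z"), ["p", "q"]))

print(results)     # ['a-b-1', 'a-b-2', 'a-b-3']
print(by_keyword)  # ['p-y-z', 'q-y-z']
```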

For ProcessPoolExecutor.map():

Similar to map(func, *iterables) except:

the iterables are collected immediately rather than lazily;

func is executed asynchronously and several calls to func may be made concurrently.

Therefore, ProcessPoolExecutor.map() is used the same way as Python's built-in map(). Here are the docs:

Return an iterator that applies function to every item of iterable, yielding the results. If additional iterable arguments are passed, function must take that many arguments and is applied to the items from all iterables in parallel.

Conclusion: pass the several parameters to map() as separate iterables, one iterable per parameter.

Try running the following snippet under Python 3, and it will become clear:

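from concurrent.futures import ProcessPoolExecutor


def f(a, b):
    print(a + b)


if __name__ == "__main__":  # required where worker processes are spawned (Windows, macOS)
    with ProcessPoolExecutor() as pool:
        # map() stops at the shortest iterable, so only 3 calls are made
        pool.map(f, (0, 1, 2, 3, 4, 5, 6, 7, 8, 9), (0, 1, 2))

    # 0, 2, 4 (print order may vary)

    array = [(i, i) for i in range(3)]
    with ProcessPoolExecutor() as pool:
        # zip(*array) transposes the pairs into (0, 1, 2) and (0, 1, 2)
        pool.map(f, *zip(*array))

    # 0, 2, 4 (print order may vary)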
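Executor.map is shared by ThreadPoolExecutor too, so the same multi-iterable convention applies there. A sketch that collects return values instead of printing and checks them against the built-in map (with `pow` standing in for `f`); results come back in input order:

```python
from concurrent.futures import ThreadPoolExecutor

bases = [2, 3, 4, 5]
exponents = [1, 2, 3]  # shorter on purpose: the extra base is ignored

with ThreadPoolExecutor() as executor:
    # pow(base, exp) is called once per zipped pair
    concurrent_results = list(executor.map(pow, bases, exponents))

serial_results = list(map(pow, bases, exponents))

print(concurrent_results)  # [2, 9, 64]
assert concurrent_results == serial_results
```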

I have seen many answers here, but none of them is as straightforward as using a lambda expression:

def foo(x, y): pass

Want to call the function above 10 times with the same values, i.e. xVal and yVal?

with concurrent.futures.ThreadPoolExecutor() as executor:
    for _ in executor.map(lambda _: foo(xVal, yVal), range(0, 10)):
        pass
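If you would rather avoid the throwaway lambda, `itertools.repeat` with a count gives the same ten calls. In this sketch `foo`'s body and the values of `xVal` and `yVal` are hypothetical:

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat


def foo(x, y):
    return x * y


xVal, yVal = 3, 4

with ThreadPoolExecutor() as executor:
    # repeat(value, 10) yields the same value 10 times, so foo(3, 4) runs 10 times
    results = list(executor.map(foo, repeat(xVal, 10), repeat(yVal, 10)))

print(len(results), set(results))  # 10 {12}
```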

So suppose you have a function that takes 3 arguments, and all 3 are dynamic and change with every call. For example:

def multiply(a, b, c):
    print(a * b * c)

To call this multiple times using threading, I would first create a list of tuples, where each tuple holds one set of values for a, b, c:

arguments = [(1,2,3), (4,5,6), (7,8,9), ....]

We know that concurrent.futures's map function accepts the target function as its first argument and, as its second, an iterable with one item per call. Therefore, you might make a call like this:

for _ in executor.map(multiply, arguments):  # Error
    pass

But this raises a TypeError: each tuple is passed as a single argument, so multiply() is missing 2 required positional arguments. To solve this problem, we create a helper function:

def helper(numbers):
    multiply(numbers[0], numbers[1], numbers[2])

Now we can call this function using the executor as follows:

with ThreadPoolExecutor() as executor:
    for _ in executor.map(helper, arguments):
        pass

That should give you the desired results.
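A variant of the helper that unpacks with `*` instead of indexing. Here `multiply` is assumed to return its product rather than print it, so the results can be collected in input order:

```python
from concurrent.futures import ThreadPoolExecutor


def multiply(a, b, c):
    return a * b * c


def helper(numbers):
    # Unpack the tuple into three positional arguments
    return multiply(*numbers)


arguments = [(1, 2, 3), (4, 5, 6), (7, 8, 9)]

with ThreadPoolExecutor() as executor:
    products = list(executor.map(helper, arguments))

print(products)  # [6, 120, 504]
```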

Here's a code snippet showing how to send multiple arguments to a function with ThreadPoolExecutor:

import concurrent.futures


def hello(first_name: str, last_name: str) -> None:
    """Prints a friendly hello with first name and last name"""
    print('Hello %s %s!' % (first_name, last_name))


def main() -> None:
    """Examples showing how to use ThreadPoolExecutor and executor.map
    sending multiple arguments to a function"""

    # Example 1: Sending multiple arguments using tuples
    # Define tuples with sequential arguments to be passed to hello()
    args_names = (
        ('Bruce', 'Wayne'),
        ('Clark', 'Kent'),
        ('Diana', 'Prince'),
        ('Barry', 'Allen'),
    )
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Using lambda, unpacks the tuple (*f) into hello(*args)
        executor.map(lambda f: hello(*f), args_names)

    print()

    # Example 2: Sending multiple arguments using dict with named keys
    # Define dicts with arguments as key names to be passed to hello()
    kwargs_names = (
        {'first_name': 'Bruce', 'last_name': 'Wayne'},
        {'first_name': 'Clark', 'last_name': 'Kent'},
        {'first_name': 'Diana', 'last_name': 'Prince'},
        {'first_name': 'Barry', 'last_name': 'Allen'},
    )
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Using lambda, unpacks the dict (**f) into hello(**kwargs)
        executor.map(lambda f: hello(**f), kwargs_names)


if __name__ == '__main__':
    main()
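Without a lambda, `Executor.submit` accepts positional and keyword arguments directly, so each dict can be unpacked with `**`. A sketch that swaps `map` for `submit` (here `hello` returns its greeting instead of printing it):

```python
import concurrent.futures


def hello(first_name: str, last_name: str) -> str:
    """Return a friendly hello with first name and last name"""
    return f"Hello {first_name} {last_name}!"


kwargs_names = (
    {"first_name": "Bruce", "last_name": "Wayne"},
    {"first_name": "Clark", "last_name": "Kent"},
)

with concurrent.futures.ThreadPoolExecutor() as executor:
    # submit() forwards *args and **kwargs to the function directly
    pending = [executor.submit(hello, **kwargs) for kwargs in kwargs_names]
    # result() blocks until each call finishes; order follows the list
    greetings = [future.result() for future in pending]

print(greetings)  # ['Hello Bruce Wayne!', 'Hello Clark Kent!']
```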

Let's say you have a DataFrame and you want to pass its first two columns (image_1 and image_2, holding image locations) to a function that reads the images, extracts the features, calculates the difference, and returns the difference value.

Note: this is only one scenario; define the function according to your own requirements.

The snippet below takes these two columns as arguments and passes them to a ThreadPoolExecutor, also showing a progress bar (via tqdm):


import numpy as np
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm


def getDifference(image_1_loc, image_2_loc, esp=1e-7):
    '''Function that gives the difference of two numpy feature matrices'''
    arr1 = ...  # read 1st image and extract features (a numpy array)
    arr2 = ...  # read 2nd image and extract features (a numpy array)
    diff = arr1.ravel() - arr2.ravel() + esp
    return diff


# Using ThreadPoolExecutor from concurrent.futures with multiple arguments
with ThreadPoolExecutor() as executor:
    result = np.array(
        list(tqdm(
            executor.map(lambda x: getDifference(*x), [(i, j) for i, j in df[['image_1', 'image_2']].values]),
            total=len(df),
        ))
    )
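Instead of packing each row into a tuple and unpacking it in a lambda, the two columns can be passed as two iterables, one per parameter. A sketch with plain lists standing in for the columns and a hypothetical `extract_features` standing in for the image model; with pandas, the equivalent call would pass `df['image_1']` and `df['image_2']`, since a Series is iterable:

```python
from concurrent.futures import ThreadPoolExecutor


def extract_features(image_loc):
    # Hypothetical stand-in feature: the length of the file name
    return [float(len(image_loc))]


def get_difference(image_1_loc, image_2_loc, esp=1e-7):
    arr1 = extract_features(image_1_loc)
    arr2 = extract_features(image_2_loc)
    return [x - y + esp for x, y in zip(arr1, arr2)]


# Stand-ins for the two DataFrame columns
image_1 = ["cat_1.png", "dog.png"]
image_2 = ["cat_22.png", "dog_1.png"]

with ThreadPoolExecutor() as executor:
    # One iterable per parameter, so no lambda or tuple packing is needed
    diffs = list(executor.map(get_difference, image_1, image_2))
```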


A simple utility that I use all the time is below.

########### Start of Utility Code ###########

import os
import sys
import traceback

from concurrent import futures
from functools import partial


def catch(fn):
    def wrap(*args, **kwargs):
        try:
            result = fn(*args, **kwargs)
        except Exception:
            type_, value_, traceback_ = sys.exc_info()
            return None, (
                args,
                "".join(traceback.format_exception(type_, value_, traceback_)),
            )
        else:
            return result, (args, None)

    return wrap


def top_level_wrap(fn, arg_tuple):
    args, kwargs = arg_tuple
    # **kwargs passes the dict as keyword arguments (*kwargs would pass only its keys)
    return fn(*args, **kwargs)


def create_processes(fn, values, handle_error, handle_success):
    cores = os.cpu_count()
    max_workers = 2 * cores + 1

    to_exec = partial(top_level_wrap, fn)

    with futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        for result, error in executor.map(to_exec, values):
            args, tb = error
            if tb is not None:
                handle_error(args, tb)
            else:
                handle_success(result)


########### End of Utility Code ###########

Example usage -

######### Start of example usage ###########

import time


@catch
def fail_when_5(val):
    time.sleep(val)
    if val == 5:
        raise Exception("Error - val was 5")
    else:
        return f"No error val is {val}"


def handle_error(args, tb):
    print("args is", args)
    print("TB is", tb)


def top_level(val, val_2, test=None, test2="ok"):
    print(val_2, test, test2)
    return fail_when_5(val)


handle_success = print


if __name__ == "__main__":
    # SHAPE -> ( (args, kwargs), (args, kwargs), ... )
    values = tuple(
        ((x, x + 1), {"test": f"t_{x+2}", "test2": f"t_{x+3}"}) for x in range(10)
    )
    create_processes(top_level, values, handle_error, handle_success)


######### End of example usage ###########
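The `(args, kwargs)` shape is not specific to processes. A stripped-down sketch of the same `top_level_wrap` idea with ThreadPoolExecutor (no error handling, and `top_level` simply echoes its arguments here) shows the keyword arguments arriving by name:

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def top_level_wrap(fn, arg_tuple):
    # Each item is an (args, kwargs) pair; unpack both into the call
    args, kwargs = arg_tuple
    return fn(*args, **kwargs)


def top_level(val, val_2, test=None, test2="ok"):
    # Hypothetical body: echo the arguments back
    return (val, val_2, test, test2)


values = tuple(
    ((x, x + 1), {"test": f"t_{x+2}", "test2": f"t_{x+3}"}) for x in range(3)
)

with ThreadPoolExecutor() as executor:
    results = list(executor.map(partial(top_level_wrap, top_level), values))

print(results[0])  # (0, 1, 't_2', 't_3')
```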