映射与提交有什么不同?

我只是对自己写的一些代码感到非常困惑,我惊讶地发现:

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(f, iterable))

还有

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
results = list(map(lambda x: executor.submit(f, x), iterable))

产生不同的结果。第一个函数生成 f返回的任何类型的列表,第二个函数生成 concurrent.futures.Future对象的列表,然后需要使用它们的 result()方法对这些对象进行计算,以获得 f返回的值。

我主要关心的是,这意味着 executor.map不能利用 concurrent.futures.as_completed,这似乎是一种非常方便的方式来评估一些长时间运行的数据库调用的结果,我正在使他们成为可用的。

我完全不清楚 concurrent.futures.ThreadPoolExecutor对象是如何工作的——天真地说,我更喜欢(稍微更详细一些) :

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
result_futures = list(map(lambda x: executor.submit(f, x), iterable))
results = [f.result() for f in futures.as_completed(result_futures)]

更简洁的 executor.map,以便利用可能的性能增益。我这样做有错吗?

82375 次浏览

The problem is that you transform the result of ThreadPoolExecutor.map to a list. If you don't do this and instead iterate over the resulting generator directly, the results are still yielded in the original order but the loop continues before all results are ready. You can test this with this example:

import time
import concurrent.futures


e = concurrent.futures.ThreadPoolExecutor(4)
s = range(10)
for i in e.map(time.sleep, s):
print(i)

The reason for the order being kept may be because it's sometimes important that you get results in the same order you give them to map. And results are probably not wrapped in future objects because in some situations it may take just too long to do another map over the list to get all results if you need them. And after all in most cases it's very likely that the next value is ready before the loop processed the first value. This is demonstrated in this example:

import concurrent.futures


executor = concurrent.futures.ThreadPoolExecutor() # Or ProcessPoolExecutor
data = some_huge_list()
results = executor.map(crunch_number, data)
finals = []


for value in results:
finals.append(do_some_stuff(value))

In this example it may be likely that do_some_stuff takes longer than crunch_number and if this is really the case it's really not a big loss of performance while you still keep the easy usage of map.

Also since the worker threads(/processes) start processing at the beginning of the list and work their way to the end to the list you submitted the results should be finished in the order they're already yielded by the iterator. Which means in most cases executor.map is just fine, but in some cases, for example if it doesn't matter in which order you process the values and the function you passed to map takes very different times to run, the future.as_completed may be faster.

Below is an example of .submit() vs .map(). They both accept the jobs immediately (submitted|mapped - start). They take the same time to complete, 11 seconds (last result time - start). However, .submit() gives results as soon as any thread in the ThreadPoolExecutor maxThreads=2 completes (unordered!). While .map() gives results in the order they are submitted.

import time
import concurrent.futures


def worker(i):
time.sleep(i)
return i,time.time()


e = concurrent.futures.ThreadPoolExecutor(2)
arrIn = range(1,7)[::-1]
print arrIn


f = []
print 'start submit',time.time()
for i in arrIn:
f.append(e.submit(worker,i))
print 'submitted',time.time()
for r in concurrent.futures.as_completed(f):
print r.result(),time.time()
print


f = []
print 'start map',time.time()
f = e.map(worker,arrIn)
print 'mapped',time.time()
for r in f:
print r,time.time()

Output:

[6, 5, 4, 3, 2, 1]
start submit 1543473934.47
submitted 1543473934.47
(5, 1543473939.473743) 1543473939.47
(6, 1543473940.471591) 1543473940.47
(3, 1543473943.473639) 1543473943.47
(4, 1543473943.474192) 1543473943.47
(1, 1543473944.474617) 1543473944.47
(2, 1543473945.477609) 1543473945.48


start map 1543473945.48
mapped 1543473945.48
(6, 1543473951.483908) 1543473951.48
(5, 1543473950.484109) 1543473951.48
(4, 1543473954.48858) 1543473954.49
(3, 1543473954.488384) 1543473954.49
(2, 1543473956.493789) 1543473956.49
(1, 1543473955.493888) 1543473956.49

In addition to the explanation in the answers here, it can be helpful to go right to the source. It reaffirms the statement from another answer here that:

  • .map() gives results in the order they are submitted, while
  • iterating over a list of Future objects with concurrent.futures.as_completed() won't guarantee this ordering, because this is the nature of as_completed()

.map() is defined in the base class, concurrent.futures._base.Executor:

class Executor(object):
def submit(self, fn, *args, **kwargs):
raise NotImplementedError()


def map(self, fn, *iterables, timeout=None, chunksize=1):
if timeout is not None:
end_time = timeout + time.monotonic()


fs = [self.submit(fn, *args) for args in zip(*iterables)]  # <!!!!!!!!


def result_iterator():
try:
# reverse to keep finishing order
fs.reverse()  # <!!!!!!!!
while fs:
# Careful not to keep a reference to the popped future
if timeout is None:
yield fs.pop().result()  # <!!!!!!!!
else:
yield fs.pop().result(end_time - time.monotonic())
finally:
for future in fs:
future.cancel()
return result_iterator()

As you mention, there is also .submit(), which left to be defined in the child classes, namely ProcessPoolExecutor and ThreadPoolExecutor, and returns a _base.Future instance that you need to call .result() on to actually make do anything.

The important lines from .map() boil down to:

fs = [self.submit(fn, *args) for args in zip(*iterables)]
fs.reverse()
while fs:
yield fs.pop().result()

The .reverse() plus .pop() is a means to get the first-submitted result (from iterables) to be yielded first, the second-submitted result to be yielded second, and so on. The elements of the resulting iterator are not Futures; they're the actual results themselves.

if you use concurrent.futures.as_completed, you can handle the exception for each function.

import concurrent.futures
iterable = [1,2,3,4,6,7,8,9,10]


def f(x):
if x == 2:
raise Exception('x')
return x


with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
result_futures = list(map(lambda x: executor.submit(f, x), iterable))
for future in concurrent.futures.as_completed(result_futures):
try:
print('resutl is', future.result())
except Exception as e:
print('e is', e, type(e))
# resutl is 3
# resutl is 1
# resutl is 4
# e is x <class 'Exception'>
# resutl is 6
# resutl is 7
# resutl is 8
# resutl is 9
# resutl is 10

in executor.map, if there is an exception, the whole executor would stop. you need to handle the exception in the worker function.

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
for each in executor.map(f, iterable):
print(each)
# if there is any exception, executor.map would stop