How do I parallelize a simple Python loop?

This is probably a trivial question, but how do I parallelize the following loop in Python?

# setup output lists
output1 = list()
output2 = list()
output3 = list()


for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

I know how to start single threads in Python, but I don't know how to "collect" the results.

Multiple processes would be fine too, whatever is easiest in this case. I'm currently using Linux, but the code should run on Windows and Mac as well.

What's the easiest way to parallelize this code?


Using multiple threads on CPython won't give you better performance for pure-Python code because of the Global Interpreter Lock (GIL). I suggest using the multiprocessing module instead:

import multiprocessing

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

Note that this won't work in the interactive interpreter.

To avoid the usual FUD around the GIL: there wouldn't be any advantage to using threads for this example anyway. You want to use processes here, not threads, because they avoid a whole bunch of problems.
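
A minimal, self-contained sketch of this approach (calc_stuff and offset here are placeholder stand-ins for the asker's own function and variable):

import multiprocessing

offset = 1  # placeholder value

def calc_stuff(parameter):
    # placeholder for the real calculation
    return parameter, parameter ** 2, parameter ** 3

if __name__ == '__main__':  # required where processes are spawned (Windows/macOS)
    with multiprocessing.Pool(4) as pool:
        out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
    print(out1, out2, out3)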

Why don't you use threads, and one mutex to protect one global list?

from threading import Thread, Lock


class thread_it(Thread):
    def __init__(self, param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        # do the work outside the lock, otherwise the threads run one at a time
        result = calc_stuff(self.param)
        with mutex:
            output.append(result)


threads = []
output = []
mutex = Lock()


for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()


for t in threads:
    t.join()


#here you have output list filled with data

Keep in mind that your overall speed will only be as fast as the slowest thread.

Have a look at this:

http://docs.python.org/library/queue.html

This might not be the right way to do it, but I'd do something like this:

Actual code:

from multiprocessing import Process, JoinableQueue as Queue
from queue import Empty  # the Empty exception lives in the queue module


class CustomWorker(Process):
    def __init__(self, workQueue, out1, out2, out3):
        Process.__init__(self)
        self.input = workQueue
        self.out1 = out1
        self.out2 = out2
        self.out3 = out3
    def run(self):
        while True:
            try:
                value = self.input.get(timeout=1)  # time out so Empty can be raised
                #value modifier
                temp1, temp2, temp3 = self.calc_stuff(value)
                self.out1.put(temp1)
                self.out2.put(temp2)
                self.out3.put(temp3)
                self.input.task_done()
            except Empty:
                return
            #Catch things better here
    def calc_stuff(self, param):
        out1 = param * 2
        out2 = param * 4
        out3 = param * 8
        return out1, out2, out3

def Main():
    inputQueue = Queue()
    for i in range(10):
        inputQueue.put(i)
    out1 = Queue()
    out2 = Queue()
    out3 = Queue()
    processes = []
    for x in range(2):
        p = CustomWorker(inputQueue, out1, out2, out3)
        p.daemon = True
        p.start()
        processes.append(p)
    inputQueue.join()
    while not out1.empty():
        print(out1.get())
        print(out2.get())
        print(out3.get())

if __name__ == '__main__':
    Main()

Hope this helps.

To parallelize a simple for loop, joblib brings a lot of value over the raw use of multiprocessing. Not only the short syntax, but also things like transparent bunching of iterations when they are very fast (to remove the overhead), or capturing the traceback of the child process, for better error reporting.

Disclaimer: I am the original author of joblib.
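
As a minimal sketch of the syntax (calc_stuff and offset are stand-ins for the asker's function and variable, and n_jobs=4 is an arbitrary choice):

from joblib import Parallel, delayed

offset = 1  # placeholder value

def calc_stuff(parameter):
    # placeholder for the real calculation
    return parameter, parameter ** 2, parameter ** 3

results = Parallel(n_jobs=4)(delayed(calc_stuff)(j * offset) for j in range(10))
output1, output2, output3 = zip(*results)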

This can be useful when implementing multiprocessing and parallel/distributed computing in Python.

YouTube tutorial on using the techila package

Techila is a distributed computing middleware which integrates directly with Python using the techila package. The peach function in the package can be useful for parallelizing loop structures. (The following code snippet is from the Techila Community Forums.)

techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
              files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
              jobs = jobcount # Number of Jobs in the Project
              )

What's the easiest way to parallelize this code?

Use a PoolExecutor from concurrent.futures. Compare the original code with this, side by side. First, the most concise way to do it is with executor.map:

...
with ProcessPoolExecutor() as executor:
    for out1, out2, out3 in executor.map(calc_stuff, parameters):
        ...

Or break it out by submitting each call individually:

...
with ThreadPoolExecutor() as executor:
    futures = []
    for parameter in parameters:
        futures.append(executor.submit(calc_stuff, parameter))

    for future in futures:
        out1, out2, out3 = future.result() # this will block
...

Leaving the context signals the executor to free up its resources.

You can use threads or processes and use the exact same interface.

A working example

Here is working example code that will demonstrate the value of this:

Put this in a file, futuretest.py:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from time import time
from http.client import HTTPSConnection


def processor_intensive(arg):
    def fib(n): # recursive, processor intensive calculation (avoid n > 36)
        return fib(n-1) + fib(n-2) if n > 1 else n
    start = time()
    result = fib(arg)
    return time() - start, result


def io_bound(arg):
    start = time()
    con = HTTPSConnection(arg)
    con.request('GET', '/')
    result = con.getresponse().getcode()
    return time() - start, result


def manager(PoolExecutor, calc_stuff):
    if calc_stuff is io_bound:
        inputs = ('python.org', 'stackoverflow.com', 'stackexchange.com',
                  'noaa.gov', 'parler.com', 'aaronhall.dev')
    else:
        inputs = range(25, 32)
    timings, results = list(), list()
    start = time()
    with PoolExecutor() as executor:
        for timing, result in executor.map(calc_stuff, inputs):
            # put results into correct output list:
            timings.append(timing), results.append(result)
    finish = time()
    print(f'{calc_stuff.__name__}, {PoolExecutor.__name__}')
    print(f'wall time to execute: {finish-start}')
    print(f'total of timings for each call: {sum(timings)}')
    print(f'time saved by parallelizing: {sum(timings) - (finish-start)}')
    print(dict(zip(inputs, results)), end = '\n\n')


def main():
    for computation in (processor_intensive, io_bound):
        for pool_executor in (ProcessPoolExecutor, ThreadPoolExecutor):
            manager(pool_executor, calc_stuff=computation)


if __name__ == '__main__':
    main()

And here's the output of one run of python -m futuretest:

processor_intensive, ProcessPoolExecutor
wall time to execute: 0.7326343059539795
total of timings for each call: 1.8033506870269775
time saved by parallelizing: 1.070716381072998
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}


processor_intensive, ThreadPoolExecutor
wall time to execute: 1.190223217010498
total of timings for each call: 3.3561410903930664
time saved by parallelizing: 2.1659178733825684
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}


io_bound, ProcessPoolExecutor
wall time to execute: 0.533886194229126
total of timings for each call: 1.2977914810180664
time saved by parallelizing: 0.7639052867889404
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}


io_bound, ThreadPoolExecutor
wall time to execute: 0.38941240310668945
total of timings for each call: 1.6049387454986572
time saved by parallelizing: 1.2155263423919678
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}

Processor-intensive analysis

When performing processor-intensive computations in Python, expect ProcessPoolExecutor to be more performant than ThreadPoolExecutor.

Due to the Global Interpreter Lock (a.k.a. the GIL), threads cannot use multiple processors, so expect the time for each computation, and the wall time (elapsed real time), to be greater.

IO-bound analysis

On the other hand, when performing IO-bound operations, expect ThreadPoolExecutor to be more performant than ProcessPoolExecutor.

Python's threads are real, OS-level threads. The operating system can put them to sleep and reawaken them when their information arrives.

Final thoughts

I suspect multiprocessing will be slower on Windows, since Windows doesn't support forking, so each new process has to take time to launch.

You can nest multiple threads inside multiple processes, but it's recommended not to use multiple threads to spin off multiple processes.

If faced with a heavy processing problem in Python, you can trivially scale with additional processes, but not so much with threads.

A very simple example of parallel processing is

from multiprocessing import Process


output1 = list()
output2 = list()
output3 = list()


def yourfunction():
    for j in range(0, 10):
        # calc individual parameter value
        parameter = j * offset
        # call the calculation
        out1, out2, out3 = calc_stuff(parameter=parameter)

        # put results into correct output list
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)


if __name__ == '__main__':
    p = Process(target=yourfunction)
    p.start()
    p.join()

from joblib import Parallel, delayed
def process(i):
    return i * i

results = Parallel(n_jobs=2)(delayed(process)(i) for i in range(10))
print(results)  # prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

The above works beautifully on my machine (Ubuntu; the joblib package was pre-installed, but can be installed via pip install joblib).

Taken from https://blog.dominodatalab.com/simple-parallelization/


Edit on March 31, 2021: on joblib, multiprocessing, threading and asyncio

  • joblib in the above code uses import multiprocessing under the hood (and thus multiple processes, which is typically the best way to run CPU work across cores, because of the GIL)
  • You can let joblib use multiple threads instead of multiple processes, but this (or using import threading directly) is only beneficial if the threads spend considerable time on I/O (e.g. read/write to disk, send HTTP requests). For I/O work, the GIL does not block another thread's execution
  • Since Python 3.7, as an alternative to threading, you can parallelise work with asyncio, but the same advice applies as for import threading (though in contrast to the latter, only one thread will be used; on the plus side, asyncio has a lot of nice features which are helpful for async programming)
  • Using multiple processes incurs overhead. Think about it: typically, each process needs to initialise/load everything you need to run your calculation. You need to check yourself whether the above code snippet improves your wall time. Here is another one, for which I confirmed that joblib produces better results:
import time
from joblib import Parallel, delayed


def countdown(n):
    while n > 0:
        n -= 1
    return n


t = time.time()
for _ in range(20):
    print(countdown(10**7), end=" ")
print(time.time() - t)
# takes ~10.5 seconds on medium sized Macbook Pro


t = time.time()
results = Parallel(n_jobs=2)(delayed(countdown)(10**7) for _ in range(20))
print(results)
print(time.time() - t)
# takes ~6.3 seconds on medium sized Macbook Pro

Using Ray has a number of advantages:

  • You can parallelize over multiple machines in addition to multiple cores (with the same code).
  • Efficient handling of numerical data through shared memory (and zero-copy serialization).
  • High task throughput with distributed scheduling.
  • Fault tolerance.

In your case, you could start Ray and define a remote function

import ray


ray.init()


@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3

and then invoke it in parallel

output1, output2, output3 = [], [], []


# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)


# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)

To run the same example on a cluster, the only line that would change is the call to ray.init(). The relevant documentation can be found here.
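
As a rough sketch, assuming a cluster has already been started with the ray command-line tool, the call could connect to it instead of starting a fresh local instance:

# Assumption: a Ray cluster is already running and reachable from this machine.
ray.init(address="auto")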

Please note that I'm helping to develop Ray.

Let's say we have an async function

async def work_async(self, student_name: str, code: str, loop):
    """
    Some async function
    """
    # Do some async processing

that needs to be run on a large array. Some attributes are passed in to the program, and some come from the properties of dictionary elements in the array.

async def process_students(self, student_name: str, loop):
    market = sys.argv[2]
    subjects = [...] # Some large array
    batchsize = 5
    for i in range(0, len(subjects), batchsize):
        batch = subjects[i:i+batchsize]
        await asyncio.gather(*(self.work_async(student_name,
                                               sub['Code'],
                                               loop)
                               for sub in batch))
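
Here is a minimal, self-contained sketch of the same batching pattern outside the class/argv context above (work_async and the inputs are illustrative stand-ins):

import asyncio


async def work_async(item):
    await asyncio.sleep(0.1)  # stand-in for real async work
    return item * 2


async def main():
    items = list(range(12))
    batchsize = 5
    results = []
    for i in range(0, len(items), batchsize):
        batch = items[i:i + batchsize]
        # each batch runs concurrently; batches run one after another
        results.extend(await asyncio.gather(*(work_async(x) for x in batch)))
    print(results)


asyncio.run(main())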

I found joblib very useful. Please see the following example:

from joblib import Parallel, delayed
def yourfunction(k):
    s = 3.14 * k * k
    print("Area of a circle with a radius ", k, " is:", s)


element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))

n_jobs=-1: use all available cores

Thanks to @iuryxavier

from multiprocessing import Pool
from multiprocessing import cpu_count




def add_1(x):
    return x + 1


if __name__ == "__main__":
    pool = Pool(cpu_count())
    results = pool.map(add_1, range(10**12))
    pool.close()  # 'TERM'
    pool.join()   # 'KILL'

This is the easiest way!

You can use asyncio. (Documentation can be found here.) It is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web servers, database connection libraries, distributed task queues, etc. Plus it has both high-level and low-level APIs to accommodate any kind of problem.

import asyncio


def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped


@background
def your_function(argument):
    # code
    ...

Now this function will run in parallel whenever called, without putting the main program into a wait state. You can use it to parallelize a for loop as well. When called in a for loop, the loop itself is sequential, but every iteration runs in parallel to the main program as soon as the interpreter gets there.

1. Firing the loop in parallel to the main thread without any waiting


@background
def your_function(argument):
    time.sleep(5)
    print('function finished for '+str(argument))




for i in range(10):
    your_function(i)




print('loop finished')

This will produce the following output:

loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1

Update: May 2022

Although this answers the original question, there are ways to wait for the loops to finish, as asked for in upvoted comments. Adding them here as well. The keys to the implementations are asyncio.gather() & run_until_complete(). Consider the following functions:

import asyncio
import time


def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped


@background
def your_function(argument, other_argument): # Added another argument
    time.sleep(5)
    print(f"function finished for {argument=} and {other_argument=}")


def code_to_run_before():
    print('This runs Before Loop!')


def code_to_run_after():
    print('This runs After Loop!')

2. Run in parallel, but wait for it to finish


code_to_run_before()                                                         # Anything you want to run before, run here!


loop = asyncio.get_event_loop()                                              # Have a new event loop


looper = asyncio.gather(*[your_function(i, 1) for i in range(1, 5)])         # Run the loop
                               

results = loop.run_until_complete(looper)                                    # Wait until finish


code_to_run_after()                                                          # Anything you want to run after, run here!

This will produce the following output:

This runs Before Loop!
function finished for argument=2 and other_argument=1
function finished for argument=3 and other_argument=1
function finished for argument=1 and other_argument=1
function finished for argument=4 and other_argument=1
This runs After Loop!

3. Run multiple loops in parallel and wait for them to finish


code_to_run_before()                                                         # Anything you want to run before, run here!


loop = asyncio.get_event_loop()                                              # Have a new event loop


group1 = asyncio.gather(*[your_function(i, 1) for i in range(1, 2)])         # Run all the loops you want
group2 = asyncio.gather(*[your_function(i, 2) for i in range(3, 5)])         # Run all the loops you want
group3 = asyncio.gather(*[your_function(i, 3) for i in range(6, 9)])         # Run all the loops you want


all_groups = asyncio.gather(group1, group2, group3)                          # Gather them all
results = loop.run_until_complete(all_groups)                                # Wait until finish


code_to_run_after()                                                          # Anything you want to run after, run here!

This will produce the following output:

This runs Before Loop!
function finished for argument=3 and other_argument=2
function finished for argument=1 and other_argument=1
function finished for argument=6 and other_argument=3
function finished for argument=4 and other_argument=2
function finished for argument=7 and other_argument=3
function finished for argument=8 and other_argument=3
This runs After Loop!

4. Loops run sequentially, but each loop's iterations run in parallel with one another


code_to_run_before()                                                               # Anything you want to run before, run here!


for loop_number in range(3):

    loop = asyncio.get_event_loop()                                                # Have a new event loop

    looper = asyncio.gather(*[your_function(i, loop_number) for i in range(1, 5)]) # Run the loop

    results = loop.run_until_complete(looper)                                      # Wait until finish

    print(f"finished for {loop_number=}")


code_to_run_after()                                                                # Anything you want to run after, run here!

This will produce the following output:

This runs Before Loop!
function finished for argument=3 and other_argument=0
function finished for argument=4 and other_argument=0
function finished for argument=1 and other_argument=0
function finished for argument=2 and other_argument=0
finished for loop_number=0
function finished for argument=4 and other_argument=1
function finished for argument=3 and other_argument=1
function finished for argument=2 and other_argument=1
function finished for argument=1 and other_argument=1
finished for loop_number=1
function finished for argument=1 and other_argument=2
function finished for argument=4 and other_argument=2
function finished for argument=3 and other_argument=2
function finished for argument=2 and other_argument=2
finished for loop_number=2
This runs After Loop!

Update: June 2022

This may not run in its current form on some versions of Jupyter Notebook. The reason is that Jupyter Notebook utilizes an event loop itself. To make it work on such Jupyter versions, nest_asyncio (which nests the event loops, as evident from the name) is the way to go. Just import and apply it at the top of the cell:

import nest_asyncio
nest_asyncio.apply()

All the functionality discussed above should be accessible in a notebook environment as well.

Dask futures; I'm surprised no one has mentioned it yet…

from dask.distributed import Client


client = Client(n_workers=8) # In this example I have 8 cores and processes (can also use threads if desired)


def my_function(i):
    output = <code to execute in the for loop here>
    return output


futures = []


for i in <whatever you want to loop across here>:
    future = client.submit(my_function, i)
    futures.append(future)


results = client.gather(futures)
client.close()
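
To make the template concrete, here is a hedged, runnable sketch (the body of my_function and the worker count are arbitrary stand-ins):

from dask.distributed import Client


def my_function(i):
    return i * i  # stand-in for the real loop body


if __name__ == '__main__':
    client = Client(n_workers=4)  # arbitrary worker count
    futures = [client.submit(my_function, i) for i in range(10)]
    print(client.gather(futures))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    client.close()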

The concurrent wrappers of the tqdm library are a nice way to parallelize longer-running code. tqdm provides feedback on the current progress and remaining time through a smart progress meter, which I find very useful for long computations.

Loops can be rewritten to run as concurrent threads through a simple call to thread_map, or as concurrent multi-processes through a simple call to process_map:

from tqdm.contrib.concurrent import thread_map, process_map




def calc_stuff(num, multiplier):
    import time

    time.sleep(1)

    return num, num * multiplier


if __name__ == "__main__":

    # let's parallelize this for loop:
    # results = [calc_stuff(i, 2) for i in range(64)]

    loop_idx = range(64)
    multiplier = [2] * len(loop_idx)

    # either with threading:
    results_threading = thread_map(calc_stuff, loop_idx, multiplier)

    # or with multi-processing:
    results_processes = process_map(calc_stuff, loop_idx, multiplier)