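How do I do parallel programming in Python?

For C++, we can use OpenMP for parallel programming; however, OpenMP does not work for Python. What should I do if I want to parallelize some parts of my Python program?

The structure of the code can be thought of as: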

solve1(A)
solve2(B)

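where solve1 and solve2 are two independent functions. How can this kind of code be run in parallel instead of sequentially, in order to reduce the running time? The code is: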

def solve(Q, G, n):
    i = 0
    tol = 10 ** -4

    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break

        node1 = partition[0]
        node2 = partition[1]

        G = updateGraph(G, node1, node2)

        if i == 999:
            print("Maximum iteration reached")
        i += 1  # advance the iteration counter so the loop can terminate
    print(inneropt)

where setinner and setouter are two independent functions. That is where I want to parallelize...


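CPython uses the Global Interpreter Lock (GIL), which makes parallel programming a bit more interesting than in C++.

This topic has several useful examples and descriptions of the challenge:

Python Global Interpreter Lock (GIL) workaround on multi-core systems using taskset on Linux?

You can use the multiprocessing module: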

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

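This will spawn processes that can do generic work for you. Since we did not pass processes, it will spawn one process for each CPU core on your machine. Each CPU core can execute one process at a time.

If you want to map a list to a single function, you can do this: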

args = [A, B]
results = pool.map(solve1, args)

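Don't use threads for CPU-bound work: in CPython the GIL allows only one thread at a time to execute Python bytecode, so threads will not run this kind of code in parallel.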
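Applied to the loop in the question, the same pattern might look like the sketch below. The bodies of `setinner`, `setouter`, and `updateGraph` here are trivial stand-ins (assumptions, since the question does not show them); only the `apply_async`/`get` structure is the point. The pool is created once, outside the loop, and passed in so it is reused across iterations.

```python
from multiprocessing import Pool


def setinner(Q, G, n):
    # Hypothetical stand-in: returns (inneropt, partition, x).
    return float(sum(G)), (0, 1), None


def setouter(Q, G, n):
    # Hypothetical stand-in: returns outeropt.
    return float(sum(G)) + (len(G) - 1)


def updateGraph(G, node1, node2):
    # Hypothetical stand-in: merge two nodes into one.
    merged = G[node1] + G[node2]
    return [merged] + [g for k, g in enumerate(G) if k not in (node1, node2)]


def solve(Q, G, n, pool, max_iter=1000, tol=1e-4):
    inneropt = None
    for _ in range(max_iter):
        # Submit both independent calls, then wait for both results.
        inner_res = pool.apply_async(setinner, (Q, G, n))
        outer_res = pool.apply_async(setouter, (Q, G, n))
        inneropt, partition, x = inner_res.get()
        outeropt = outer_res.get()

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break
        G = updateGraph(G, partition[0], partition[1])
    else:
        print("Maximum iteration reached")
    return inneropt


if __name__ == "__main__":
    with Pool(processes=2) as pool:
        print(solve(None, [1, 2, 3, 4], 4, pool))
```

Note that `Q`, `G`, and `n` are pickled and sent to the worker processes on every iteration, so this only pays off when each call does substantially more work than the copying costs.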

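This can be done very elegantly with Ray.

To parallelize your example, you need to define your functions with the @ray.remote decorator and then invoke them with .remote.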

import ray

ray.init()

# Define the functions.

@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])

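This has a number of advantages over the multiprocessing module.

  1. The same code will run on a multicore machine as well as on a cluster of machines.
  2. Processes share data efficiently through shared memory and zero-copy serialization.
  3. Error messages are propagated nicely.
  4. These function calls can be composed together, e.g.,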

    @ray.remote
    def f(x):
        return x + 1
    
    
    x_id = f.remote(1)
    y_id = f.remote(x_id)
    z_id = f.remote(y_id)
    ray.get(z_id)  # returns 4
    
  5. In addition to invoking functions remotely, classes can be instantiated remotely as actors.

Note that Ray is a framework I've been helping develop.

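As others have said, the solution is to use multiple processes. Which framework is more appropriate, however, depends on many factors. In addition to the ones already mentioned, there are also charm4py and mpi4py (I am the developer of charm4py).

There is a more efficient way to implement the above example than using the worker pool abstraction. The main loop sends the same parameters (including the complete graph G) over and over to the workers in each of the 1000 iterations. Since at least one worker will reside in a different process, this involves copying and sending the arguments to the other process(es). Depending on the size of the objects, this can be very costly. Instead, it makes sense to have the workers store state and simply send them the updated information.

For example, in charm4py this can be done like this: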

class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B

        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...

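Note that for this example we really only need one worker. The main loop could execute one of the functions and have the worker execute the other. But my code helps to illustrate a couple of things:

  1. Worker A runs in process 0 (the same as the main loop). While result_a.get() is blocked waiting for the result, worker A does the computation in that same process.
  2. Arguments are automatically passed by reference to worker A, since it is in the same process (no copying is involved).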
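For comparison, the same "keep the state in the worker, send only the updates" idea can be sketched with the standard library alone. This is not charm4py: it uses `multiprocessing.Process` and a `Pipe`, and the worker's state (a plain list standing in for the graph `G`), the merge rule, and the message format are all assumptions made for illustration.

```python
from multiprocessing import Pipe, Process


def worker_loop(conn, G):
    """Hold G locally and apply the small updates sent by the main loop."""
    G = list(G)  # the initial state is transferred once, at start-up
    while True:
        msg = conn.recv()
        if msg is None:  # sentinel: shut down
            break
        node1, node2 = msg  # only the update crosses the pipe
        merged = G[node1] + G[node2]
        G = [merged] + [g for k, g in enumerate(G) if k not in (node1, node2)]
        conn.send(sum(G) + len(G) - 1)  # stand-in for setouter's result
    conn.close()


def run(G, updates):
    parent_conn, child_conn = Pipe()
    proc = Process(target=worker_loop, args=(child_conn, G))
    proc.start()
    results = []
    for node1, node2 in updates:
        parent_conn.send((node1, node2))  # two integers, not the whole graph
        results.append(parent_conn.recv())
    parent_conn.send(None)
    proc.join()
    return results


if __name__ == "__main__":
    print(run([1, 2, 3, 4], [(0, 1), (0, 1)]))
```

The trade-off is that the worker's copy of the state and the main loop's view of it must be kept consistent by hand, which a framework with remote objects handles for you.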

In some cases, it is possible to parallelize loops automatically using Numba, though it only works with a small subset of Python:

from numba import njit, prange


@njit(parallel=True)
def prange_test(A):
    s = 0
    # Without "parallel=True" in the jit-decorator
    # the prange statement is equivalent to range
    for i in prange(A.shape[0]):
        s += A[i]
    return s

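Unfortunately, it seems that Numba only works with Numpy arrays, not with other Python objects. In theory, it might also be possible to compile Python to C++ and then parallelize it automatically using the Intel C++ compiler, though I haven't tried this yet.

You can use the joblib library for parallel computation and multiprocessing.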

from joblib import Parallel, delayed

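You can simply create a function foo that you want to run in parallel and implement the parallel processing with the following piece of code: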

output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)

where num_cores can be obtained from the multiprocessing library as follows:

import multiprocessing


num_cores = multiprocessing.cpu_count()

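If you have a function with more than one input argument and you only want to iterate over one of the arguments with a list, you can use the partial function from the functools library as follows: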

from joblib import Parallel, delayed
import multiprocessing
from functools import partial


def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output


input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)

You can find a complete explanation of Python and R multiprocessing, with a couple of examples, here.
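The same `partial` trick also works with the standard library's `multiprocessing.Pool.map`, which may be handy when joblib is not installed. In this sketch, `scale_and_shift`, `run_all`, and the argument values are made-up placeholders; a `partial` object can be sent to worker processes as long as the wrapped function is defined at module level.

```python
from functools import partial
from multiprocessing import Pool


def scale_and_shift(value, factor, offset):
    # Hypothetical worker: only `value` varies between calls.
    return value * factor + offset


def run_all(values, pool):
    # Fix factor and offset; the pool iterates over `values` only.
    fixed = partial(scale_and_shift, factor=2, offset=1)
    return pool.map(fixed, values)


if __name__ == "__main__":
    with Pool() as pool:
        print(run_all([11, 32, 44], pool))
```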

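I always use the native 'multiprocessing' library to handle parallelism in Python. To control the number of processes in the queue, I use a shared variable as a counter. In the following example, you can see how the parallel execution of simple processes works.

I updated the script to make it easier to use. Basically, the only thing you have to do is override the process method with the function you want to run in parallel. See the example; the procedure is very simple. Alternatively, you can also remove all the execution log calls.

When I have some time, I'll update the code to work with processes that return values.

Requirements

user@host:~$ pip install coloredlogs==15.0.1

Code

Parallel processing script (copy and paste):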

#!/usr/bin/env python
# encoding: utf-8

from multiprocessing import Manager, Pool, Value, cpu_count
from multiprocessing.pool import ThreadPool
from typing import Any, Iterator
from datetime import datetime
from logging import Logger
import coloredlogs
import logging
import time
import sys
import os


LOG_LEVEL = "DEBUG"


def get_logger(name: str = __name__, level: str = LOG_LEVEL) -> Logger:
    assert level in ("NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")

    # Setting-up the script logging:
    logging.basicConfig(
        stream=sys.stdout,
        format="%(asctime)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        level=level
    )

    logger = logging.getLogger(name)
    coloredlogs.install(level=level, logger=logger, isatty=True)

    return logger


class ParallelProcessing:
    """
    Parallel processing.

    References
    ----------
    [1] Class `ParallelProcessing`: https://stackoverflow.com/a/70464369/16109419

    Examples
    --------
    >>> class MyParallelProcessing(ParallelProcessing):
    >>>     def process(self, name: str) -> None:
    >>>         logger = get_logger()
    >>>         logger.info(f"Executing process: {name}...")
    >>>         time.sleep(5)
    >>>
    >>>
    >>> params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]
    >>> mpp = MyParallelProcessing()
    >>> mpp.run(args_list=params_list)
    """

    _n_jobs: int
    _waiting_time: int
    _queue: Value
    _lock: Any
    _logger: Logger

    def __init__(self, n_jobs: int = -1, waiting_time: int = 1):
        """
        Instantiates a parallel processing object to execute processes in parallel.

        Parameters
        ----------
        n_jobs: int
            Number of jobs.
        waiting_time: int
            Waiting time when jobs queue is full, e.g. `_queue.value` == `_n_jobs`.
        """
        self._n_jobs = n_jobs if n_jobs >= 0 else cpu_count()
        self._waiting_time = waiting_time if waiting_time >= 0 else 60*60
        self._logger = get_logger()

    def process(self, *args) -> None:
        """
        Abstract process that must be overridden.

        Parameters
        ----------
        *args
            Parameters of the process to be executed.
        """
        raise NotImplementedError("Process not defined ('NotImplementedError' exception).")

    def _execute(self, *args) -> None:
        """
        Run the process and remove it from the process queue by decreasing the queue process counter.

        Parameters
        ----------
        *args
            Parameters of the process to be executed.
        """
        self.process(*args)
        # `-=` on a manager value is a read followed by a write, so guard it.
        with self._lock:
            self._queue.value -= 1

    def _error_callback(self, result: Any) -> None:
        """
        Error callback.

        Parameters
        ----------
        result: Any
            Result from exceptions.
        """
        self._logger.error(result)
        os._exit(1)

    def run(self, args_list: Iterator[tuple], use_multithreading: bool = False) -> None:
        """
        Run processes in parallel.

        Parameters
        ----------
        args_list: Iterator[tuple]
            List of process parameters (`*args`).
        use_multithreading: bool
            Use multithreading instead multiprocessing.
        """
        manager = Manager()
        self._queue = manager.Value('i', 0)
        self._lock = manager.Lock()
        pool = Pool(processes=self._n_jobs) if not use_multithreading else ThreadPool(processes=self._n_jobs)

        start_time = datetime.now()

        for args in args_list:
            while True:
                # Hold the lock only while touching the shared counter, so
                # that workers can still decrement it in `_execute`.
                with self._lock:
                    slot_free = self._queue.value < self._n_jobs
                    if slot_free:
                        self._queue.value += 1

                if slot_free:
                    # Running processes in parallel:
                    pool.apply_async(func=self._execute, args=args, error_callback=self._error_callback)
                    break
                else:
                    self._logger.debug(f"Pool full ({self._n_jobs}): waiting {self._waiting_time} seconds...")
                    time.sleep(self._waiting_time)

        pool.close()
        pool.join()

        exec_time = datetime.now() - start_time
        self._logger.info(f"Execution time: {exec_time}")

Usage example:

class MyParallelProcessing(ParallelProcessing):
    def process(self, name: str) -> None:
        """
        Process to run in parallel (overrides abstract method).
        """
        logger = get_logger()
        logger.info(f"Executing process: {name}...")
        time.sleep(5)


def main() -> None:
    n_jobs = int(sys.argv[1])  # Number of jobs to run in parallel.
    params_list = [("A",), ("B",), ("C",), ("D",), ("E",), ("F",)]

    mpp = MyParallelProcessing(n_jobs=n_jobs)

    # Executing processes in parallel:
    mpp.run(args_list=params_list)


if __name__ == '__main__':
    main()

Execution and output

user@host:~$ python run.py 1
2021-12-23 12:41:51 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:51 MYMACHINE __mp_main__[12352] INFO Executing process: A...
2021-12-23 12:41:52 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:53 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:54 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:55 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:56 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:57 MYMACHINE __mp_main__[12352] INFO Executing process: B...
2021-12-23 12:41:58 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:41:59 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:00 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
...
2021-12-23 12:42:10 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:11 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:12 MYMACHINE __mp_main__[12352] INFO Executing process: E...
2021-12-23 12:42:13 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:14 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:15 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:16 MYMACHINE __main__[24180] DEBUG Pool full (1): waiting 1 seconds...
2021-12-23 12:42:18 MYMACHINE __mp_main__[12352] INFO Executing process: F...
2021-12-23 12:42:23 MYMACHINE __main__[24180] INFO Execution time: 0:00:31.274478
user@host:~$ python run.py 3
2021-12-23 12:33:59 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:33:59 MYMACHINE __mp_main__[19776] INFO Executing process: A...
2021-12-23 12:33:59 MYMACHINE __mp_main__[24632] INFO Executing process: B...
2021-12-23 12:33:59 MYMACHINE __mp_main__[15852] INFO Executing process: C...
2021-12-23 12:34:00 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:01 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:02 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:03 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:04 MYMACHINE __main__[7628] DEBUG Pool full (3): waiting 1 seconds...
2021-12-23 12:34:05 MYMACHINE __mp_main__[19776] INFO Executing process: D...
2021-12-23 12:34:05 MYMACHINE __mp_main__[24632] INFO Executing process: E...
2021-12-23 12:34:05 MYMACHINE __mp_main__[15852] INFO Executing process: F...
2021-12-23 12:34:10 MYMACHINE __main__[7628] INFO Execution time: 0:00:11.087672
user@host:~$ python run.py 6
2021-12-23 12:40:48 MYMACHINE __mp_main__[26312] INFO Executing process: A...
2021-12-23 12:40:48 MYMACHINE __mp_main__[11468] INFO Executing process: B...
2021-12-23 12:40:48 MYMACHINE __mp_main__[12000] INFO Executing process: C...
2021-12-23 12:40:48 MYMACHINE __mp_main__[19864] INFO Executing process: D...
2021-12-23 12:40:48 MYMACHINE __mp_main__[25356] INFO Executing process: E...
2021-12-23 12:40:48 MYMACHINE __mp_main__[14504] INFO Executing process: F...
2021-12-23 12:40:53 MYMACHINE __main__[1180] INFO Execution time: 0:00:05.295934

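If you cannot invest the time to learn the requirements and assumptions of the libraries or modules recommended in the other answers, the following may suit you:

  1. Give your script options to run individual parts of the task.
  2. When you are ready to run n parts in parallel, launch them with child = subprocess.Popen(args = [sys.argv[0], ...]), providing the part number and other details in additional options and/or parameter files, and call child.wait() for each child.

If you want to monitor progress, start more workers as soon as workers finish, or do something else while waiting, use child.poll() instead of child.wait() and check whether child.returncode is still None.

For big tasks, the overhead of launching new processes and of writing and reading files is minimal. For many small tasks, you would want to start the workers only once and then communicate with them via pipes or sockets, but that is a lot more work and has to be done carefully to avoid the possibility of deadlocks. In that situation, it is probably better to learn how to use the modules recommended in the other answers.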
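A minimal sketch of the `poll()` approach follows. To keep it self-contained, each child here is a one-line inline program started with `python -c` rather than `sys.argv[0]` with a part number; the names `CHILD_CODE`, `run_parts`, `max_workers`, and `poll_interval` are made up for this illustration.

```python
import subprocess
import sys
import time

# Stand-in for "this script, run with a part number as its argument".
CHILD_CODE = "import sys; part = int(sys.argv[1]); sys.exit(0 if part >= 0 else 1)"


def run_parts(n_parts, max_workers=2, poll_interval=0.05):
    """Run parts 0..n_parts-1, keeping at most max_workers children alive."""
    pending = list(range(n_parts))
    running = {}      # part number -> Popen object
    returncodes = {}  # part number -> exit status

    while pending or running:
        # Start more workers as soon as there is room.
        while pending and len(running) < max_workers:
            part = pending.pop(0)
            running[part] = subprocess.Popen(
                [sys.executable, "-c", CHILD_CODE, str(part)]
            )
        # poll() returns None while the child is still running.
        for part, child in list(running.items()):
            if child.poll() is not None:
                returncodes[part] = child.returncode
                del running[part]
        time.sleep(poll_interval)  # other work could be done here instead

    return returncodes


if __name__ == "__main__":
    print(run_parts(4))
```

A non-zero entry in the returned dictionary indicates that the corresponding part failed and may need to be rerun.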

You can convert your DataFrame into a Dask DataFrame, which can handle the parallel computation for you.

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"A": A, "B": B})  # a regular pandas DataFrame
ddf = dd.from_pandas(pdf, npartitions=3)
solve(ddf)

Here is a complete example that works in a Windows environment; the advantage of asynchronous processing is that it saves time:

import multiprocessing
import time
from multiprocessing import Pool, freeze_support


def f1(a):
    c = 0
    for i in range(0, 99999999):
        c = c + 1
    return 1


def f2(b):
    c = 0
    for i in range(0, 99999999):
        c = c + 1
    return 1


if __name__ == '__main__':
    # freeze_support() should be called straight after the main guard.
    freeze_support()

    pool = Pool(multiprocessing.cpu_count())

    # Parallel run: both functions execute at the same time.
    t0 = time.time()
    result1 = pool.apply_async(f1, [0])
    result2 = pool.apply_async(f2, [9])
    answer1 = result1.get(timeout=10)
    answer2 = result2.get(timeout=10)
    print(time.time() - t0)

    # Sequential run, for comparison.
    t0 = time.time()
    aa = f1(1)
    bb = f2(2)
    print(time.time() - t0)