如何并行运行函数?

我先研究了一下,没有找到问题的答案。我尝试在 Python 中并行运行多个函数。

我有这样的东西:

files.py


import common #common is a util class that handles all the IO stuff


dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]


def func1():
c = common.Common()
for i in range(len(addFiles)):
c.createFiles(addFiles[i], filename, dir1)
c.getFiles(dir1)
time.sleep(10)
c.removeFiles(addFiles[i], dir1)
c.getFiles(dir1)


def func2():
c = common.Common()
for i in range(len(addFiles)):
c.createFiles(addFiles[i], filename, dir2)
c.getFiles(dir2)
time.sleep(10)
c.removeFiles(addFiles[i], dir2)
c.getFiles(dir2)

我想调用 fun1和 fun2并让它们同时运行。这些函数不相互交互,也不在同一个对象上交互。现在,我必须等待 function 1完成,然后才能启动 function 2。我该如何做如下事情:

process.py


from files import func1, func2


runBothFunc(func1(), func2())

我希望能够创建两个目录非常接近于同一时间,因为每分钟我都在计算有多少文件被创建。如果目录不在那里,它会打乱我的计时。

323795 次浏览

你可以使用 threading或者 multiprocessing

由于 CPython 的特性threading不太可能实现真正的并行性。因此,multiprocessing通常是一个更好的选择。

下面是一个完整的例子:

from multiprocessing import Process


def func1():
print 'func1: starting'
for i in xrange(10000000): pass
print 'func1: finishing'


def func2():
print 'func2: starting'
for i in xrange(10000000): pass
print 'func2: finishing'


if __name__ == '__main__':
p1 = Process(target=func1)
p1.start()
p2 = Process(target=func2)
p2.start()
p1.join()
p2.join()

启动/连接子进程的机制可以很容易地按照 runBothFunc的思路封装到一个函数中:

def runInParallel(*fns):
proc = []
for fn in fns:
p = Process(target=fn)
p.start()
proc.append(p)
for p in proc:
p.join()


runInParallel(func1, func2)

没有办法保证两个函数将同步执行,这似乎是您想要做的。

您所能做的最好的事情就是将函数分成几个步骤,然后使用 Process.join等待两个步骤在关键的同步点完成,就像@aix 的回答中提到的那样。

这比 time.sleep(10)好,因为你不能保证精确的时间。如果显式地等待,那么就意味着函数必须在移动到下一步之前执行该步骤,而不是假设它将在10毫秒内完成,而这并不能根据机器上发生的其他事情来保证。

如果你是一个 windows 用户并且使用 python 3,那么这篇文章将帮助你在 python 中进行并行编程。当你运行一个通常的多处理库的池编程时,你会得到一个关于你程序中的 main 函数的错误。这是因为 windows 没有 fork ()功能。下面的帖子给出了上述问题的解决方案。

Http://python.6.x6.nabble.com/multiprocessing-pool-woes-td5047050.html

因为我使用的是 python3,所以我对程序做了一些如下改动:

from types import FunctionType
import marshal


def _applicable(*args, **kwargs):
name = kwargs['__pw_name']
code = marshal.loads(kwargs['__pw_code'])
gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
defs = marshal.loads(kwargs['__pw_defs'])
clsr = marshal.loads(kwargs['__pw_clsr'])
fdct = marshal.loads(kwargs['__pw_fdct'])
func = FunctionType(code, gbls, name, defs, clsr)
func.fdct = fdct
del kwargs['__pw_name']
del kwargs['__pw_code']
del kwargs['__pw_defs']
del kwargs['__pw_clsr']
del kwargs['__pw_fdct']
return func(*args, **kwargs)


def make_applicable(f, *args, **kwargs):
if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
kwargs['__pw_name'] = f.__name__  # edited
kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
return _applicable, args, kwargs


def _mappable(x):
x,name,code,defs,clsr,fdct = x
code = marshal.loads(code)
gbls = globals() #gbls = marshal.loads(gbls)
defs = marshal.loads(defs)
clsr = marshal.loads(clsr)
fdct = marshal.loads(fdct)
func = FunctionType(code, gbls, name, defs, clsr)
func.fdct = fdct
return func(x)


def make_mappable(f, iterable):
if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
name = f.__name__    # edited
code = marshal.dumps(f.__code__)   # edited
defs = marshal.dumps(f.__defaults__)  # edited
clsr = marshal.dumps(f.__closure__)  # edited
fdct = marshal.dumps(f.__dict__)  # edited
return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)

在这个函数之后,上面的问题代码也像这样做了一些修改:

from multiprocessing import Pool
from poolable import make_applicable, make_mappable


def cube(x):
return x**3


if __name__ == "__main__":
pool    = Pool(processes=2)
results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
print([result.get(timeout=10) for result in results])

我得到的结果是:

[1, 8, 27, 64, 125, 216]

我认为这篇文章可能对一些 windows 用户有用。

这可以通过 完成,是一个允许您轻松并行化和分发 Python 代码的系统。

要并行化示例,需要使用 @ray.remote装饰符定义函数,然后使用 .remote调用它们。

import ray


ray.init()


dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]


# Define the functions.
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
# func1() code here...


@ray.remote
def func2(filename, addFiles, dir):
# func2() code here...


# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)])

如果向两个函数传递相同的参数且参数很大,则更有效的方法是使用 ray.put()。这样就避免了将大参数序列化两次并创建它的两个内存副本:

largeData_id = ray.put(largeData)


ray.get([func1(largeData_id), func2(largeData_id)])

重点 -如果 func1()func2()返回结果,则需要按以下方式重写代码:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func2.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

多重处理模块相比,使用 Ray 有许多优点。特别是,一样的密码将在一台机器上以及一组机器上运行。有关 Ray 的更多优点,请参见 这个相关的帖子

如果你的函数主要是做 I/O 工作(和较少的 CPU 工作) ,你有 Python 3.2 + ,你可以使用 线程池执行器:

from concurrent.futures import ThreadPoolExecutor


def run_io_tasks_in_parallel(tasks):
with ThreadPoolExecutor() as executor:
running_tasks = [executor.submit(task) for task in tasks]
for running_task in running_tasks:
running_task.result()


run_io_tasks_in_parallel([
lambda: print('IO task 1 running!'),
lambda: print('IO task 2 running!'),
])

如果你的函数主要是做 中央处理器工作(和较少的 I/O 工作) ,你有 Python 2.6 + ,你可以使用 多重处理模块:

from multiprocessing import Process


def run_cpu_tasks_in_parallel(tasks):
running_tasks = [Process(target=task) for task in tasks]
for running_task in running_tasks:
running_task.start()
for running_task in running_tasks:
running_task.join()


run_cpu_tasks_in_parallel([
lambda: print('CPU task 1 running!'),
lambda: print('CPU task 2 running!'),
])

似乎您有一个函数需要调用两个不同的参数。这可以通过使用 concurrent.futuresmap以及 Python 3.2 + 的组合来优雅地完成

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def sleep_secs(seconds):
time.sleep(seconds)
print(f'{seconds} has been processed')


secs_list = [2,4, 6, 8, 10, 12]

现在,如果您的操作是 IO 绑定的,那么您可以这样使用 ThreadPoolExecutor:

with ThreadPoolExecutor() as executor:
results = executor.map(sleep_secs, secs_list)

注意这里如何使用 map将函数转换为参数列表。

现在,如果函数是 CPU 绑定的,那么可以使用 ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
results = executor.map(sleep_secs, secs_list)

如果您不确定,您可以简单地同时尝试这两种方法,看看哪一种会给您带来更好的结果。

最后,如果你想把结果打印出来,你可以这样做:

with ThreadPoolExecutor() as executor:
results = executor.map(sleep_secs, secs_list)
for result in results:
print(result)

在2021年,最简单的方法是使用异步:

import asyncio, time


async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)


async def main():


task1 = asyncio.create_task(
say_after(4, 'hello'))


task2 = asyncio.create_task(
say_after(3, 'world'))


print(f"started at {time.strftime('%X')}")


# Wait until both tasks are completed (should take
# around 2 seconds.)
await task1
await task2


print(f"finished at {time.strftime('%X')}")




asyncio.run(main())

参考文献:

[1] https://docs.python.org/3/library/asyncio-task.html

(关于 如何在 python 中同时运行两个(或更多)函数?)

使用 asyncio,同步/异步任务可以通过以下方式并发运行:

import asyncio
import time


def function1():
# performing blocking tasks
while True:
print("function 1: blocking task ...")
time.sleep(1)


async def function2():
# perform non-blocking tasks
while True:
print("function 2: non-blocking task ...")
await asyncio.sleep(1)


async def main():
loop = asyncio.get_running_loop()


await asyncio.gather(
# https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
loop.run_in_executor(None, function1),
function2(),
)


if __name__ == '__main__':
asyncio.run(main())