Python 多线程等待所有线程完成

这个问题可能在类似的情况下被问过,但是我在大约20分钟的搜索之后没有找到答案,所以我会问。

我已经编写了一个 Python 脚本(例如: scriptA.py)和一个脚本(例如 scriptB.py)

在 scriptB 中,我想用不同的参数多次调用 scriptA,每次运行大约需要一个小时,(这是一个巨大的脚本,做很多事情。.别担心) ,我希望能够同时使用所有不同的参数运行 scriptA,但是我需要等到所有参数都运行完毕之后才能继续; 我的代码:

import subprocess


#setup
do_setup()


#run scriptA
subprocess.call(scriptA + argumentsA)
subprocess.call(scriptA + argumentsB)
subprocess.call(scriptA + argumentsC)


#finish
do_finish()

我想做运行所有的 subprocess.call()在同一时间,然后等待,直到他们都完成了,我应该如何做到这一点?

我尝试使用类似于 给你示例的线程:

from threading import Thread
import subprocess


def call_script(args)
subprocess.call(args)


#run scriptA
t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))
t1.start()
t2.start()
t3.start()

但我不认为这是正确的。

我怎么知道他们都已经完成运行之前,我的 do_finish()

297027 次浏览

您需要在脚本的末尾使用 Thread对象的 加入方法。

t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))


t1.start()
t2.start()
t3.start()


t1.join()
t2.join()
t3.join()

因此,主线程将等待到 t1t2t3完成执行。

将线程放在一个列表中,然后使用 连接方法

 threads = []


t = Thread(...)
threads.append(t)


...repeat as often as necessary...


# Start all threads
for x in threads:
x.start()


# Wait for all of them to finish
for x in threads:
x.join()

你可以拥有一个类,比如下面这个类,你可以在其中添加 n 个函数或者你想要并行执行的 sole _ script,然后开始执行,等待所有的任务完成。.

from multiprocessing import Process


class ProcessParallel(object):
"""
To Process the  functions parallely


"""
def __init__(self, *jobs):
"""
"""
self.jobs = jobs
self.processes = []


def fork_processes(self):
"""
Creates the process objects for given function deligates
"""
for job in self.jobs:
proc  = Process(target=job)
self.processes.append(proc)


def start_all(self):
"""
Starts the functions process all together.
"""
for proc in self.processes:
proc.start()


def join_all(self):
"""
Waits untill all the functions executed.
"""
for proc in self.processes:
proc.join()




def two_sum(a=2, b=2):
return a + b


def multiply(a=2, b=2):
return a * b




#How to run:
if __name__ == '__main__':
#note: two_sum, multiply can be replace with any python console scripts which
#you wanted to run parallel..
procs =  ProcessParallel(two_sum, multiply)
#Add all the process in list
procs.fork_processes()
#starts  process execution
procs.start_all()
#wait until all the process got executed
procs.join_all()

我更喜欢根据输入列表使用列表内涵:

inputs = [scriptA + argumentsA, scriptA + argumentsB, ...]
threads = [Thread(target=call_script, args=(i)) for i in inputs]
[t.start() for t in threads]
[t.join() for t in threads]

在 Python 3中,由于 Python 3.2有一种新的方法可以达到相同的结果,我个人更喜欢传统的线程创建/开始/联接包 concurrent.futures: https://docs.python.org/3/library/concurrent.futures.html

使用 ThreadPoolExecutor的代码是:

from concurrent.futures.thread import ThreadPoolExecutor
import time


def call_script(ordinal, arg):
print('Thread', ordinal, 'argument:', arg)
time.sleep(2)
print('Thread', ordinal, 'Finished')


args = ['argumentsA', 'argumentsB', 'argumentsC']


with ThreadPoolExecutor(max_workers=2) as executor:
ordinal = 1
for arg in args:
executor.submit(call_script, ordinal, arg)
ordinal += 1
print('All tasks has been finished')

前面代码的输出类似于:

Thread 1 argument: argumentsA
Thread 2 argument: argumentsB
Thread 1 Finished
Thread 2 Finished
Thread 3 argument: argumentsC
Thread 3 Finished
All tasks has been finished

其优点之一是,您可以控制设置最大并发工作者的吞吐量。

也许吧

for t in threading.enumerate():
if t.daemon:
t.join()

我刚刚遇到了同样的问题,我需要等待所有使用 for 循环创建的线程。我只是尝试了下面的代码片段。可能不是完美的解决方案,但我认为这将是一个简单的解决方案来测试:

for t in threading.enumerate():
try:
t.join()
except RuntimeError as err:
if 'cannot join current thread' in err:
continue
else:
raise

threading 模块文档

有一个“主线程”对象; 它对应于初始 控制线程。它不是守护进程线程。

有可能创建“虚拟线程对象”。 这些线程对象对应于“异形线程”,即 控制线程在线程模块之外启动,例如 虚线程对象功能有限; 他们总是被认为是活的和魔鬼,不能被 join()。 它们永远不会被删除,因为不可能检测到 外星线的终结。

因此,当您不想保留所创建的线程的列表时,要捕获这两种情况:

import threading as thrd




def alter_data(data, index):
data[index] *= 2




data = [0, 2, 6, 20]


for i, value in enumerate(data):
thrd.Thread(target=alter_data, args=[data, i]).start()


for thread in thrd.enumerate():
if thread.daemon:
continue
try:
thread.join()
except RuntimeError as err:
if 'cannot join current thread' in err.args[0]:
# catchs main thread
continue
else:
raise

因此:

>>> print(data)
[0, 4, 12, 40]

使用只连接可以导致 假阳性与线程的交互,就像文档中说的:

当超时参数存在而非 Nothing 时,它应该是一个 中指定操作超时的浮点数 由于 join ()总是返回 Nothing,因此 在 join ()之后必须调用 isAlive ()来决定是否发生了超时 - 如果线程仍然活动,join ()调用将超时。

一段说明性的代码:

threads = []
for name in some_data:
new = threading.Thread(
target=self.some_func,
args=(name,)
)
threads.append(new)
new.start()
    

over_threads = iter(threads)
curr_th = next(over_threads)
while True:
curr_th.join()
if curr_th.is_alive():
continue
try:
curr_th = next(over_threads)
except StopIteration:
break