The right way to limit the maximum number of threads running at once?

I want to create a program that runs multiple light threads, but limits itself to a constant, predefined number of concurrently running tasks, like this (but without the risk of a race condition):

import threading


def f(arg):
    global running
    running += 1
    print("Spawned a thread. running=%s, arg=%s" % (running, arg))
    for i in range(100000):
        pass
    running -= 1
    print("Done")


running = 0
while True:
    if running < 8:
        arg = get_task()
        threading.Thread(target=f, args=[arg]).start()

What is the safest/fastest way to implement this?


The most common pattern I've seen is:

threads = [threading.Thread(target=f) for _ in range(8)]
for thread in threads:
    thread.start()
...
for thread in threads:
    thread.join()

If you would rather maintain a fixed-size pool of running threads that process short-lived tasks than spawn a new thread for each piece of work, consider a solution built around Queues, like "How to wait until only the first thread is finished in Python".

It sounds like you want to implement the producer/consumer pattern with eight workers. Python has a Queue class for this purpose, and it is thread-safe.

Each worker should call get() on the queue to retrieve a task. This call will block if no tasks are available, causing the worker to sit idle until one becomes available. The worker should then execute the task and finally call task_done() on the queue.

You put tasks into the queue by calling put() on the queue.

From the main thread, you can call join() on the queue to wait until all pending tasks have been completed.

This approach has the benefit that you are not creating and destroying threads, which is expensive. The worker threads will run continuously, but will be asleep when no tasks are in the queue, using zero CPU time.

(The linked documentation page has an example of this very pattern.)
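As a minimal sketch of the pattern described above (the worker body, the number of workers, and the sample tasks are illustrative assumptions, not part of the original answer):

import queue
import threading


NUM_WORKERS = 8

task_queue = queue.Queue()


def worker():
    while True:
        task = task_queue.get()  # blocks while the queue is empty
        try:
            pass  # process `task` here (placeholder)
        finally:
            task_queue.task_done()  # tell the queue this task is finished


# Daemon workers run forever, but die together with the main thread.
for _ in range(NUM_WORKERS):
    threading.Thread(target=worker, daemon=True).start()

for task in range(100):
    task_queue.put(task)  # the producer adds tasks with put()

task_queue.join()  # the main thread waits for all pending tasks to complete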

It's much easier to implement this as a thread pool or executor, using either multiprocessing.dummy.Pool or concurrent.futures.ThreadPoolExecutor (or, on Python 2.x, the backport futures). For example:

import concurrent.futures


def f(arg):
    print("Started a task. arg=%s" % arg)
    for i in range(100000):
        pass
    print("Done")


with concurrent.futures.ThreadPoolExecutor(8) as executor:
    while True:
        arg = get_task()
        executor.submit(f, arg)

Of course, if you can change the pull-model get_task to a push-model get_tasks that, e.g., yields one task at a time, this is even simpler:

with concurrent.futures.ThreadPoolExecutor(8) as executor:
    for arg in get_tasks():
        executor.submit(f, arg)

When you run out of tasks (e.g., get_task raises an exception, or get_tasks runs dry), this will automatically tell the executor to stop after it drains the queue, wait for it to stop, and clean everything up. (A hypothetical get_tasks generator is sketched below.)
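For illustration, such a get_tasks could be a plain generator (the name matches the snippet above; the body is an assumption):

def get_tasks():
    # Yield work items one at a time instead of materializing a full list;
    # when the generator is exhausted, the for loop and the with block end,
    # and the executor shuts down after draining its queue.
    for i in range(100):
        yield i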

A semaphore is a variable or abstract data type that is used for controlling access to a common resource by multiple processes in a concurrent system such as a multiprogramming operating system; it can help you here.

threadLimiter = threading.BoundedSemaphore(maximumNumberOfThreads)


class MyThread(threading.Thread):

    def run(self):
        threadLimiter.acquire()
        try:
            self.Executemycode()
        finally:
            threadLimiter.release()

    def Executemycode(self):
        print(" Hello World!")
        # <your code here>

This way you can easily limit the number of threads that will be executed concurrently during program execution. The variable maximumNumberOfThreads can be used to define an upper limit on the maximum number of threads. A usage sketch follows.
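A minimal usage sketch for the class above (the thread count and the driver loop are illustrative assumptions, not part of the original answer):

maximumNumberOfThreads = 8
threadLimiter = threading.BoundedSemaphore(maximumNumberOfThreads)

# Start many threads; at most 8 of them run Executemycode() concurrently,
# because each one blocks in acquire() until a slot frees up.
threads = [MyThread() for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()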


To apply a limitation on thread creation, follow this example (it really works):

import threading
import time


def some_process(thread_num):
    count = 0
    while count < 5:
        time.sleep(0.5)
        count += 1
        print("%s: %s" % (thread_num, time.ctime(time.time())))
        print('number of alive threads:{}'.format(threading.active_count()))


def create_thread():
    try:
        for i in range(1, 555):  # trying to spawn 555 threads.
            thread = threading.Thread(target=some_process, args=(i,))
            thread.start()

            if threading.active_count() == 100:  # set maximum threads.
                thread.join()

            print(threading.active_count())  # number of alive threads.

    except Exception as e:
        print("Error: unable to start thread {}".format(e))


if __name__ == '__main__':
    create_thread()

Or:

Another way is to set up a thread-count checker as a mutex/lock, as in the example below:

import threading
import time


def some_process(thread_num):
    count = 0
    while count < 5:
        time.sleep(0.5)
        count += 1
        # print("%s: %s" % (thread_num, time.ctime(time.time())))
        print('number of alive threads:{}'.format(threading.active_count()))


def create_thread2(number_of_desire_thread):
    try:
        for i in range(1, 555):
            thread = threading.Thread(target=some_process, args=(i,)).start()

            while number_of_desire_thread <= threading.active_count():
                # Busy-wait to avoid creating additional threads.
                pass

            print('unlock')
            print(threading.active_count())  # number of alive threads.

    except Exception as e:
        print("Error: unable to start thread {}".format(e))


if __name__ == '__main__':
    create_thread2(100)

I ran into this same problem and spent days (two days to be precise) getting to the correct solution using a Queue. I wasted a day going down the ThreadPoolExecutor path, because there is no way to limit the number of threads that thing launches! I fed it a list of 5000 files to copy, and the code went non-responsive once it got up to about 1500 concurrent file copies running at once. The max_workers parameter on ThreadPoolExecutor only controls how many workers are spinning up threads, not how many threads get spun up.

Ok, anyway, here is a very simple example of using a Queue for this:

import threading, time, random
from queue import Queue


jobs = Queue()


def do_stuff(q):
    while not q.empty():
        value = q.get()
        time.sleep(random.randint(1, 10))
        print(value)
        q.task_done()


for i in range(10):
    jobs.put(i)


for i in range(3):
    worker = threading.Thread(target=do_stuff, args=(jobs,))
    worker.start()


print("waiting for queue to complete", jobs.qsize(), "tasks")
jobs.join()
print("all done")

concurrent.futures.ThreadPoolExecutor.map

concurrent.futures.ThreadPoolExecutor was mentioned at https://stackoverflow.com/a/19370282/895245; here is an example of the map method, which is often the most convenient one.

.map() is a parallel version of map(): it reads all the input immediately, then runs the tasks in parallel, and returns the results in the same order as the input.

Usage:

./concurrent_map_exception.py [nproc [min [max]]]

concurrent_map_exception.py

import concurrent.futures
import sys
import time


def my_func(i):
    time.sleep((abs(i) % 4) / 10.0)
    return 10.0 / i


def my_get_work(min_, max_):
    for i in range(min_, max_):
        print('my_get_work: {}'.format(i))
        yield i


# CLI.
argv_len = len(sys.argv)
if argv_len > 1:
    nthreads = int(sys.argv[1])
    if nthreads == 0:
        nthreads = None
else:
    nthreads = None
if argv_len > 2:
    min_ = int(sys.argv[2])
else:
    min_ = 1
if argv_len > 3:
    max_ = int(sys.argv[3])
else:
    max_ = 100


# Action.
with concurrent.futures.ProcessPoolExecutor(max_workers=nthreads) as executor:
    for input, output in zip(
        my_get_work(min_, max_),
        executor.map(my_func, my_get_work(min_, max_))
    ):
        print('result: {} {}'.format(input, output))

GitHub upstream.

For example:

./concurrent_map_exception.py 1 1 5

gives:

my_get_work: 1
my_get_work: 2
my_get_work: 3
my_get_work: 4
my_get_work: 1
result: 1 10.0
my_get_work: 2
result: 2 5.0
my_get_work: 3
result: 3 3.3333333333333335
my_get_work: 4
result: 4 2.5

and:

./concurrent_map_exception.py 2 1 5

gives the same output but runs faster because we now have two processes, and:

./concurrent_map_exception.py 1 -5 5

gives:

my_get_work: -5
my_get_work: -4
my_get_work: -3
my_get_work: -2
my_get_work: -1
my_get_work: 0
my_get_work: 1
my_get_work: 2
my_get_work: 3
my_get_work: 4
my_get_work: -5
result: -5 -2.0
my_get_work: -4
result: -4 -2.5
my_get_work: -3
result: -3 -3.3333333333333335
my_get_work: -2
result: -2 -5.0
my_get_work: -1
result: -1 -10.0
my_get_work: 0
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.6/concurrent/futures/process.py", line 175, in _process_worker
r = call_item.fn(*call_item.args, **call_item.kwargs)
File "/usr/lib/python3.6/concurrent/futures/process.py", line 153, in _process_chunk
return [fn(*args) for args in chunk]
File "/usr/lib/python3.6/concurrent/futures/process.py", line 153, in <listcomp>
return [fn(*args) for args in chunk]
File "./concurrent_map_exception.py", line 24, in my_func
return 10.0 / i
ZeroDivisionError: float division by zero
"""


The above exception was the direct cause of the following exception:


Traceback (most recent call last):
File "./concurrent_map_exception.py", line 52, in <module>
executor.map(my_func, my_get_work(min_, max_))
File "/usr/lib/python3.6/concurrent/futures/process.py", line 366, in _chain_from_iterable_of_lists
for element in iterable:
File "/usr/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
yield fs.pop().result()
File "/usr/lib/python3.6/concurrent/futures/_base.py", line 432, in result
return self.__get_result()
File "/usr/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
ZeroDivisionError: float division by zero

So notice how it stops immediately on the exception.

Queue example with error handling

Queue was mentioned at https://stackoverflow.com/a/19369877/895245, but here is a full example.

Design goals:

  • the input function does not need to be modified
  • limit the number of threads
  • queue sizes closely follow the number of threads
  • fetch input only as needed, not everything upfront
  • if an error happens, optionally stop soon afterwards
  • if an exception is raised in the worker function, show its stack trace clearly

concurrent.futures.ThreadPoolExecutor is the best interface currently available in the stdlib that I've seen. However, I could not find how to do all of the following with it:

  • make it feed input little by little, perfectly
  • fail immediately on error
  • accept functions that take multiple arguments

because:

  • .map(): reads all the input at once, and func can only take a single argument
  • .submit(): .shutdown() executes until all futures are done, and there is no blocking .submit() on the maximum number of current work items. So how do you avoid an ugly .cancel() loop over all the futures after the first failure? (A semaphore-based workaround is sketched below.)
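As an aside, one common workaround for that missing blocking .submit() is to gate submissions with a semaphore that a done-callback releases; this is a sketch under that assumption, not part of the original answer:

import concurrent.futures
import threading


def bounded_submit_all(work_function, inputs, nthreads=8):
    # Allow at most 2 * nthreads scheduled-but-unfinished tasks at any time.
    slots = threading.BoundedSemaphore(2 * nthreads)
    with concurrent.futures.ThreadPoolExecutor(nthreads) as executor:
        for arg in inputs:
            slots.acquire()  # blocks while too much work is already queued
            future = executor.submit(work_function, arg)
            # Release the slot when the task finishes, success or failure.
            future.add_done_callback(lambda _future: slots.release())


bounded_submit_all(print, range(100))

Using BoundedSemaphore rather than Semaphore means an extra release() raises instead of silently widening the limit, which catches callback bugs.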

Without further ado, here is my implementation:

thread_pool.py

#!/usr/bin/env python3

'''
This file is MIT Licensed because I'm posting it on Stack Overflow:
https://stackoverflow.com/questions/19369724/the-right-way-to-limit-maximum-number-of-threads-running-at-once/55263676#55263676
'''

from typing import Any, Callable, Dict, Iterable, Union
import os
import queue
import sys
import threading
import time
import traceback


class ThreadPoolExitException(Exception):
    '''
    An object of this class may be raised by output_handler_function to
    request early termination.

    It is also raised by submit() if submit_raise_exit=True.
    '''
    pass


class ThreadPool:
    '''
    Start a pool of a limited number of threads to do some work.

    This is similar to the stdlib concurrent, but I could not find
    how to reach all my design goals with that implementation:

    * the input function does not need to be modified
    * limit the number of threads
    * queue sizes closely follow number of threads
    * if an exception happens, optionally stop soon afterwards

    This class form allows you to use your own while loops with submit().

    Exit soon after the first failure happens:

    ....
    python3 thread_pool.py 2 -10 20 handle_output_print
    ....

    Sample output:

    ....
    {'i': -9} -1.1111111111111112 None
    {'i': -8} -1.25 None
    {'i': -10} -1.0 None
    {'i': -6} -1.6666666666666667 None
    {'i': -7} -1.4285714285714286 None
    {'i': -4} -2.5 None
    {'i': -5} -2.0 None
    {'i': -2} -5.0 None
    {'i': -3} -3.3333333333333335 None
    {'i': 0} None ZeroDivisionError('float division by zero')
    {'i': -1} -10.0 None
    {'i': 1} 10.0 None
    {'i': 2} 5.0 None
    work_function or handle_output raised:
    Traceback (most recent call last):
      File "thread_pool.py", line 181, in _func_runner
        work_function_return = self.work_function(**work_function_input)
      File "thread_pool.py", line 281, in work_function_maybe_raise
        return 10.0 / i
    ZeroDivisionError: float division by zero
    work_function_input: {'i': 0}
    work_function_return: None
    ....

    Don't exit after first failure, run until end:

    ....
    python3 thread_pool.py 2 -10 20 handle_output_print_no_exit
    ....

    Store results in a queue for later inspection instead of printing immediately,
    then print everything at the end:

    ....
    python3 thread_pool.py 2 -10 20 handle_output_queue
    ....

    Exit soon after the handle_output raise.

    ....
    python3 thread_pool.py 2 -10 20 handle_output_raise
    ....

    Relying on this interface to abort execution is discouraged; this should
    usually only happen due to a programming error in the handler.

    Test that the argument called "thread_id" is passed to work_function and printed:

    ....
    python3 thread_pool.py 2 -10 20 handle_output_print thread_id
    ....

    Test with ThreadPoolExitException and submit_raise_exit=True; same behaviour
    as handle_output_print except for the different exit cause report:

    ....
    python3 thread_pool.py 2 -10 20 handle_output_raise_exit_exception
    ....
    '''
    def __init__(
        self,
        work_function: Callable,
        handle_output: Union[Callable[[Any,Any,Exception],Any],None] = None,
        nthreads: Union[int,None] = None,
        thread_id_arg: Union[str,None] = None,
        submit_raise_exit: bool = False
    ):
        '''
        Start the thread pool immediately.

        join() must be called afterwards at some point.

        :param work_function: main work function to be evaluated.
        :param handle_output: called on work_function return values as they
            are returned.

            The function signature is:

            ....
            handle_output(
                work_function_input: Union[Dict,None],
                work_function_return,
                work_function_exception: Exception
            ) -> Union[Exception,None]
            ....

            where work_function_exception is the exception that work_function raised,
            or None otherwise.

            The first non-None return value of a call to this function is returned by
            submit(), get_handle_output_result() and join().

            The intended semantic for this is to return:

            *   on success:
            ** None to continue execution
            ** ThreadPoolExitException() to request stop execution
            * if work_function_input or work_function_exception raise:
            ** the exception raised

            The ThreadPool user can then optionally terminate execution early on error
            or request with either:

            * an explicit submit() return value check + break if a submit loop is used
            * `with` + submit_raise_exit=True

            Default: a handler that just returns `exception`, which can normally be used
            by the submit loop to detect an error and exit immediately.
        :param nthreads: number of threads to use. Default: nproc.
        :param thread_id_arg: if not None, set the argument of work_function with this name
            to a 0-indexed thread ID. This allows function calls to coordinate
            usage of external resources such as files or ports.
        :param submit_raise_exit: if True, submit() raises ThreadPoolExitException() if
            get_handle_output_result() is not None.
        '''
        self.work_function = work_function
        if handle_output is None:
            handle_output = lambda input, output, exception: exception
        self.handle_output = handle_output
        if nthreads is None:
            nthreads = len(os.sched_getaffinity(0))
        self.thread_id_arg = thread_id_arg
        self.submit_raise_exit = submit_raise_exit
        self.nthreads = nthreads
        self.handle_output_result = None
        self.handle_output_result_lock = threading.Lock()
        self.in_queue = queue.Queue(maxsize=nthreads)
        self.threads = []
        for i in range(self.nthreads):
            thread = threading.Thread(
                target=self._func_runner,
                args=(i,)
            )
            self.threads.append(thread)
            thread.start()

    def __enter__(self):
        '''
        __exit__ automatically calls join() for you.

        This is cool because it automatically ends the loop if an exception occurs.

        But don't forget that errors may happen after the last submit was called, so you
        likely want to check for that with get_handle_output_result() after the with.
        '''
        return self

    def __exit__(self, exception_type, exception_value, exception_traceback):
        self.join()
        return exception_type is ThreadPoolExitException

    def _func_runner(self, thread_id):
        while True:
            work_function_input = self.in_queue.get(block=True)
            if work_function_input is None:
                break
            if self.thread_id_arg is not None:
                work_function_input[self.thread_id_arg] = thread_id
            try:
                work_function_exception = None
                work_function_return = self.work_function(**work_function_input)
            except Exception as e:
                work_function_exception = e
                work_function_return = None
            handle_output_exception = None
            try:
                handle_output_return = self.handle_output(
                    work_function_input,
                    work_function_return,
                    work_function_exception
                )
            except Exception as e:
                handle_output_exception = e
            handle_output_result = None
            if handle_output_exception is not None:
                handle_output_result = handle_output_exception
            elif handle_output_return is not None:
                handle_output_result = handle_output_return
            if handle_output_result is not None and self.handle_output_result is None:
                with self.handle_output_result_lock:
                    self.handle_output_result = (
                        work_function_input,
                        work_function_return,
                        handle_output_result
                    )
            self.in_queue.task_done()

    @staticmethod
    def exception_traceback_string(exception):
        '''
        Helper to get the traceback from an exception object.
        This is usually what you want to print if an error happens in a thread:
        https://stackoverflow.com/questions/3702675/how-to-print-the-full-traceback-without-halting-the-program/56199295#56199295
        '''
        return ''.join(traceback.format_exception(
            None, exception, exception.__traceback__)
        )

    def get_handle_output_result(self):
        '''
        :return: if a handle_output call has raised previously, return a tuple:

        ....
        (work_function_input, work_function_return, exception_raised)
        ....

        corresponding to the first such raise.

        Otherwise, if a handle_output returned non-None, a tuple:

        (work_function_input, work_function_return, handle_output_return)

        Otherwise, None.
        '''
        return self.handle_output_result

    def join(self):
        '''
        Request all threads to stop after they finish currently submitted work.

        :return: same as get_handle_output_result()
        '''
        for thread in range(self.nthreads):
            self.in_queue.put(None)
        for thread in self.threads:
            thread.join()
        return self.get_handle_output_result()

    def submit(
        self,
        work_function_input: Union[Dict,None] = None
    ):
        '''
        Submit work. Block if there is already enough work scheduled (~nthreads).

        :return: the same as get_handle_output_result
        '''
        handle_output_result = self.get_handle_output_result()
        if handle_output_result is not None and self.submit_raise_exit:
            raise ThreadPoolExitException()
        if work_function_input is None:
            work_function_input = {}
        self.in_queue.put(work_function_input)
        return handle_output_result


if __name__ == '__main__':
    def get_work(min_, max_):
        '''
        Generate simple range work for work_function.
        '''
        for i in range(min_, max_):
            yield {'i': i}

    def work_function_maybe_raise(i):
        '''
        The main function that will be evaluated.

        It sleeps to simulate an IO operation.
        '''
        time.sleep((abs(i) % 4) / 10.0)
        return 10.0 / i

    def work_function_get_thread(i, thread_id):
        time.sleep((abs(i) % 4) / 10.0)
        return thread_id

    def handle_output_print(input, output, exception):
        '''
        Print outputs and exit immediately on failure.
        '''
        print('{!r} {!r} {!r}'.format(input, output, exception))
        return exception

    def handle_output_print_no_exit(input, output, exception):
        '''
        Print outputs, don't exit on failure.
        '''
        print('{!r} {!r} {!r}'.format(input, output, exception))

    out_queue = queue.Queue()
    def handle_output_queue(input, output, exception):
        '''
        Store outputs in a queue for later usage.
        '''
        global out_queue
        out_queue.put((input, output, exception))
        return exception

    def handle_output_raise(input, output, exception):
        '''
        Raise if input == 0, to test that execution
        stops nicely if this raises.
        '''
        print('{!r} {!r} {!r}'.format(input, output, exception))
        if input['i'] == 0:
            raise Exception

    def handle_output_raise_exit_exception(input, output, exception):
        '''
        Return a ThreadPoolExitException() if input == -5.
        Return the work_function exception if it raised.
        '''
        print('{!r} {!r} {!r}'.format(input, output, exception))
        if exception:
            return exception
        if output == 10.0 / -5:
            return ThreadPoolExitException()

    # CLI arguments.
    argv_len = len(sys.argv)
    if argv_len > 1:
        nthreads = int(sys.argv[1])
        if nthreads == 0:
            nthreads = None
    else:
        nthreads = None
    if argv_len > 2:
        min_ = int(sys.argv[2])
    else:
        min_ = 1
    if argv_len > 3:
        max_ = int(sys.argv[3])
    else:
        max_ = 100
    if argv_len > 4:
        handle_output_funtion_string = sys.argv[4]
    else:
        handle_output_funtion_string = 'handle_output_print'
    handle_output = eval(handle_output_funtion_string)
    if argv_len > 5:
        work_function = work_function_get_thread
        thread_id_arg = sys.argv[5]
    else:
        work_function = work_function_maybe_raise
        thread_id_arg = None

    # Action.
    if handle_output is handle_output_raise_exit_exception:
        # `with` version with implicit join and submit raise
        # immediately when desired with ThreadPoolExitException.
        #
        # This is the more safe and convenient and DRY usage if
        # you can use `with`, so prefer it generally.
        with ThreadPool(
            work_function,
            handle_output,
            nthreads,
            thread_id_arg,
            submit_raise_exit=True
        ) as my_thread_pool:
            for work in get_work(min_, max_):
                my_thread_pool.submit(work)
        handle_output_result = my_thread_pool.get_handle_output_result()
    else:
        # Explicit error checking in submit loop to exit immediately
        # on error.
        my_thread_pool = ThreadPool(
            work_function,
            handle_output,
            nthreads,
            thread_id_arg,
        )
        for work_function_input in get_work(min_, max_):
            handle_output_result = my_thread_pool.submit(work_function_input)
            if handle_output_result is not None:
                break
        handle_output_result = my_thread_pool.join()
    if handle_output_result is not None:
        work_function_input, work_function_return, exception = handle_output_result
        if type(exception) is ThreadPoolExitException:
            print('Early exit requested by handle_output with ThreadPoolExitException:')
        else:
            print('work_function or handle_output raised:')
        print(ThreadPool.exception_traceback_string(exception), end='')
        print('work_function_input: {!r}'.format(work_function_input))
        print('work_function_return: {!r}'.format(work_function_return))
    if handle_output == handle_output_queue:
        while not out_queue.empty():
            print(out_queue.get())

GitHub upstream.

Tested in Python 3.7.3.

This can be done with a Semaphore object. A semaphore manages an internal counter which is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when acquire() finds that it is zero, it blocks, waiting until some other thread calls release().

A short example to show, for a maximum of five parallel threads, that one half of the threads are executed instantly while the others are blocked and wait:

import threading
import time


maxthreads = 5
pool_sema = threading.Semaphore(value=maxthreads)
threads = list()


def task(i):
    pool_sema.acquire()
    try:
        print("executed {}. thread".format(i))
        time.sleep(2)
    except Exception as e:
        print("Error: problem with {0}. thread.\nMessage:{1}".format(i, e))
    finally:
        pool_sema.release()


def create_threads(number_of_threads):
    try:
        for i in range(number_of_threads):
            thread = threading.Thread(target=task, args=(str(i),))
            threads.append(thread)
            thread.start()
    except Exception as e:
        print("Error: unable to start thread {}".format(e))


if __name__ == '__main__':
    create_threads(10)

Output

executed 0. thread
executed 1. thread
executed 2. thread
executed 3. thread
executed 4. thread
executed 5. thread
executed 6. thread
executed 7. thread
executed 8. thread
executed 9. thread

For those who prefer to use a list comprehension based on an input list:

import threading
import time


maxthreads = 5
pool_sema = threading.Semaphore(value=maxthreads)


def task(i):
    pool_sema.acquire()
    try:
        print("executed {}. thread".format(i))
        time.sleep(2)
    except Exception as e:
        print("Error: problem with {0}. thread.\nMessage:{1}".format(i, e))
    finally:
        pool_sema.release()


def create_threads(number_of_threads):
    try:
        threads = [threading.Thread(target=task, args=(str(i),)) for i in range(number_of_threads)]
        [t.start() for t in threads]
    except Exception as e:
        print("Error: unable to start thread {}".format(e))
    finally:
        [t.join() for t in threads]


if __name__ == '__main__':
    create_threads(10)

A simple way to limit the maximum number of threads using the threading.active_count() method:

import threading, time


maxthreads = 10


def do_stuff(i):
    print(i)
    print("Total Active threads are {0}".format(threading.active_count()))
    time.sleep(20)


count = 0
while True:
    if threading.active_count() <= maxthreads:
        worker = threading.Thread(target=do_stuff, args=(count,))
        worker.start()
        count += 1

It can easily be achieved using ThreadPoolExecutor. Change the limit by using the max_workers argument.

from concurrent.futures import ThreadPoolExecutor
import time


pool = ThreadPoolExecutor(max_workers=10)


def thread(num):
    print(num)
    time.sleep(3)


for n in range(0, 1000):
    pool.submit(thread, n)


pool.shutdown(wait=True)

Use the following Python code:

import threading
import time


# Set this global variable to the maximum number of threads you want to execute
MAX_THREAD_COUNT = 30


# Sample program which will run in a thread. This is a placeholder. Add your
# code to this function, to be executed in the thread
def sampleThreadProgram():
    time.sleep(1)


# main program
def main():
    # Sample loop, replace it with your loop to execute threads. Currently it
    # will execute the above placeholder function 1000 times, in parallel
    # across up to MAX_THREAD_COUNT threads
    for x in range(1000):
        print('x is ' + str(x))

        # Sleep for 1 second at a time while the number of running threads is
        # at MAX_THREAD_COUNT, so some of them can complete before raising
        # new threads
        while len(threading.enumerate()) >= MAX_THREAD_COUNT:
            print('running threads are' + str(len(threading.enumerate())))
            time.sleep(1)

        # Start a new thread, since the number of threads is now less than
        # MAX_THREAD_COUNT
        t1 = threading.Thread(target=sampleThreadProgram, args=())
        t1.start()


main()