如何在 Python 中分析多线程程序?

我正在用 Python 开发一个固有的多线程模块,我想知道它在哪里花费时间。CProfile 似乎只对主线程进行概要分析。是否有办法分析计算中涉及的所有线程?

30324 次浏览

我不知道有什么分析应用程序支持 python 这样的东西,但是你可以编写一个 Trace 类来写日志文件,在这里你可以输入一个操作何时开始、何时结束以及它消耗了多少时间的信息。

这是解决你问题的一个简单快捷的方法。

可以不运行一个 cProfile,而是在每个线程中运行单独的 cProfile实例,然后组合这些统计信息。Stats.add()会自动执行此操作。

如果您愿意做一些额外的工作,那么可以编写自己的实现 profile(self, frame, event, arg)的分析类。每当调用一个函数时就会调用它,您可以相当容易地设置一个结构来从中收集统计信息。

然后可以使用 threading.setprofile在每个线程上注册该函数。当函数被调用时,您可以使用 threading.currentThread()查看它在哪个函数上运行。更多信息(以及即时运行的配方)请点击这里:

Http://code.activestate.com/recipes/465831/

Http://docs.python.org/library/threading.html#threading.setprofile

鉴于不同线程的主要功能不同,您可以使用来自 给你的非常有用的 profile_func()装饰器。

请参阅 Yappi(另一个 Python 分析器)。

从2019年开始: 我喜欢 vartec 的建议,但更喜欢一个代码示例。因此,我建立了一个-这不是疯狂的难以实现,但你确实需要考虑到一些事情。下面是一个工作示例(Python 3.6) :

您可以看到,结果考虑了 Thread1和 thread2调用 thread _ func ()所花费的时间。

您需要在代码中进行的唯一更改是子类线程化。线程,重写其 run ()方法。简化线程分析方法的最小更改。

import threading
import cProfile
from time import sleep
from pstats import Stats
import pstats
from time import time
import threading
import sys


# using different times to ensure the results reflect all threads
SHORT = 0.5
MED = 0.715874
T1_SLEEP = 1.37897
T2_SLEEP = 2.05746
ITER = 1
ITER_T = 4


class MyThreading(threading.Thread):
""" Subclass to arrange for the profiler to run in the thread """
def run(self):
""" Here we simply wrap the call to self._target (the callable passed as arg to MyThreading(target=....) so that cProfile runs it for us, and thus is able to profile it.
Since we're in the current instance of each threading object at this point, we can run arbitrary number of threads & profile all of them
"""
try:
if self._target:
# using the name attr. of our thread to ensure unique profile filenames
cProfile.runctx('self._target(*self._args, **self._kwargs)', globals=globals(), locals=locals(), filename= f'full_server_thread_{self.name}')
finally:
# Avoid a refcycle if the thread is running a function with
# an argument that has a member that points to the thread.
del self._target, self._args, self._kwargs


def main(args):
""" Main func. """
thread1_done =threading.Event()
thread1_done.clear()
thread2_done =threading.Event()
thread2_done.clear()


print("Main thread start.... ")
t1 = MyThreading(target=thread_1, args=(thread1_done,), name="T1" )
t2 = MyThreading(target=thread_2, args=(thread2_done,), name="T2" )
print("Subthreads instances.... launching.")


t1.start()          # start will call our overrident threading.run() method
t2.start()


for i in range(0,ITER):
print(f"MAIN iteration: {i}")
main_func_SHORT()
main_func_MED()


if thread1_done.wait() and thread2_done.wait():
print("Threads are done now... ")
return True


def main_func_SHORT():
""" Func. called by the main T """
sleep(SHORT)
return True


def main_func_MED():
sleep(MED)
return True




def thread_1(done_flag):
print("subthread target func 1 ")
for i in range(0,ITER_T):
thread_func(T1_SLEEP)
done_flag.set()


def thread_func(SLEEP):
print(f"Thread func")
sleep(SLEEP)


def thread_2(done_flag):
print("subthread target func 2 ")
for i in range(0,ITER_T):
thread_func(T2_SLEEP)
done_flag.set()


if __name__ == '__main__':


import sys
args = sys.argv[1:]
cProfile.run('main(args)', f'full_server_profile')
stats = Stats('full_server_profile')
stats.add('full_server_thread_T1')
stats.add('full_server_thread_T2')
stats.sort_stats('filename').print_stats()

查看 Dask 项目中的 mtprof:

Https://github.com/dask/mtprof

它是 cProfile的一个下拉式替代品,如果您的线程以通常的方式启动,并在主线程之前完成,它将把它们的统计数据卷入相同的报告统计数据中。对我来说非常有效。