有没有办法杀死一个线程?

是否可以在不设置/检查任何标志/信号量等的情况下终止正在运行的线程?

1111394 次浏览

你永远不应该强行杀死一个线程而不与它合作。

杀死线程会删除任何尝试/最终阻止设置的保证,因此您可能会锁定锁、打开文件等。

你唯一可以争辩说强行杀死线程是个好主意的时候是快速杀死程序,但绝不是单线程。

没有官方API可以做到这一点,没有。

您需要使用平台API来杀死线程,例如pthread_kill或TerminateThread。您可以访问此类API,例如通过pythonwin或通过ctype。

请注意,这本质上是不安全的。如果被杀死的线程在被杀死时具有GIL,它可能会导致无法收集的垃圾(来自成为垃圾的堆栈帧的局部变量),并可能导致死锁。

最好不要杀死一个线程。一种方法可以是在线程的循环中引入一个“try”块,并在您想停止线程时抛出异常(例如,停止您的for/这时候/…)。我已经在我的应用程序上使用了这个,它的工作原理…

您可以通过将跟踪安装到将退出线程的线程中来终止线程。有关一种可能的实现,请参阅附加链接。

在Python中杀死一个线程

在Python和任何语言中突然终止线程通常是一种糟糕的模式。想想以下情况:

  • 线程持有必须正确关闭的关键资源
  • 该线程还创建了其他几个必须杀死的线程。

如果你能负担得起(如果你管理自己的线程),处理这个问题的好方法是有一个exit_request标志,每个线程定期检查,看看是否是时候退出了。

例如:

import threading
class StoppableThread(threading.Thread):"""Thread class with a stop() method. The thread itself has to checkregularly for the stopped() condition."""
def __init__(self,  *args, **kwargs):super(StoppableThread, self).__init__(*args, **kwargs)self._stop_event = threading.Event()
def stop(self):self._stop_event.set()
def stopped(self):return self._stop_event.is_set()

在这段代码中,当您希望线程退出时,您应该在线程上调用stop(),并使用join()等待线程正确退出。线程应该定期检查停止标志。

然而,在某些情况下,您确实需要终止一个线程。一个例子是,当您包装一个忙于长时间调用的外部库时,您想中断它。

以下代码允许(有一些限制)在Python线程中引发Exception:

def _async_raise(tid, exctype):'''Raises an exception in the threads with id tid'''if not inspect.isclass(exctype):raise TypeError("Only types can be raised (not instances)")res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid),ctypes.py_object(exctype))if res == 0:raise ValueError("invalid thread id")elif res != 1:# "if it returns a number greater than one, you're in trouble,# and you should call it again with exc=NULL to revert the effect"ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None)raise SystemError("PyThreadState_SetAsyncExc failed")
class ThreadWithExc(threading.Thread):'''A thread class that supports raising an exception in the thread fromanother thread.'''def _get_my_tid(self):"""determines this (self's) thread id
CAREFUL: this function is executed in the context of the callerthread, to get the identity of the thread represented by thisinstance."""if not self.isAlive():raise threading.ThreadError("the thread is not active")
# do we have it cached?if hasattr(self, "_thread_id"):return self._thread_id
# no, look for it in the _active dictfor tid, tobj in threading._active.items():if tobj is self:self._thread_id = tidreturn tid
# TODO: in python 2.6, there's a simpler way to do: self.ident
raise AssertionError("could not determine the thread's id")
def raiseExc(self, exctype):"""Raises the given exception type in the context of this thread.
If the thread is busy in a system call (time.sleep(),socket.accept(), ...), the exception is simply ignored.
If you are sure that your exception should terminate the thread,one way to ensure that it works is:
t = ThreadWithExc( ... )...t.raiseExc( SomeException )while t.isAlive():time.sleep( 0.1 )t.raiseExc( SomeException )
If the exception is to be caught by the thread, you need a way tocheck that your thread has caught it.
CAREFUL: this function is executed in the context of thecaller thread, to raise an exception in the context of thethread represented by this instance."""_async_raise( self._get_my_tid(), exctype )

(基于Tomer Filiba的可杀死的线程。关于PyThreadState_SetAsyncExc返回值的引用似乎来自旧版本的Python。)

正如留档中所述,这不是灵丹妙药,因为如果线程在Python解释器之外忙碌,它将不会捕获中断。

此代码的一个良好使用模式是让线程捕获特定异常并执行清理。这样,您就可以中断任务并仍然进行适当的清理。

如果您试图终止整个程序,您可以将线程设置为“守护进程”。参见Thread.daemon

这是一个糟糕的答案,看看评论

这里是如何做到这一点:

from threading import *
...
for thread in enumerate():if thread.isAlive():try:thread._Thread__stop()except:print(str(thread.getName()) + ' could not be terminated'))

给它几秒钟,然后你的线程应该停止。还要检查thread._Thread__delete()方法。

为了方便起见,我建议使用thread.quit()方法。例如,如果您的线程中有一个套接字,我建议在套接字句柄类中创建quit()方法,终止套接字,然后在quit()中运行thread._Thread__stop()

Amultiprocessing.Process可以p.terminate()

在我想杀死一个线程但不想使用标志/锁/信号/信号量/事件/任何东西的情况下,我将线程提升到成熟的进程。对于只使用几个线程的代码,开销没有那么糟糕。

例如。这可以方便地终止执行阻塞I/O的辅助“线程”

转换很简单:在相关代码中,将所有threading.Thread替换为multiprocessing.Process,将所有queue.Queue替换为multiprocessing.Queue,并将p.terminate()所需的调用添加到想要杀死其子进程p的父进程

Python留档#0

示例:

import multiprocessingproc = multiprocessing.Process(target=your_proc_function, args=())proc.start()# Terminate the processproc.terminate()  # sends a SIGTERM

在Python中,你不能直接杀死一个线程。

如果你真的不需要一个线程(!),你可以做的,而不是使用线程,是使用多重处理。在这里,要杀死一个进程,您可以简单地调用方法:

yourProcess.terminate()  # kill the process!

Python会杀死你的进程(在Unix上通过SIGTERM信号,而在Windows上通过TerminateProcess()调用)。在使用Queue或Pipe时注意使用它!(它可能会损坏Queue/Pipe中的数据)

请注意,multiprocessing.Eventmultiprocessing.Semaphore的工作方式分别与threading.Eventthreading.Semaphore完全相同。事实上,第一个是后者的克隆。

如果你真的需要使用一个线程,没有办法直接杀死它。然而,你可以做的是使用"守护进程线程"。事实上,在Python中,一个线程可以被标记为守护进程

yourThread.daemon = True  # set the Thread as a "daemon thread"

当没有活的非守护线程时,主程序将退出。换句话说,当您的主线程(当然是非守护线程)将完成其操作时,即使仍有一些守护线程在工作,程序也将退出。

请注意,在调用start()方法之前,有必要将Thread设置为daemon

当然,您可以并且应该使用daemon,即使是multiprocessing。在这里,当主进程退出时,它会尝试终止其所有守护进程子进程。

最后,请注意,sys.exit()os.kill()不是选项。

这是基于thread2--可杀死的线程 ActiveState配方。

您需要调用PyThreadState_SetAsyncExc(),这只能通过ctypes模块获得。

这只在Python 2.7.3上进行了测试,但它可能适用于最近的其他2. x版本。PyThreadState_SetAsyncExc()仍然存在于Python 3中以实现向后兼容性(但我没有测试它)。

import ctypes
def terminate_thread(thread):"""Terminates a python thread from another thread.
:param thread: a threading.Thread instance"""if not thread.isAlive():return
exc = ctypes.py_object(SystemExit)res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(thread.ident), exc)if res == 0:raise ValueError("nonexistent thread id")elif res > 1:# """if it returns a number greater than one, you're in trouble,# and you should call it again with exc=NULL to revert the effect"""ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None)raise SystemError("PyThreadState_SetAsyncExc failed")
from ctypes import *pthread = cdll.LoadLibrary("libpthread-2.15.so")pthread.pthread_cancel(c_ulong(t.ident))

t是你的Thread对象。

阅读python源代码(Modules/threadmodule.cPython/thread_pthread.h),您可以看到Thread.identpthread_t类型,因此您可以在python uselibpthread中执行pthread可以执行的任何操作。

我想补充的一件事是,如果你在线程库Python中阅读官方留档,建议避免使用“恶魔”线程,当你不希望线程突然结束时,标志着Paolo Rovelli提到

官方留档:

守护进程线程会在关闭时突然停止。它们的资源(例如打开的文件、数据库事务等)可能无法正常释放。如果您希望您的线程优雅地停止,请将它们设置为非守护进程并使用合适的信令机制,例如事件。

我认为创建守护线程取决于您的应用程序,但一般来说(在我看来)最好避免杀死它们或使它们成为守护线程。在多重处理中,您可以使用is_alive()来检查进程状态并“终止”以完成它们(也可以避免GIL问题)。但是有时,当您在Windows中执行代码时,您会发现更多问题。

并且永远记住,如果你有“活线程”,Python解释器将运行等待它们。(因为这个守护进程可以帮助你如果不重要突然结束)。

如果你真的需要杀死一个子任务的能力,请使用替代实现。multiprocessinggevent都支持不分青红皂白地杀死一个“线程”。

Python的线程不支持取消。甚至不要尝试。您的代码很可能死锁、损坏或泄漏内存,或者有其他意想不到的“有趣”的难以调试的效果,这些效果很少发生且不确定。

实现Thread.stop方法绝对是可能的,如以下示例代码所示:

import sysimport threadingimport time

class StopThread(StopIteration):pass
threading.SystemExit = SystemExit, StopThread

class Thread2(threading.Thread):
def stop(self):self.__stop = True
def _bootstrap(self):if threading._trace_hook is not None:raise ValueError('Cannot run thread with tracing!')self.__stop = Falsesys.settrace(self.__trace)super()._bootstrap()
def __trace(self, frame, event, arg):if self.__stop:raise StopThread()return self.__trace

class Thread3(threading.Thread):
def _bootstrap(self, stop_thread=False):def stop():nonlocal stop_threadstop_thread = Trueself.stop = stop
def tracer(*_):if stop_thread:raise StopThread()return tracersys.settrace(tracer)super()._bootstrap()
###############################################################################

def main():test1 = Thread2(target=printer)test1.start()time.sleep(1)test1.stop()test1.join()test2 = Thread2(target=speed_test)test2.start()time.sleep(1)test2.stop()test2.join()test3 = Thread3(target=speed_test)test3.start()time.sleep(1)test3.stop()test3.join()

def printer():while True:print(time.time() % 1)time.sleep(0.1)

def speed_test(count=0):try:while True:count += 1except StopThread:print('Count =', count)
if __name__ == '__main__':main()

Thread3类似乎比Thread2类运行代码快约33%。

正如其他人提到的,规范是设置一个停止标志。对于轻量级的东西(没有Thread的子类化,没有全局变量),lambda回调是一种选择。(注意if stop()中的括号。)

import threadingimport time
def do_work(id, stop):print("I am thread", id)while True:print("I am thread {} doing something".format(id))if stop():print("  Exiting loop.")breakprint("Thread {}, signing off".format(id))

def main():stop_threads = Falseworkers = []for id in range(0,3):tmp = threading.Thread(target=do_work, args=(id, lambda: stop_threads))workers.append(tmp)tmp.start()time.sleep(3)print('main: done sleeping; time to stop the threads.')stop_threads = Truefor worker in workers:worker.join()print('Finis.')
if __name__ == '__main__':main()

用总是刷新(sys.stdout.flush())的pr()函数替换print()可以提高shell输出的精度。

(仅在Windows/Eclipse/Python3.3上测试)

这似乎适用于Windows 7上的pywin32

my_thread = threading.Thread()my_thread.start()my_thread._Thread__stop()

您可以在进程中执行命令,然后使用进程ID杀死它。我需要在两个线程之间同步,其中一个不会自行返回。

processIds = []
def executeRecord(command):print(command)
process = subprocess.Popen(command, stdout=subprocess.PIPE)processIds.append(process.pid)print(processIds[0])
#Command that doesn't return by itselfprocess.stdout.read().decode("utf-8")return;

def recordThread(command, timeOut):
thread = Thread(target=executeRecord, args=(command,))thread.start()thread.join(timeOut)
os.kill(processIds.pop(), signal.SIGINT)
return;

使用setDaemon(True)启动子线程。

def bootstrap(_filename):mb = ModelBootstrap(filename=_filename) # Has many Daemon threads. All get stopped automatically when main thread is stopped.
t = threading.Thread(target=bootstrap,args=('models.conf',))t.setDaemon(False)
while True:t.start()time.sleep(10) # I am just allowing the sub-thread to run for 10 sec. You can listen on an event to stop execution.print('Thread stopped')break

虽然它相当古老,但这个对于某些人来说可能是一个方便的解决方案:

一个扩展线程模块功能的小模块——允许一个线程在另一个线程的上下文中引发异常线程。通过提高SystemExit,您终于可以杀死python线程了。

import threadingimport ctypes
def _async_raise(tid, excobj):res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(excobj))if res == 0:raise ValueError("nonexistent thread id")elif res > 1:# """if it returns a number greater than one, you're in trouble,# and you should call it again with exc=NULL to revert the effect"""ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)raise SystemError("PyThreadState_SetAsyncExc failed")
class Thread(threading.Thread):def raise_exc(self, excobj):assert self.isAlive(), "thread must be started"for tid, tobj in threading._active.items():if tobj is self:_async_raise(tid, excobj)return
# the thread was alive when we entered the loop, but was not found# in the dict, hence it must have been already terminated. should we raise# an exception here? silently ignore?
def terminate(self):# must raise the SystemExit type, instead of a SystemExit() instance# due to a bug in PyThreadState_SetAsyncExcself.raise_exc(SystemExit)

因此,它允许“线程在另一个线程的上下文中引发异常”,通过这种方式,终止的线程可以处理终止,而无需定期检查中止标志。

但是,根据其原始来源,此代码存在一些问题。

  • 只有在执行python字节码时才会引发异常。如果您的线程调用本机/内置阻塞函数,则只有当执行返回到python时才会引发异常代码。
    • 如果内置函数在内部调用PyErr_Clear(),也会出现问题,这会有效地取消挂起的异常。您可以尝试再次提高它。
  • 只有异常类型可以安全引发。异常实例可能会导致意外的行为,因此受到限制。
  • 我要求在内置线程模块中公开此函数,但由于ctype已成为标准库(从2.5开始),因此
    功能不太可能与实现无关,它可能会被保留
    未曝光。

ØMQ-项目的创始人之一Pieter Hintjens说,使用ØMQ并避免同步原语(如锁、互斥锁、事件等)是编写多线程程序的最理智和最安全的方法:

http://zguide.zeromq.org/py: all#Multithreading-with-ZeroMQ

这包括告诉子线程它应该取消它的工作。这可以通过为线程配备ØMQ套接字并在该套接字上轮询一条消息来完成。

该链接还提供了一个使用ØMQ的多线程python代码示例。

以下解决方法可用于杀死线程:

kill_threads = False
def doSomething():global kill_threadswhile True:if kill_threads:thread.exit()............
thread.start_new_thread(doSomething, ())

这甚至可以用于从主线程终止线程,其代码编写在另一个模块中。我们可以在该模块中声明一个全局变量,并使用它来终止该模块中生成的线程。

我通常使用它来终止程序出口的所有线程。这可能不是终止线程的完美方法,但可能会有所帮助。

如果您显式调用time.sleep()作为线程的一部分(例如轮询一些外部服务),对Phillipe方法的改进是在sleep()的任何地方使用eventwait()方法中的超时

例如:

import threading
class KillableThread(threading.Thread):def __init__(self, sleep_interval=1):super().__init__()self._kill = threading.Event()self._interval = sleep_interval
def run(self):while True:print("Do Something")
# If no kill signal is set, sleep for the interval,# If kill signal comes in while sleeping, immediately#  wake up and handleis_killed = self._kill.wait(self._interval)if is_killed:break
print("Killing Thread")
def kill(self):self._kill.set()

然后运行它

t = KillableThread(sleep_interval=5)t.start()# Every 5 seconds it prints:#: Do Somethingt.kill()#: Killing Thread

使用wait()而不是sleep()ing并定期检查事件的优点是,您可以在更长的睡眠间隔内编程,线程几乎立即停止(否则您将是sleep()ing),并且在我看来,处理退出的代码要简单得多。

我对这个游戏已经很晚了,但是我一直在与一个类似的问题搏斗,下面的内容似乎为我完美地解决了这个问题,并让我在守护进程退出时进行一些基本的线程状态检查和清理:

import threadingimport timeimport atexit
def do_work():
i = 0@atexit.registerdef goodbye():print ("'CLEANLY' kill sub-thread with value: %s [THREAD: %s]" %(i, threading.currentThread().ident))
while True:print ii += 1time.sleep(1)
t = threading.Thread(target=do_work)t.daemon = Truet.start()
def after_timeout():print "KILL MAIN THREAD: %s" % threading.currentThread().identraise SystemExit
threading.Timer(2, after_timeout).start()

产量:

01KILL MAIN THREAD: 140013208254208'CLEANLY' kill sub-thread with value: 2 [THREAD: 140013674317568]

有一个为此目的构建的库,停止。尽管这里列出的一些相同的警告仍然适用,但至少这个库提供了一种常规的、可重复的技术来实现所述目标。

假设,您希望拥有相同函数的多个线程,这是IMHO通过id停止一个的最简单实现:

import timefrom threading import Thread
def doit(id=0):doit.stop=0print("start id:%d"%id)while 1:time.sleep(1)print(".")if doit.stop==id:doit.stop=0breakprint("end thread %d"%id)
t5=Thread(target=doit, args=(5,))t6=Thread(target=doit, args=(6,))
t5.start() ; t6.start()time.sleep(2)doit.stop =5  #kill t5time.sleep(2)doit.stop =6  #kill t6

好东西在这里,你可以有多个相同和不同的函数,并通过functionname.stop停止它们

如果你只想有一个线程,那么你不需要记住id。如果doit.stop>0,就停止。

只是为了建立@SCB的想法(这正是我所需要的),创建一个带有自定义函数的KillableThread子类:

from threading import Thread, Event
class KillableThread(Thread):def __init__(self, sleep_interval=1, target=None, name=None, args=(), kwargs={}):super().__init__(None, target, name, args, kwargs)self._kill = Event()self._interval = sleep_intervalprint(self._target)
def run(self):while True:# Call custom function with argumentsself._target(*self._args)
# If no kill signal is set, sleep for the interval,# If kill signal comes in while sleeping, immediately#  wake up and handleis_killed = self._kill.wait(self._interval)if is_killed:break
print("Killing Thread")
def kill(self):self._kill.set()
if __name__ == '__main__':
def print_msg(msg):print(msg)
t = KillableThread(10, print_msg, args=("hello world"))t.start()time.sleep(6)print("About to kill thread")t.kill()

自然地,就像@SBC一样,线程不会等待运行一个新的循环来停止。在这个例子中,你会看到“关于杀死线程”之后打印“杀死线程”消息,而不是再等待4秒钟让线程完成(因为我们已经睡了6秒钟)。

KillableThread构造函数中的第二个参数是您的自定义函数(print_msg此处)。Args参数是在此处调用函数(("hello world"))时将使用的参数。

正如@Kozyarchuk的回答中提到的,安装跟踪可以正常工作。由于此答案不包含代码,因此这是一个可用的即用型示例:

import sys, threading, time
class TraceThread(threading.Thread):def __init__(self, *args, **keywords):threading.Thread.__init__(self, *args, **keywords)self.killed = Falsedef start(self):self._run = self.runself.run = self.settrace_and_runthreading.Thread.start(self)def settrace_and_run(self):sys.settrace(self.globaltrace)self._run()def globaltrace(self, frame, event, arg):return self.localtrace if event == 'call' else Nonedef localtrace(self, frame, event, arg):if self.killed and event == 'line':raise SystemExit()return self.localtrace
def f():while True:print('1')time.sleep(2)print('2')time.sleep(2)print('3')time.sleep(2)
t = TraceThread(target=f)t.start()time.sleep(2.5)t.killed = True

打印完12后停止。3没有打印。

这是另一种方法,但使用非常干净和简单的代码,可以在2021年的Python 3.7中运行:

import ctypes
def kill_thread(thread):"""thread: a threading.Thread object"""thread_id = thread.identres = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(SystemExit))if res > 1:ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0)print('Exception raise failure')

从这里改编:https://www.geeksforgeeks.org/python-different-ways-to-kill-a-thread/

python版本:3.8

使用守护线程执行我们想要的,如果我们想终止守护线程,我们只需要让父线程退出,然后系统将终止父线程创建的守护线程。

还支持协程和协程功能。

def main():start_time = time.perf_counter()t1 = ExitThread(time.sleep, (10,), debug=False)t1.start()time.sleep(0.5)t1.exit()try:print(t1.result_future.result())except concurrent.futures.CancelledError:passend_time = time.perf_counter()print(f"time cost {end_time - start_time:0.2f}")

下面是ExitThread源代码

import concurrent.futuresimport threadingimport typingimport asyncio

class _WorkItem(object):""" concurrent\futures\thread.py
"""
def __init__(self, future, fn, args, kwargs, *, debug=None):self._debug = debugself.future = futureself.fn = fnself.args = argsself.kwargs = kwargs
def run(self):if self._debug:print("ExitThread._WorkItem run")if not self.future.set_running_or_notify_cancel():return
try:coroutine = Noneif asyncio.iscoroutinefunction(self.fn):coroutine = self.fn(*self.args, **self.kwargs)elif asyncio.iscoroutine(self.fn):coroutine = self.fnif coroutine is None:result = self.fn(*self.args, **self.kwargs)else:result = asyncio.run(coroutine)if self._debug:print("_WorkItem done")except BaseException as exc:self.future.set_exception(exc)# Break a reference cycle with the exception 'exc'self = Noneelse:self.future.set_result(result)

class ExitThread:""" Like a stoppable thread
Using coroutine for target then exit before running may cause RuntimeWarning.
"""
def __init__(self, target: typing.Union[typing.Coroutine, typing.Callable] = None, args=(), kwargs={}, *, daemon=None, debug=None):#self._debug = debugself._parent_thread = threading.Thread(target=self._parent_thread_run, name="ExitThread_parent_thread", daemon=daemon)self._child_daemon_thread = Noneself.result_future = concurrent.futures.Future()self._workItem = _WorkItem(self.result_future, target, args, kwargs, debug=debug)self._parent_thread_exit_lock = threading.Lock()self._parent_thread_exit_lock.acquire()self._parent_thread_exit_lock_released = False  # When done it will be Trueself._started = Falseself._exited = Falseself.result_future.add_done_callback(self._release_parent_thread_exit_lock)
def _parent_thread_run(self):self._child_daemon_thread = threading.Thread(target=self._child_daemon_thread_run, name="ExitThread_child_daemon_thread", daemon=True)self._child_daemon_thread.start()# Block manager threadself._parent_thread_exit_lock.acquire()self._parent_thread_exit_lock.release()if self._debug:print("ExitThread._parent_thread_run exit")
def _release_parent_thread_exit_lock(self, _future):if self._debug:print(f"ExitThread._release_parent_thread_exit_lock {self._parent_thread_exit_lock_released} {_future}")if not self._parent_thread_exit_lock_released:self._parent_thread_exit_lock_released = Trueself._parent_thread_exit_lock.release()
def _child_daemon_thread_run(self):self._workItem.run()
def start(self):if self._debug:print(f"ExitThread.start {self._started}")if not self._started:self._started = Trueself._parent_thread.start()
def exit(self):if self._debug:print(f"ExitThread.exit exited: {self._exited} lock_released: {self._parent_thread_exit_lock_released}")if self._parent_thread_exit_lock_released:returnif not self._exited:self._exited = Trueif not self.result_future.cancel():if self.result_future.running():self.result_future.set_exception(concurrent.futures.CancelledError())

最简单的方法是这样的:

from threading import Threadfrom time import sleep
def do_something():global thread_workwhile thread_work:print('doing something')sleep(5)print('Thread stopped')
thread_work = TrueThread(target=do_something).start()sleep(5)thread_work = False

另一种方法是使用signal.pthread_kill发送停止信号。

from signal import pthread_kill, SIGTSTPfrom threading import Threadfrom itertools import countfrom time import sleep
def target():for num in count():print(num)sleep(1)
thread = Thread(target=target)thread.start()sleep(5)pthread_kill(thread.ident, SIGTSTP)

结果

01234
[14]+  Stopped

只需使用

if thread.is_alive():print("thread alive")else:print("thread is killed")

它会自动杀死线程