如何从线程获得返回值?

下面的函数foo返回一个字符串'foo'。我怎么能得到从线程的目标返回的值'foo' ?

from threading import Thread


def foo(bar):
print('hello {}'.format(bar))
return 'foo'


thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()

上面所示的“one obvious way to do it”行不通:thread.join()返回None

606110 次浏览

join总是返回None,我认为你应该子类Thread处理返回代码等。

我见过的一种方法是将一个可变对象(如列表或字典)传递给线程的构造函数,同时传递一个索引或其他某种类型的标识符。然后线程可以将结果存储在该对象的专用槽中。例如:

def foo(bar, result, index):
print 'hello {0}'.format(bar)
result[index] = "foo"


from threading import Thread


threads = [None] * 10
results = [None] * 10


for i in range(len(threads)):
threads[i] = Thread(target=foo, args=('world!', results, i))
threads[i].start()


# do some other stuff


for i in range(len(threads)):
threads[i].join()


print " ".join(results)  # what sound does a metasyntactic locomotive make?

如果你真的想让join()返回被调用函数的返回值,你可以用Thread子类来实现,如下所示:

from threading import Thread


def foo(bar):
print 'hello {0}'.format(bar)
return "foo"


class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, Verbose=None):
Thread.__init__(self, group, target, name, args, kwargs, Verbose)
self._return = None
def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(*self._Thread__args,
**self._Thread__kwargs)
def join(self):
Thread.join(self)
return self._return


twrv = ThreadWithReturnValue(target=foo, args=('world!',))


twrv.start()
print twrv.join()   # prints foo

这有点麻烦,因为一些名称混乱,它访问“private”;特定于Thread实现的数据结构…但它确实有效。

对于Python 3:

class ThreadWithReturnValue(Thread):
    

def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, Verbose=None):
Thread.__init__(self, group, target, name, args, kwargs)
self._return = None


def run(self):
if self._target is not None:
self._return = self._target(*self._args,
**self._kwargs)
def join(self, *args):
Thread.join(self, *args)
return self._return

FWIW, multiprocessing模块使用Pool类提供了一个很好的接口。如果您希望坚持使用线程而不是进程,可以直接使用multiprocessing.pool.ThreadPool类作为替代。

def foo(bar, baz):
print 'hello {0}'.format(bar)
return 'foo' + baz


from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)


async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo


# do some other stuff in the main process


return_val = async_result.get()  # get the return value from your function.

Jake的回答很好,但如果您不想使用线程池(您不知道需要多少线程,但可以根据需要创建它们),那么在线程之间传输信息的好方法是内置的队列中。队列类,因为它提供了线程安全性。

我创建了以下装饰器,使其以类似于线程池的方式工作:

def threaded(f, daemon=False):
import Queue


def wrapped_f(q, *args, **kwargs):
'''this function calls the decorated function and puts the
result in a queue'''
ret = f(*args, **kwargs)
q.put(ret)


def wrap(*args, **kwargs):
'''this is the function returned from the decorator. It fires off
wrapped_f in a new thread and returns the thread object with
the result queue attached'''


q = Queue.Queue()


t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
t.daemon = daemon
t.start()
t.result_queue = q
return t


return wrap

然后你就把它用作:

@threaded
def long_task(x):
import time
x = x + 5
time.sleep(5)
return x


# does not block, returns Thread object
y = long_task(10)
print y


# this blocks, waiting for the result
result = y.result_queue.get()
print result

装饰函数每次被调用时都会创建一个新线程,并返回一个thread对象,其中包含将接收结果的队列。

更新

自从我发布这个答案已经有一段时间了,但它仍然得到了观看,所以我想我应该更新它,以反映我在新版本的Python中这样做的方式:

Python 3.2添加到concurrent.futures模块中,为并行任务提供了高级接口。它提供了ThreadPoolExecutorProcessPoolExecutor,因此您可以使用具有相同api的线程或进程池。

该api的一个好处是,向Executor提交任务将返回Future对象,该对象将以您提交的可调用对象的返回值结束。

这使得没有必要附加queue对象,这大大简化了装饰器:

_DEFAULT_POOL = ThreadPoolExecutor()


def threadpool(f, executor=None):
@wraps(f)
def wrap(*args, **kwargs):
return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)


return wrap

如果没有传入,将使用默认的模块线程池执行器。

用法和前面的非常相似:

@threadpool
def long_task(x):
import time
x = x + 5
time.sleep(5)
return x


# does not block, returns Future object
y = long_task(10)
print y


# this blocks, waiting for the result
result = y.result()
print result

如果您使用的是Python 3.4+,使用此方法(以及一般的Future对象)的一个非常好的特性是,返回的Future可以被包装,并使用asyncio.wrap_future将其转换为asyncio.Future。这使得它很容易与协程一起工作:

result = await asyncio.wrap_future(long_task(10))

如果你不需要访问底层的concurrent.Future对象,你可以在装饰器中包含换行:

_DEFAULT_POOL = ThreadPoolExecutor()


def threadpool(f, executor=None):
@wraps(f)
def wrap(*args, **kwargs):
return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))


return wrap

然后,当你需要将cpu密集型代码或阻塞代码从事件循环线程中推出时,你可以将它放在装饰函数中:

@threadpool
def some_long_calculation():
...


# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()

我对这个问题的解决方案是将函数和线程包装在一个类中。不需要使用池、队列或c类型变量传递。它也是非阻塞的。而是检查状态。参见代码末尾如何使用它的示例。

import threading


class ThreadWorker():
'''
The basic idea is given a function create an object.
The object can then run the function in a thread.
It provides a wrapper to start it,check its status,and get data out the function.
'''
def __init__(self,func):
self.thread = None
self.data = None
self.func = self.save_data(func)


def save_data(self,func):
'''modify function to save its returned data'''
def new_func(*args, **kwargs):
self.data=func(*args, **kwargs)


return new_func


def start(self,params):
self.data = None
if self.thread is not None:
if self.thread.isAlive():
return 'running' #could raise exception here


#unless thread exists and is alive start or restart it
self.thread = threading.Thread(target=self.func,args=params)
self.thread.start()
return 'started'


def status(self):
if self.thread is None:
return 'not_started'
else:
if self.thread.isAlive():
return 'running'
else:
return 'finished'


def get_results(self):
if self.thread is None:
return 'not_started' #could return exception
else:
if self.thread.isAlive():
return 'running'
else:
return self.data


def add(x,y):
return x +y


add_worker = ThreadWorker(add)
print add_worker.start((1,2,))
print add_worker.status()
print add_worker.get_results()

我偷了kindall的答案,稍微整理了一下。

关键部分是为join()添加*args和**kwargs,以便处理超时

class threadWithReturn(Thread):
def __init__(self, *args, **kwargs):
super(threadWithReturn, self).__init__(*args, **kwargs)
        

self._return = None
    

def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
    

def join(self, *args, **kwargs):
super(threadWithReturn, self).join(*args, **kwargs)
        

return self._return

更新答案如下

这是我得到最多好评的答案,所以我决定更新可以在py2和py3上运行的代码。

此外,我看到许多对这个问题的回答都显示出对Thread.join()缺乏理解。有些完全不能处理timeout参数。但是,当你有(1)一个目标函数可以返回None并且(2)你也将timeout参数传递给join()时,还有一个极端情况,你应该意识到。请参见“TEST 4”;为了理解这种极端情况。

ThreadWithReturn类,用于py2和py3:

import sys
from threading import Thread
from builtins import super    # https://stackoverflow.com/a/30159479


_thread_target_key, _thread_args_key, _thread_kwargs_key = (
('_target', '_args', '_kwargs')
if sys.version_info >= (3, 0) else
('_Thread__target', '_Thread__args', '_Thread__kwargs')
)


class ThreadWithReturn(Thread):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._return = None
    

def run(self):
target = getattr(self, _thread_target_key)
if target is not None:
self._return = target(
*getattr(self, _thread_args_key),
**getattr(self, _thread_kwargs_key)
)
    

def join(self, *args, **kwargs):
super().join(*args, **kwargs)
return self._return

一些示例测试如下所示:

import time, random


# TEST TARGET FUNCTION
def giveMe(arg, seconds=None):
if not seconds is None:
time.sleep(seconds)
return arg


# TEST 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')


# TEST 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)


# TEST 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # because join() timed out before giveMe() finished


# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

你能确定我们在测试4中可能遇到的极端情况吗?

问题是我们期望giveMe()返回None(参见TEST 2),但我们也期望join()在超时时返回None。

returned is None表示:

(1)这就是giveMe()返回的,或者

(2) join()超时

这个例子很简单,因为我们知道giveMe()总是返回None。但在真实的实例中(目标可能返回None或其他内容),我们希望显式地检查发生了什么。

下面是如何解决这种极端情况:

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))


if my_thread.isAlive():
# returned is None because join() timed out
# this also means that giveMe() is still running in the background
pass
# handle this based on your app's logic
else:
# join() is finished, and so is giveMe()
# BUT we could also be in a race condition, so we need to update returned, just in case
returned = my_thread.join()
定义你的目标为
1)取参数q
2)将return foo替换为q.put(foo); return

一个函数

def func(a):
ans = a * a
return ans

将成为

def func(a, q):
ans = a * a
q.put(ans)
return

然后你就可以这样做了

from Queue import Queue
from threading import Thread


ans_q = Queue()
arg_tups = [(i, ans_q) for i in xrange(10)]


threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
results = [q.get() for _ in xrange(len(threads))]

您可以使用函数装饰器/包装器来实现它,这样您就可以使用现有的函数作为target而无需修改它们,但请遵循以下基本方案。

您可以在线程函数的作用域之上定义一个可变变量,并将结果添加到该变量中。(我还修改了代码,使其与python3兼容)

returns = {}
def foo(bar):
print('hello {0}'.format(bar))
returns[bar] = 'foo'


from threading import Thread
t = Thread(target=foo, args=('world!',))
t.start()
t.join()
print(returns)

这将返回{'world!': 'foo'}

如果使用函数input作为结果字典的键,则保证每个惟一的输入都在结果中给出一个条目

另一个不需要更改现有代码的解决方案:

import Queue             # Python 2.x
#from queue import Queue # Python 3.x


from threading import Thread


def foo(bar):
print 'hello {0}'.format(bar)     # Python 2.x
#print('hello {0}'.format(bar))   # Python 3.x
return 'foo'


que = Queue.Queue()      # Python 2.x
#que = Queue()           # Python 3.x


t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result             # Python 2.x
#print(result)           # Python 3.x

它也可以很容易地调整到多线程环境:

import Queue             # Python 2.x
#from queue import Queue # Python 3.x
from threading import Thread


def foo(bar):
print 'hello {0}'.format(bar)     # Python 2.x
#print('hello {0}'.format(bar))   # Python 3.x
return 'foo'


que = Queue.Queue()      # Python 2.x
#que = Queue()           # Python 3.x


threads_list = list()


t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)


# Add more threads here
...
threads_list.append(t2)
...
threads_list.append(t3)
...


# Join all the threads
for t in threads_list:
t.join()


# Check thread's return value
while not que.empty():
result = que.get()
print result         # Python 2.x
#print(result)       # Python 3.x

我正在使用这个包装器,它可以轻松地将任何函数转换为Thread中运行的函数—照顾它的返回值或异常。它不会增加Queue开销。

def threading_func(f):
"""Decorator for running a function in a thread and handling its return
value or exception"""
def start(*args, **kw):
def run():
try:
th.ret = f(*args, **kw)
except:
th.exc = sys.exc_info()
def get(timeout=None):
th.join(timeout)
if th.exc:
raise th.exc[0], th.exc[1], th.exc[2] # py2
##raise th.exc[1] #py3
return th.ret
th = threading.Thread(None, run)
th.exc = None
th.get = get
th.start()
return th
return start

用法示例

def f(x):
return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))


@threading_func
def th_mul(a, b):
return a * b
th = th_mul("text", 2.5)


try:
print(th.get())
except TypeError:
print("exception thrown ok.")

关于threading模块的说明

舒适的回报值&线程函数的异常处理是一个常见的“python”需求,threading模块应该已经提供了——可能直接在标准的Thread类中。ThreadPool对于简单的任务有太多的开销——3个管理线程,很多官僚主义。不幸的是,Thread的布局最初是从Java复制的——例如,从仍然无用的构造函数参数group中复制的。

Parris / kindall的回答 join/return回答移植到Python 3:

from threading import Thread


def foo(bar):
print('hello {0}'.format(bar))
return "foo"


class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)


self._return = None


def run(self):
if self._target is not None:
self._return = self._target(*self._args, **self._kwargs)


def join(self):
Thread.join(self)
return self._return




twrv = ThreadWithReturnValue(target=foo, args=('world!',))


twrv.start()
print(twrv.join())   # prints foo

注意,Thread类在Python 3中实现的方式不同。

一种常见的解决方案是用类似于foo的装饰器包装函数

result = queue.Queue()


def task_wrapper(*args):
result.put(target(*args))

那么整个代码可能是这样的

result = queue.Queue()


def task_wrapper(*args):
result.put(target(*args))


threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list]


for t in threads:
t.start()
while(True):
if(len(threading.enumerate()) < max_num):
break
for t in threads:
t.join()
return result

请注意

一个重要的问题是返回值可能是unorderred。 (事实上,return value并不一定保存到queue,因为您可以选择任意的线程安全的数据结构)

考虑到< >强@iman < / >强< >强@JakeBiesinger < / >强回答的评论,我重新组合了它,使其具有不同数量的线程:

from multiprocessing.pool import ThreadPool


def foo(bar, baz):
print 'hello {0}'.format(bar)
return 'foo' + baz


numOfThreads = 3
results = []


pool = ThreadPool(numOfThreads)


for i in range(0, numOfThreads):
results.append(pool.apply_async(foo, ('world', 'foo'))) # tuple of args for foo)


# do some other stuff in the main process
# ...
# ...


results = [r.get() for r in results]
print results


pool.close()
pool.join()

使用队列:

import threading, queue


def calc_square(num, out_queue1):
l = []
for x in num:
l.append(x*x)
out_queue1.put(l)




arr = [1,2,3,4,5,6,7,8,9,10]
out_queue1=queue.Queue()
t1=threading.Thread(target=calc_square, args=(arr,out_queue1))
t1.start()
t1.join()
print (out_queue1.get())

如上所述,多处理池比基本线程要慢得多。使用一些回答中提出的队列是一种非常有效的替代方法。我已经将它与字典一起使用,以便能够运行许多小线程,并通过将它们与字典结合来恢复多个答案:

#!/usr/bin/env python3


import threading
# use Queue for python2
import queue
import random


LETTERS = 'abcdefghijklmnopqrstuvwxyz'
LETTERS = [ x for x in LETTERS ]


NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


def randoms(k, q):
result = dict()
result['letter'] = random.choice(LETTERS)
result['number'] = random.choice(NUMBERS)
q.put({k: result})


threads = list()
q = queue.Queue()
results = dict()


for name in ('alpha', 'oscar', 'yankee',):
threads.append( threading.Thread(target=randoms, args=(name, q)) )
threads[-1].start()
_ = [ t.join() for t in threads ]
while not q.empty():
results.update(q.get())


print(results)

GuySoft的想法很棒,但我认为对象不一定要从Thread继承,start()可以从接口中删除:

from threading import Thread
import queue
class ThreadWithReturnValue(object):
def __init__(self, target=None, args=(), **kwargs):
self._que = queue.Queue()
self._t = Thread(target=lambda q,arg1,kwargs1: q.put(target(*arg1, **kwargs1)) ,
args=(self._que, args, kwargs), )
self._t.start()


def join(self):
self._t.join()
return self._que.get()




def foo(bar):
print('hello {0}'.format(bar))
return "foo"


twrv = ThreadWithReturnValue(target=foo, args=('world!',))


print(twrv.join())   # prints foo

Python3中的Kindall的回答

class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, *, daemon=None):
Thread.__init__(self, group, target, name, args, kwargs, daemon)
self._return = None


def run(self):
try:
if self._target:
self._return = self._target(*self._args, **self._kwargs)
finally:
del self._target, self._args, self._kwargs


def join(self,timeout=None):
Thread.join(self,timeout)
return self._return

在Python 3.2+中,stdlib concurrent.futures模块为threading提供了更高级别的API,包括将返回值或异常从工作线程传递回主线程:

import concurrent.futures


def foo(bar):
print('hello {}'.format(bar))
return 'foo'


with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(foo, 'world!')
return_value = future.result()
print(return_value)

根据上面提到的,下面是适用于Python3的更通用的解决方案。

import threading


class ThreadWithReturnValue(threading.Thread):
def __init__(self, *init_args, **init_kwargs):
threading.Thread.__init__(self, *init_args, **init_kwargs)
self._return = None
def run(self):
self._return = self._target(*self._args, **self._kwargs)
def join(self):
threading.Thread.join(self)
return self._return

使用

        th = ThreadWithReturnValue(target=requests.get, args=('http://www.google.com',))
th.start()
response = th.join()
response.status_code  # => 200

这是我创建的@Kindall的回答的版本。

这个版本使得您所要做的就是输入带有参数的命令来创建新线程。

这是用Python 3.8做的:

from threading import Thread
from typing import Any


def test(plug, plug2, plug3):
print(f"hello {plug}")
print(f'I am the second plug : {plug2}')
print(plug3)
return 'I am the return Value!'


def test2(msg):
return f'I am from the second test: {msg}'


def test3():
print('hello world')


def NewThread(com, Returning: bool, *arguments) -> Any:
"""
Will create a new thread for a function/command.


:param com: Command to be Executed
:param arguments: Arguments to be sent to Command
:param Returning: True/False Will this command need to return anything
"""
class NewThreadWorker(Thread):
def __init__(self, group = None, target = None, name = None, args = (), kwargs = None, *,
daemon = None):
Thread.__init__(self, group, target, name, args, kwargs, daemon = daemon)
            

self._return = None
        

def run(self):
if self._target is not None:
self._return = self._target(*self._args, **self._kwargs)
        

def join(self):
Thread.join(self)
return self._return
    

ntw = NewThreadWorker(target = com, args = (*arguments,))
ntw.start()
if Returning:
return ntw.join()


if __name__ == "__main__":
print(NewThread(test, True, 'hi', 'test', test2('hi')))
NewThread(test3, True)

我知道这个线程是旧的....但我也遇到了同样的问题…如果您愿意使用thread.join()

import threading


class test:


def __init__(self):
self.msg=""


def hello(self,bar):
print('hello {}'.format(bar))
self.msg="foo"




def main(self):
thread = threading.Thread(target=self.hello, args=('world!',))
thread.start()
thread.join()
print(self.msg)


g=test()
g.main()

我找到的大多数答案都很长,需要熟悉其他模块或高级python特性,除非他们已经熟悉答案所谈论的一切,否则会让人感到困惑。

简化方法的工作代码:

import threading


class ThreadWithResult(threading.Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):
def function():
self.result = target(*args, **kwargs)
super().__init__(group=group, target=function, name=name, daemon=daemon)


示例代码:

import time, random




def function_to_thread(n):
count = 0
while count < 3:
print(f'still running thread {n}')
count +=1
time.sleep(3)
result = random.random()
print(f'Return value of thread {n} should be: {result}')
return result




def main():
thread1 = ThreadWithResult(target=function_to_thread, args=(1,))
thread2 = ThreadWithResult(target=function_to_thread, args=(2,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(thread1.result)
print(thread2.result)


main()

< >强劲的解释: 我想大大简化事情,所以我创建了一个ThreadWithResult类,并让它从threading.Thread继承。__init__中的嵌套函数function调用了我们想要保存值的线程函数,并在线程执行完成后将该嵌套函数的结果保存为实例属性self.result

创建该对象的实例与创建threading.Thread实例相同。将想要在新线程上运行的函数传递给target参数,将函数可能需要的任何参数传递给args参数,将任何关键字参数传递给kwargs参数。

如。

my_thread = ThreadWithResult(target=my_function, args=(arg1, arg2, arg3))

我认为这比绝大多数答案更容易理解,而且这种方法不需要额外的导入!我包含了timerandom模块来模拟线程的行为,但它们并不是实现最初的问题中要求的功能所必需的。

我知道我是在这个问题被问到很久之后才回答的,但我希望这能在未来帮助更多的人!


编辑:我创建了save-thread-result PyPI包来允许你访问上面相同的代码,并在项目中重用它(GitHub代码在这里)。PyPI包完全扩展了threading.Thread类,所以您也可以在ThreadWithResult类上设置在threading.thread上设置的任何属性!

上面的原始答案介绍了这个子类背后的主要思想,但要了解更多信息,请参阅更详细的解释(来自模块docstring)在这里

快速使用示例:

pip3 install -U save-thread-result     # MacOS/Linux
pip  install -U save-thread-result     # Windows


python3     # MacOS/Linux
python      # Windows
from save_thread_result import ThreadWithResult


# As of Release 0.0.3, you can also specify values for
#`group`, `name`, and `daemon` if you want to set those
# values manually.
thread = ThreadWithResult(
target = my_function,
args   = (my_function_arg1, my_function_arg2, ...)
kwargs = {my_function_kwarg1: kwarg1_value, my_function_kwarg2: kwarg2_value, ...}
)


thread.start()
thread.join()
if getattr(thread, 'result', None):
print(thread.result)
else:
# thread.result attribute not set - something caused
# the thread to terminate BEFORE the thread finished
# executing the function passed in through the
# `target` argument
print('ERROR! Something went wrong while executing this thread, and the function you passed in did NOT complete!!')


# seeing help about the class and information about the threading.Thread super class methods and attributes available:
help(ThreadWithResult)

这是一个很老的问题,但我想分享一个简单的解决方案,它对我的开发过程有帮助。

这个答案背后的方法论是这样一个事实:“新”;目标函数,inner通过所谓的闭包将原始函数的结果(通过__init__函数传递)分配给包装器的result实例属性。

这允许包装器类保留返回值以供调用者随时访问。

注意:这个方法不需要使用任何manged方法或threading.Thread类的私有方法,尽管没有考虑yield函数(OP没有提到yield函数)。

享受吧!

from threading import Thread as _Thread




class ThreadWrapper:
def __init__(self, target, *args, **kwargs):
self.result = None
self._target = self._build_threaded_fn(target)
self.thread = _Thread(
target=self._target,
*args,
**kwargs
)


def _build_threaded_fn(self, func):
def inner(*args, **kwargs):
self.result = func(*args, **kwargs)
return inner


此外,你可以用下面的代码运行pytest(假设你已经安装了它)来演示结果:

import time
from commons import ThreadWrapper




def test():


def target():
time.sleep(1)
return 'Hello'


wrapper = ThreadWrapper(target=target)
wrapper.thread.start()


r = wrapper.result
assert r is None


time.sleep(2)


r = wrapper.result
assert r == 'Hello'

最好的方法…定义一个全局变量,然后在线程函数中更改该变量。没有可以传递或取回的东西

from threading import Thread


# global var
radom_global_var = 5


def function():
global random_global_var
random_global_var += 1


domath = Thread(target=function)
domath.start()
domath.join()
print(random_global_var)


# result: 6

你可以使用ThreadPool()中的pool.apply_async()test()返回的值,如下所示:

from multiprocessing.pool import ThreadPool


def test(num1, num2):
return num1 + num2


pool = ThreadPool(processes=1) # Here
result = pool.apply_async(test, (2, 3)) # Here
print(result.get()) # 5

并且,你也可以使用concurrent.futures.ThreadPoolExecutor()中的submit()test()返回的值,如下所示:

from concurrent.futures import ThreadPoolExecutor


def test(num1, num2):
return num1 + num2


with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(test, 2, 3) # Here
print(future.result()) # 5

并且,代替return,你可以使用数组result,如下所示:

from threading import Thread


def test(num1, num2, r):
r[0] = num1 + num2 # Instead of "return"


result = [None] # Here


thread = Thread(target=test, args=(2, 3, result))
thread.start()
thread.join()
print(result[0]) # 5

而不是return,你也可以使用队列result,如下所示:

from threading import Thread
import queue


def test(num1, num2, q):
q.put(num1 + num2) # Instead of "return"


queue = queue.Queue() # Here


thread = Thread(target=test, args=(2, 3, queue))
thread.start()
thread.join()
print(queue.get()) # '5'

我发现做到这一点的最短和最简单的方法是利用Python类及其动态属性。您可以使用threading.current_thread()从派生线程的上下文中检索当前线程,并将返回值赋给一个属性。

import threading


def some_target_function():
# Your code here.
threading.current_thread().return_value = "Some return value."


your_thread = threading.Thread(target=some_target_function)
your_thread.start()
your_thread.join()


return_value = your_thread.return_value
print(return_value)