Asynchronous method invocation in Python?

I was wondering if there's any library for asynchronous method calls in Python. It would be great if you could do something like:

@async
def longComputation():
    <code>


token = longComputation()
token.registerCallback(callback_function)
# alternative, polling
while not token.finished():
    doSomethingElse()
    if token.finished():
        result = token.result()

Or to call a non-async routine asynchronously:

def longComputation():
    <code>


token = asynccall(longComputation())

It would be great to have a more refined strategy natively in the language core. Was this considered?


It's not in the language core, but a very mature library that does what you want is Twisted. It introduces the Deferred object, to which you can attach callbacks or error handlers ("errbacks"). A Deferred is basically a "promise" that a function will eventually have a result.
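To illustrate the idea, here is a simplified, pure-Python sketch of the Deferred/promise pattern; `SimpleDeferred` and its method names are my own and are not Twisted's actual API:

```python
class SimpleDeferred:
    """A toy 'promise': callbacks attached before or after the result arrives all fire."""
    def __init__(self):
        self._callbacks = []
        self._result = None
        self._fired = False

    def add_callback(self, fn):
        if self._fired:
            fn(self._result)          # Result already known: fire immediately.
        else:
            self._callbacks.append(fn)

    def callback(self, result):
        self._fired = True
        self._result = result
        for fn in self._callbacks:    # Fire every callback registered so far.
            fn(result)


d = SimpleDeferred()
received = []
d.add_callback(received.append)
d.callback(42)        # The "computation" finishes and fires the callbacks.
# received is now [42]
```

Twisted's real Deferred adds chaining, error handling and much more on top of this basic shape.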

Something like:

import threading


thr = threading.Thread(target=foo, args=(), kwargs={})
thr.start() # Will run "foo"
....
thr.is_alive() # Will return whether foo is running currently
....
thr.join() # Will wait till "foo" is done

See the threading documentation at https://docs.python.org/library/threading.html for more details.

Is there any reason not to use threads? You can use the threading class. Instead of the finished() function, use is_alive(). The result() function could join() the thread and retrieve the result. And, if you can, override the run() and __init__ functions to call the function specified in the constructor and save the value somewhere in the instance of the class.
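A minimal sketch of that idea; the class name `ThreadedCall` is my own, not a standard API:

```python
import threading


class ThreadedCall(threading.Thread):
    """Run fn(*args, **kwargs) on a background thread; result() joins and returns the value."""
    def __init__(self, fn, *args, **kwargs):
        super().__init__()
        self._fn, self._args, self._kwargs = fn, args, kwargs
        self._result = None
        self.start()                  # Dispatch immediately on construction.

    def run(self):
        self._result = self._fn(*self._args, **self._kwargs)

    def result(self):
        self.join()                   # Block until run() has finished.
        return self._result


token = ThreadedCall(pow, 2, 10)
# ...do something else; token.is_alive() plays the role of "not finished()"...
print(token.result())                 # → 1024
```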

You can use the multiprocessing module, added in Python 2.6. You can use pools of processes and then get results asynchronously with:

apply_async(func[, args[, kwds[, callback]]])

E.g.:

from multiprocessing import Pool


def f(x):
    return x * x


def on_done(result):
    print(result)


if __name__ == '__main__':
    pool = Pool(processes=1)                              # Start a worker process.
    result = pool.apply_async(f, [10], callback=on_done)  # Evaluate "f(10)" asynchronously, calling on_done when finished.

This is only one alternative. The module provides lots of facilities to achieve what you want. Also, it would be really easy to make a decorator from this.
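For instance, a decorator along these lines; this is a sketch, and it uses multiprocessing.pool.ThreadPool (which shares apply_async's API) to sidestep the pickling issues a process pool has with decorated functions. The names `asynccall` and `_pool` are my own:

```python
from multiprocessing.pool import ThreadPool

_pool = ThreadPool(processes=2)


def asynccall(fn):
    """Decorator: calling the function returns an AsyncResult instead of blocking."""
    def wrapper(*args, **kwargs):
        return _pool.apply_async(fn, args, kwargs)
    return wrapper


@asynccall
def f(x):
    return x * x


token = f(10)          # Returns immediately with an AsyncResult.
print(token.get())     # Blocks until the result is ready → 100
```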

My solution is:

import threading


class TimeoutError(RuntimeError):
    pass


class AsyncCall(object):
    def __init__(self, fnc, callback=None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        self.Thread = threading.Thread(target=self.run, name=self.Callable.__name__, args=args, kwargs=kwargs)
        self.Thread.start()
        return self

    def wait(self, timeout=None):
        self.Thread.join(timeout)
        if self.Thread.is_alive():
            raise TimeoutError()
        else:
            return self.Result

    def run(self, *args, **kwargs):
        self.Result = self.Callable(*args, **kwargs)
        if self.Callback:
            self.Callback(self.Result)


class AsyncMethod(object):
    def __init__(self, fnc, callback=None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        return AsyncCall(self.Callable, self.Callback)(*args, **kwargs)


def Async(fnc=None, callback=None):
    if fnc is None:
        def AddAsyncCallback(fnc):
            return AsyncMethod(fnc, callback)
        return AddAsyncCallback
    else:
        return AsyncMethod(fnc, callback)

And it works exactly as requested:

@Async
def fnc():
    pass

You could implement a decorator to make your functions asynchronous, though that's a bit tricky. The multiprocessing module is full of little quirks and seemingly arbitrary restrictions – all the more reason to encapsulate it behind a friendly interface, though.

from inspect import getmodule
from multiprocessing import Pool


# Note: `async` became a reserved keyword in Python 3.7, so the decorator
# is named `async_` here.
def async_(decorated):
    r'''Wraps a top-level function around an asynchronous dispatcher.

    When the decorated function is called, a task is submitted to a
    process pool, and a future object is returned, providing access to an
    eventual return value.

    The future object has a blocking get() method to access the task
    result: it will return immediately if the job is already done, or block
    until it completes.

    This decorator won't work on methods, due to limitations in Python's
    pickling machinery (in principle methods could be made pickleable, but
    good luck on that).
    '''
    # Keeps the original function visible from the module global namespace,
    # under a name consistent with its __name__ attribute. This is necessary
    # for the multiprocessing pickling machinery to work properly.
    module = getmodule(decorated)
    decorated.__name__ += '_original'
    setattr(module, decorated.__name__, decorated)

    def send(*args, **opts):
        return async_.pool.apply_async(decorated, args, opts)

    return send

The code below illustrates the usage of the decorator:

@async_
def printsum(uid, values):
    summed = 0
    for value in values:
        summed += value

    print("Worker %i: sum value is %i" % (uid, summed))

    return (uid, summed)


if __name__ == '__main__':
    from random import sample

    # The process pool must be created inside __main__.
    async_.pool = Pool(4)

    p = range(0, 1000)
    results = []
    for i in range(4):
        result = printsum(i, sample(p, 100))
        results.append(result)

    for result in results:
        print("Worker %i: sum value is %i" % result.get())

In a real-world case I would elaborate a bit more on the decorator, providing some way to turn it off for debugging (while keeping the future interface in place), or maybe a facility for dealing with exceptions; but I think this demonstrates the principle well enough.

Just:

import threading, time


def f():
    print("f started")
    time.sleep(3)
    print("f finished")


threading.Thread(target=f).start()

You can use eventlet. It lets you write what appears to be synchronous code, but have it operate asynchronously over the network.

Here's an example of a super minimal crawler:

urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
        "https://wiki.secondlife.com/w/images/secondlife.jpg",
        "http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]


import eventlet
from eventlet.green.urllib import request  # green urllib.request (Python 3; was urllib2 on Python 2)


def fetch(url):
    return request.urlopen(url).read()


pool = eventlet.GreenPool()

for body in pool.imap(fetch, urls):
    print("got body", len(body))

This works for me; afterwards you can call the function, and it will dispatch itself onto a new thread.

import time
from _thread import start_new_thread  # the module was named "thread" in Python 2


def dowork(asynchronous=True):
    if asynchronous:
        args = (False,)  # note the trailing comma: args must be a tuple
        start_new_thread(dowork, args)  # Call itself on a new thread.
    else:
        while True:
            # do something...
            time.sleep(60)  # sleep for a minute
    return

You can use asyncio: generator-based ("enhanced generator") coroutines since Python 3.4, and the native async/await syntax since Python 3.5.

import asyncio
import datetime

Enhanced generator syntax:

@asyncio.coroutine  # Deprecated since Python 3.8, removed in 3.11; prefer async/await.
def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()

New async/await syntax:

async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()

You can use concurrent.futures (added in Python 3.2).

import time
from concurrent.futures import ThreadPoolExecutor


def long_computation(duration):
    for x in range(0, duration):
        print(x)
        time.sleep(1)
    return duration * 2


print('Use polling')
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(long_computation, 5)
    while not future.done():
        print('waiting...')
        time.sleep(0.5)

    print(future.result())


print('Use callback')
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(long_computation, 5)
future.add_done_callback(lambda f: print(f.result()))

print('waiting for callback')

executor.shutdown(False)  # non-blocking

print('shutdown invoked')

You can use Process. If you want to run it forever, use a while loop inside your function (as in networking):

from multiprocessing import Process


def foo():
    while 1:
        pass  # Do something


p = Process(target=foo)
p.start()

If you just want to run it once, do it like this:

from multiprocessing import Process


def foo():
    pass  # Do something


p = Process(target=foo)
p.start()
p.join()

The native Python way of asynchronous calls in 2021 with Python 3.9, which is also suitable for the Jupyter / IPython kernel

Camabeh's answer is the way to go since Python 3.3.

import asyncio
import datetime


async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()

This will work in Jupyter Notebook / Jupyter Lab but throw an error:

RuntimeError: This event loop is already running

Since IPython already uses an event loop, we need something called a nested asynchronous loop, which is not implemented in Python itself. Luckily there is nest_asyncio to deal with the issue. All you need to do is:

!pip install nest_asyncio # use ! within Jupyter Notebook, else pip install in shell
import nest_asyncio
nest_asyncio.apply()

(Based on this thread)

Only when you call loop.close() does it throw another error, as it probably refers to IPython's main loop:

RuntimeError: Cannot close a running event loop

I'll update this answer as soon as someone answers this GitHub issue.

The newer way to run asyncio in Python 3.7 and later is to use asyncio.run() instead of creating a loop, calling loop.run_until_complete(), and closing it:

import asyncio
import datetime


async def display_date(delay):
    loop = asyncio.get_running_loop()
    end_time = loop.time() + delay
    while True:
        print("Blocking...", datetime.datetime.now())
        await asyncio.sleep(1)
        if loop.time() > end_time:
            print("Done.")
            break


asyncio.run(display_date(5))
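On Python 3.9+ you can also push a blocking function onto a worker thread from asyncio with asyncio.to_thread, which comes close to the asynccall() the question asked for. A small sketch (`long_computation` and `main` are my own names):

```python
import asyncio
import time


def long_computation(x):
    time.sleep(0.1)        # Stand-in for real blocking work.
    return x * x


async def main():
    # Schedule the blocking call on a thread without blocking the event loop.
    task = asyncio.create_task(asyncio.to_thread(long_computation, 10))
    # ...do something else while it runs...
    return await task      # Await the result when it's needed.


result = asyncio.run(main())
print(result)              # → 100
```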