Python 中的异步异常处理

下面的代码使用 asyncioaiohttp进行异步 HTTP 请求。

import sys
import asyncio
import aiohttp


@asyncio.coroutine
def get(url):
try:
print('GET %s' % url)
resp = yield from aiohttp.request('GET', url)
except Exception as e:
raise Exception("%s has error '%s'" % (url, e))
else:
if resp.status >= 400:
raise Exception("%s has error '%s: %s'" % (url, resp.status, resp.reason))


return (yield from resp.text())


@asyncio.coroutine
def fill_data(run):
url = 'http://www.google.com/%s' % run['name']
run['data'] = yield from get(url)


def get_runs():
runs = [ {'name': 'one'}, {'name': 'two'} ]
loop = asyncio.get_event_loop()
task = asyncio.wait([fill_data(r) for r in runs])
loop.run_until_complete(task)
return runs


try:
get_runs()
except Exception as e:
print(repr(e))
sys.exit(1)

由于某种原因,get函数内部引发的异常没有被捕获:

Future/Task exception was never retrieved
Traceback (most recent call last):
File "site-packages/asyncio/tasks.py", line 236, in _step
result = coro.send(value)
File "mwe.py", line 25, in fill_data
run['data'] = yield from get(url)
File "mwe.py", line 17, in get
raise Exception("%s has error '%s: %s'" % (url, resp.status, resp.reason))
Exception: http://www.google.com/two has error '404: Not Found'

那么,处理协程引发的异常的正确方法是什么呢?

94978 次浏览

asyncio.wait实际上并不使用传递给它的 Futures,它只是等待它们完成,然后返回 Future对象:

辅助程序 < strong > asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

等待 Futures 和 coroutine 对象 序列未来给出的完成。协程将被包装 返回两组 Future: (完成,挂起)。

直到你实际上 yield from/await的项目中的 done清单,他们将保持不消耗。由于程序退出时没有使用期货,所以您会看到“从未检索到异常”消息。

对于您的用例来说,使用 asyncio.gather可能更有意义,它实际上将消耗每个 Future,然后返回一个单独的 Future来聚合它们的所有结果(或者在输入列表中引发未来抛出的第一个 Exception)。

def get_runs():
runs = [ {'name': 'one'}, {'name': 'two'} ]
loop = asyncio.get_event_loop()
tasks = asyncio.gather(*[fill_data(r) for r in runs])
loop.run_until_complete(tasks)
return runs

产出:

GET http://www.google.com/two
GET http://www.google.com/one
Exception("http://www.google.com/one has error '404: Not Found'",)

请注意,asyncio.gather实际上允许您自定义其在某个期货引发异常时的行为; 默认行为是引发它命中的第一个异常,但它也可以只返回输出列表中的每个异常对象:

asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

返回给定协同程序对象的未来聚合结果 或者未来。

所有的期货必须共享相同的事件循环。如果所有的任务都完成了 成功地,返回的将来结果是结果列表(在 原始序列的顺序,不一定是原始序列的顺序 如果 return_exceptionsTrue,则 任务被视为成功的结果,并在 结果列表; 否则,将立即显示第一个引发的异常 传播到回归的未来

复试中调试或“处理”异常:

返回某些结果或引发异常的协程:

@asyncio.coroutine
def async_something_entry_point(self):
try:
return self.real_stuff_which_throw_exceptions()
except:
raise Exception(some_identifier_here + ' ' + traceback.format_exc())

还有复试:

def callback(self, future: asyncio.Future):
exc = future.exception()
if exc:
# Handle wonderful empty TimeoutError exception
if type(exc) == TimeoutError:
self.logger('<Some id here> callback exception TimeoutError')
else:
self.logger("<Some id here> callback exception " + str(exc))


# store your result where you want
self.result.append(
future.result()
)