“ Fire and forget”python 异步/等待

有时候需要执行一些非关键的异步操作,但是我不想等它完成。在龙卷风的协同程序实现中,你可以通过简单地省略 yield关键字来“发射并忘记”一个异步函数。

我一直试图弄明白如何使用 Python 3.5中发布的新 async/await语法来“激活并忘记”。例如,一个简化的代码片段:

async def async_foo():
print("Do some stuff asynchronously here...")


def bar():
async_foo()  # fire and forget "async_foo()"


bar()

但是,bar()从来不执行,相反,我们会得到一个运行时警告:

RuntimeWarning: coroutine 'async_foo' was never awaited
async_foo()  # fire and forget "async_foo()"
80615 次浏览

这并不完全是异步执行,但也许 Run _ in _ execute ()适合您。

def fire_and_forget(task, *args, **kwargs):
loop = asyncio.get_event_loop()
if callable(task):
return loop.run_in_executor(None, task, *args, **kwargs)
else:
raise TypeError('Task must be a callable')


def foo():
#asynchronous stuff here




fire_and_forget(foo)

更新:

如果使用 Python > = 3.7,那么在任何地方都可以用 asyncio.create_task替换 asyncio.ensure_future。这是一种更新、更好的 产生任务方式。


异步。任务“发射和忘记”

根据对于 asyncio.Task的 python 文档,有可能开始一些对于 执行“在后台”的协程。由 asyncio.ensure_future创建的任务不会阻止执行(因此函数将立即返回!).这看起来像是你要求的“解雇然后忘记”的方法。

import asyncio




async def async_foo():
print("async_foo started")
await asyncio.sleep(1)
print("async_foo done")




async def main():
asyncio.ensure_future(async_foo())  # fire and forget async_foo()


# btw, you can also create tasks inside non-async funcs


print('Do some actions 1')
await asyncio.sleep(1)
print('Do some actions 2')
await asyncio.sleep(1)
print('Do some actions 3')




if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

产出:

Do some actions 1
async_foo started
Do some actions 2
async_foo done
Do some actions 3

如果任务在事件循环完成后执行,该怎么办?

注意,异步期望在事件循环完成时完成任务。所以如果你把 main()改成:

async def main():
asyncio.ensure_future(async_foo())  # fire and forget


print('Do some actions 1')
await asyncio.sleep(0.1)
print('Do some actions 2')

在程序结束后,您将收到以下警告:

Task was destroyed but it is pending!
task: <Task pending coro=<async_foo() running at [...]

为了防止您只能在事件循环完成后使用 等待所有未完成的任务:

async def main():
asyncio.ensure_future(async_foo())  # fire and forget


print('Do some actions 1')
await asyncio.sleep(0.1)
print('Do some actions 2')




if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
    

# Let's also finish all running tasks:
pending = asyncio.Task.all_tasks()
loop.run_until_complete(asyncio.gather(*pending))

杀死任务而不是等待它们

有时您不想等待任务完成(例如,有些任务可能被创建为永远运行)。在这种情况下,你可以只是 cancel()他们,而不是等待他们:

import asyncio
from contextlib import suppress




async def echo_forever():
while True:
print("echo")
await asyncio.sleep(1)




async def main():
asyncio.ensure_future(echo_forever())  # fire and forget


print('Do some actions 1')
await asyncio.sleep(1)
print('Do some actions 2')
await asyncio.sleep(1)
print('Do some actions 3')




if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())


# Let's also cancel all running tasks:
pending = asyncio.Task.all_tasks()
for task in pending:
task.cancel()
# Now we should await task to execute it's cancellation.
# Cancelled task raises asyncio.CancelledError that we can suppress:
with suppress(asyncio.CancelledError):
loop.run_until_complete(task)

产出:

Do some actions 1
echo
Do some actions 2
echo
Do some actions 3
echo

产出:

>>> Hello
>>> foo() started
>>> I didn't wait for foo()
>>> foo() completed

下面是一个简单的装饰函数,它将执行推到后台,控制行移动到代码的下一行。

主要的优点是,不必将函数声明为 await

import asyncio
import time


def fire_and_forget(f):
def wrapped(*args, **kwargs):
return asyncio.get_event_loop().run_in_executor(None, f, *args, *kwargs)


return wrapped


@fire_and_forget
def foo():
print("foo() started")
time.sleep(1)
print("foo() completed")


print("Hello")
foo()
print("I didn't wait for foo()")

注意: 检查我的其他 回答做同样的使用简单的 thread没有 asyncio

由于某些原因,如果您不能使用 asyncio,那么这里是使用普通线程的实现。检查我的其他答案和谢尔盖的答案。

import threading, time


def fire_and_forget(f):
def wrapped():
threading.Thread(target=f).start()


return wrapped


@fire_and_forget
def foo():
print("foo() started")
time.sleep(1)
print("foo() completed")


print("Hello")
foo()
print("I didn't wait for foo()")

生产

>>> Hello
>>> foo() started
>>> I didn't wait for foo()
>>> foo() completed
def fire_and_forget(f):
def wrapped(*args, **kwargs):
threading.Thread(target=functools.partial(f, *args, **kwargs)).start()


return wrapped

是上述方法的更好版本——不使用异步