在使用异步时,如何在关闭事件循环之前允许所有正在运行的任务完成

我有以下密码:

@asyncio.coroutine
def do_something_periodically():
while True:
asyncio.async(my_expensive_operation())
yield from asyncio.sleep(my_interval)
if shutdown_flag_is_set:
print("Shutting down")
break

我运行这个函数直到完成。当设置了关闭时,问题就出现了——函数完成,任何挂起的任务都不会运行。

这就是错误所在:

task: <Task pending coro=<report() running at script.py:33> wait_for=<Future pending cb=[Task._wakeup()]>>

如何正确安排关机时间?

为了给出一些上下文,我正在编写一个系统监视器,它每5秒钟从/proc/stat 读取一次,计算这段时间内的 CPU 使用情况,然后将结果发送到服务器。我希望继续调度这些监视作业,直到收到 sigterm,当我停止调度时,等待所有当前作业完成,然后优雅地退出。

66252 次浏览

您可以检索未完成的任务并再次运行循环,直到它们完成,然后关闭循环或退出程序。

pending = asyncio.all_tasks()
loop.run_until_complete(asyncio.gather(*pending))
  • pending是一个挂起任务的列表。
  • asyncio.gather()允许同时等待多个任务。

如果你想确保所有的任务都在协同程序中完成(也许你有一个“主”协同程序) ,你可以这样做,例如:

async def do_something_periodically():
while True:
asyncio.create_task(my_expensive_operation())
await asyncio.sleep(my_interval)
if shutdown_flag_is_set:
print("Shutting down")
break


await asyncio.gather(*asyncio.all_tasks())

此外,在这种情况下,由于所有任务都是在同一协程中创建的,因此您已经可以访问这些任务:

async def do_something_periodically():
tasks = []
while True:
tasks.append(asyncio.create_task(my_expensive_operation()))
await asyncio.sleep(my_interval)
if shutdown_flag_is_set:
print("Shutting down")
break


await asyncio.gather(*tasks)

在 Python 3.7中,上面的答案使用了多个 已废弃的 API(syncio.sync 和 Task.all _ asks,@syncio)。你应该使用这个:

import asyncio




async def my_expensive_operation(expense):
print(await asyncio.sleep(expense, result="Expensive operation finished."))




async def do_something_periodically(expense, interval):
while True:
asyncio.create_task(my_expensive_operation(expense))
await asyncio.sleep(interval)




loop = asyncio.get_event_loop()
coro = do_something_periodically(1, 1)


try:
loop.run_until_complete(coro)
except KeyboardInterrupt:
coro.close()
tasks = asyncio.all_tasks(loop)
expensive_tasks = {task for task in tasks if task._coro.__name__ != coro.__name__}
loop.run_until_complete(asyncio.gather(*expensive_tasks))

您也可以考虑使用 异步屏蔽,尽管通过这种方式您不会得到 全部运行任务完成,但只能得到 被屏蔽了。但它在某些情况下仍然有用。

除此之外,对于 Python 3.7,我们还可以在这里使用高级 API 方法 Asynio 快跑。作为 Python 的核心开发者,Yury Selivanov 建议: Https://youtu.be/rexxo_azv-w?t=636
注意: 在 Python 3.7中,已经临时将 syncio.run 函数添加到 syncio 中。

希望能帮上忙!

import asyncio




async def my_expensive_operation(expense):
print(await asyncio.sleep(expense, result="Expensive operation finished."))




async def do_something_periodically(expense, interval):
while True:
asyncio.create_task(my_expensive_operation(expense))
# using asyncio.shield
await asyncio.shield(asyncio.sleep(interval))




coro = do_something_periodically(1, 1)


if __name__ == "__main__":
try:
# using asyncio.run
asyncio.run(coro)
except KeyboardInterrupt:
print('Cancelled!')

使用一个包装协同程序,该程序等待待定任务计数为1时才返回。

async def loop_job():
asyncio.create_task(do_something_periodically())
while len(asyncio.Task.all_tasks()) > 1:  # Any task besides loop_job() itself?
await asyncio.sleep(0.2)


asyncio.run(loop_job())

我不确定这是否是你要求的,但是我有一个类似的问题,这是我想到的最终解决方案。

这段代码与 python3兼容,并且只使用公共异步 API (意味着没有破解的 _coro和不推荐的 API)。

import asyncio


async def fn():
await asyncio.sleep(1.5)
print('fn')


async def main():
print('main start')
asyncio.create_task(fn()) # run in parallel
await asyncio.sleep(0.2)
print('main end')




def async_run_and_await_all_tasks(main):
def get_pending_tasks():
tasks = asyncio.Task.all_tasks()
pending = [task for task in tasks if task != run_main_task and not task.done()]
return pending


async def run_main():
await main()


while True:
pending_tasks = get_pending_tasks()
if len(pending_tasks) == 0: return
await asyncio.gather(*pending_tasks)


loop = asyncio.new_event_loop()
run_main_coro = run_main()
run_main_task = loop.create_task(run_main_coro)
loop.run_until_complete(run_main_task)


# asyncio.run(main()) # doesn't print from fn task, because main finishes earlier
async_run_and_await_all_tasks(main)

产出(如预期) :

main start
main end
fn

异步 _ run _ and _ wait _ all _ asks 函数将使 python 以 nodejs 的方式行事: 只有在没有未完成的任务时才退出。

如果您想要一种干净的方式来等待在某个本地作用域内创建的所有正在运行的任务,而不会泄漏内存(并且在防止 垃圾收集错误的同时) ,那么您可以维护一组正在运行的任务,并使用 task.add_done_callback(...)从该集中删除该任务。下面是一个为您处理这个问题的类:

class TaskSet:
def __init__(self):
self.tasks = set()


def add(self, coroutine: Coroutine) -> Task:
task = asyncio.create_task(coroutine)
self.tasks.add(task)
task.add_done_callback(lambda _: self.tasks.remove(task))
return task


def __await__(self):
return asyncio.gather(*self.tasks).__await__()

可以这样使用:

async def my_function():
await asyncio.sleep(0.5)




async def go():
tasks = TaskSet()
for i in range(10):
tasks.add(my_function())
await tasks

我注意到一些建议使用 asyncio.gather(*asyncio.all_tasks())的答案,但是问题在于有时候可能是一个无限循环,它等待 asyncio.current_task()完成,也就是它自己。一些答案提出了一些复杂的变通方法,包括检查 coro名称或 len(asyncio.all_tasks()),但结果是利用 set操作非常简单:

async def main():
# Create some tasks.
for _ in range(10):
asyncio.create_task(asyncio.sleep(10))
# Wait for all other tasks to finish other than the current task i.e. main().
await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()})

我的用例有一些产生短期任务的主要任务。当看到主要任务完成(以及一些暂时性任务)时,这个回答很好地立即退出,但是我想为其他任务整理一下。时间延迟不起作用(因为可能会创建额外的任务) ,所以积极使用 .cancel()似乎是正确的选择。

密码是:

import asyncio


MAX_TASKS = 10
task_maker_count = 0


async def task_maker():
global task_maker_count
task_maker_count += 1
if len(asyncio.all_tasks()) < MAX_TASKS:
asyncio.create_task(task_maker())
asyncio.create_task(task_maker())


async def main_task():
asyncio.create_task(task_maker())
await asyncio.sleep(2.0)


async def main():
global task_maker_count
asyncio.create_task(main_task())
asyncio.create_task(main_task())

测试

    await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()})
for task in [*asyncio.all_tasks() - {asyncio.current_task()}]:
task.cancel()
await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()},
return_exceptions=True)  # needed for CancelledError
    print(f'{task_maker_count} task_maker tasks created')


if __name__ == '__main__':
asyncio.run(main())

我电脑上的结果是:

194672 task_maker tasks created

虽然不是特别相关,但是将 MAX_TASKS突破到数千大大减少了完成任务的数量。

import asyncio


async def coroutine_to_run(timetosleepinseconds):
print(await asyncio.sleep(timetosleepinseconds, result=f"I have finished in {timetosleepinseconds} seconds"))
## Do your stuff


async def main():
tasks = [asyncio.create_task(coroutine_to_run(timetosleepinseconds=2)), asyncio.create_task(coroutine_to_run(timetosleepinseconds=3))]
await asyncio.gather(*tasks)


asyncio.run(main())