Python 中最简单的异步/等待示例

我读过很多例子,博客文章,关于 Python 3.5 + 中 asyncio/async/await的问答,很多都很复杂,我发现最简单的大概就是 这个
尽管如此,它仍然使用 ensure_future,为了学习 Python 中的异步编程,我想看一个更简单的示例,以及 最少的必要工具如何执行基本的异步/等待示例。

问: 是否可以给出一个 展示 ABC0/await工作原理的简单示例,只使用这两个关键字 + 代码运行异步循环 + 其他 Python 代码,而不使用其他 asyncio函数?

例如: 这样的东西:

import asyncio


async def async_foo():
print("async_foo started")
await asyncio.sleep(5)
print("async_foo done")


async def main():
asyncio.ensure_future(async_foo())  # fire and forget async_foo()
print('Do some actions 1')
await asyncio.sleep(5)
print('Do some actions 2')


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

但是没有 ensure_future,并且仍然演示了等待/异步是如何工作的。

176461 次浏览

有没有可能给出一个简单的例子,说明 async/await 工程,只使用这两个关键字 + asyncio.get_event_loop() + run_until_complete + 其他 Python 代码但没有其他 asyncio函数?

这样就有可能编写可以工作的代码:

import asyncio




async def main():
print('done!')




if __name__ ==  '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

但是这样就不可能说明为什么需要异步。

顺便问一下,你为什么需要 asyncio,而不仅仅是普通代码?答案是-asyncio允许您在并行化 I/O 阻塞操作(如对网络的读/写操作)时获得性能优势。要编写有用的示例,需要使用这些操作的异步实现。

请阅读 这个答案以获得更详细的解释。

更新:

下面的例子使用 asyncio.sleep来模拟 I/O 阻塞操作,使用 asyncio.gather来演示如何同时运行多个阻塞操作:

import asyncio




async def io_related(name):
print(f'{name} started')
await asyncio.sleep(1)
print(f'{name} finished')




async def main():
await asyncio.gather(
io_related('first'),
io_related('second'),
)  # 1s + 1s = over 1s




if __name__ ==  '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

产出:

first started
second started
first finished
second finished
[Finished in 1.2s]

注意这两个 io_related是如何开始的,仅仅一秒钟后,两个都完成了。

为了回答你的问题,我将提供3个不同的解决同一个问题。

案例1: 普通的 Python

import time


def sleep():
print(f'Time: {time.time() - start:.2f}')
time.sleep(1)


def sum(name, numbers):
total = 0
for number in numbers:
print(f'Task {name}: Computing {total}+{number}')
sleep()
total += number
print(f'Task {name}: Sum = {total}\n')


start = time.time()
tasks = [
sum("A", [1, 2]),
sum("B", [1, 2, 3]),
]
end = time.time()
print(f'Time: {end-start:.2f} sec')

产出:

Task A: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task A: Sum = 3


Task B: Computing 0+1
Time: 2.01
Task B: Computing 1+2
Time: 3.01
Task B: Computing 3+3
Time: 4.01
Task B: Sum = 6


Time: 5.02 sec

案例2: 异步/等待完成错误

import asyncio
import time


async def sleep():
print(f'Time: {time.time() - start:.2f}')
time.sleep(1)


async def sum(name, numbers):
total = 0
for number in numbers:
print(f'Task {name}: Computing {total}+{number}')
await sleep()
total += number
print(f'Task {name}: Sum = {total}\n')


start = time.time()


loop = asyncio.get_event_loop()
tasks = [
loop.create_task(sum("A", [1, 2])),
loop.create_task(sum("B", [1, 2, 3])),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()


end = time.time()
print(f'Time: {end-start:.2f} sec')

产出:

Task A: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task A: Sum = 3


Task B: Computing 0+1
Time: 2.01
Task B: Computing 1+2
Time: 3.01
Task B: Computing 3+3
Time: 4.01
Task B: Sum = 6


Time: 5.01 sec

案例3: 异步/等待正确完成

sleep函数外,与情况2相同:

async def sleep():
print(f'Time: {time.time() - start:.2f}')
await asyncio.sleep(1)

产出:

Task A: Computing 0+1
Time: 0.00
Task B: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task B: Computing 1+2
Time: 1.00
Task A: Sum = 3


Task B: Computing 3+3
Time: 2.00
Task B: Sum = 6


Time: 3.01 sec

案例1和案例2给出了相同的 5秒,而案例3只给出了 3秒

造成这种差异的原因在于 sleep函数的实现。

# case 1
def sleep():
...
time.sleep(1)


# case 2
async def sleep():
...
time.sleep(1)


# case 3
async def sleep():
...
await asyncio.sleep(1)

在情况1和情况2中,它们是相同的: 他们“睡觉”而不允许其他人使用这些资源。 而在案例3中,它允许在睡眠状态下访问资源。

在情况2中,我们将 async添加到普通函数中,但是事件循环将运行它 不受打扰。 为什么? 因为我们没有说循环允许在哪里中断你的函数来运行另一个任务。

在案例3中,我们告诉事件循环在哪里中断函数以运行另一个任务。具体在哪里?就在这里!

await asyncio.sleep(1)

更多信息请阅读 给你

2020年5月2日更新

考虑一下阅读

Python 3.7 + 现在有了 一个更简单的 API(在我看来) ,它有一个更简单的措辞(比“ sure _ future”更容易记住) : 你可以使用 create_task返回一个 Task 对象(如果需要的话,这在以后取消任务时会很有用)。

基本例子1

import asyncio


async def hello(i):
print(f"hello {i} started")
await asyncio.sleep(4)
print(f"hello {i} done")


async def main():
task1 = asyncio.create_task(hello(1))  # returns immediately, the task is created
await asyncio.sleep(3)
task2 = asyncio.create_task(hello(2))
await task1
await task2


asyncio.run(main())  # main loop

结果:

Hello 1开始了
Hello 2开始了
你好,完成
Hello 2完成


基本例子2

如果您需要获取这些异步函数的 返回值,那么 gather是有用的。以下示例的灵感来自 文件

import asyncio


async def factorial(n):
f = 1
for i in range(2, n + 1):
print(f"Computing factorial({n}), currently i={i}...")
await asyncio.sleep(1)
f *= i
return f


async def main():
L = await asyncio.gather(factorial(2), factorial(3), factorial(4))
print(L)  # [2, 6, 24]


asyncio.run(main())

预期产出:

计算阶乘(2) ,当前 i = 2..。
计算阶乘(3) ,当前 i = 2..。
计算阶乘(4) ,当前 i = 2..。
计算阶乘(3) ,当前 i = 3..。
计算阶乘(4) ,当前 i = 3..。
计算阶乘(4) ,当前 i = 4..。
[2,6,24]


PS: 即使您使用的是 asyncio而不是 trio后者的指导对我理解 Python 异步编程也是有帮助的。

既然一切都解释得很好,那么让我们运行一些例子,用事件循环将同步代码与异步代码进行比较。

同步码:

import time


def count():
time.sleep(1)
print('1')
time.sleep(1)
print('2')
time.sleep(1)
print('3')


def main():
for i in range(3):
count()


if __name__ == "__main__":
t = time.perf_counter()
main()
t2 = time.perf_counter()
    

print(f'Total time elapsed: {t2:0.2f} seconds')

产出:

1
2
3
1
2
3
1
2
3
Total time elapsed: 9.00 seconds

我们可以看到,每个计数周期运行到完成之前,下一个周期开始。

异步代码:

import asyncio
import time


async def count():
await asyncio.sleep(1)
print('1')
await asyncio.sleep(1)
print('2')
await asyncio.sleep(1)
print('3')


async def main():
await asyncio.gather(count(), count(), count())


if __name__ == "__main__":
t = time.perf_counter()
asyncio.run(main())
t2 = time.perf_counter()


print(f'Total time elapsed: {t2:0.2f} seconds')

产出:

1
1
1
2
2
2
3
3
3
Total time elapsed: 3.00 seconds

另一方面,异步等价物看起来像这样运行需要3秒钟,而不是9秒钟。 第一个计数周期开始了,一旦进入 await睡眠状态,Python 就可以自由地做其他工作,例如开始第二个计数周期,然后是第三个计数周期。 这就是为什么我们有所有的一个,而不是所有的管子,然后所有三个。 在输出编程中并行可以是一个非常有价值的工具。 多处理操作完成所有的多任务工作,在 Python 中,它是多核并发的唯一选择,即在 CPU 的多个核上执行程序。 如果使用线程,那么操作系统仍然执行所有的多任务工作,并且在 cpython 中,全局 intrepeter 锁可以防止异步编程中的多核并发。 没有操作系统干预只有一个进程只有一个线程所以正在运行的任务可以在等待期间释放 CPU 这样其他任务就可以使用它了。

import asyncio


loop = asyncio.get_event_loop()




async def greeter(name):
print(f"Hi, {name} you're in a coroutine.")


try:
print('starting coroutine')
coro = greeter('LP')
print('entering event loop')
loop.run_until_complete(coro)
finally:
print('closing event loop')
loop.close()

产出:

starting coroutine
entering event loop
Hi, LP you're in a coroutine.
closing event loop

异步框架需要一个通常称为事件循环的调度器。这个事件循环跟踪所有正在运行的任务,当一个函数挂起时,它将控制权返回给事件循环,事件循环会找到另一个函数来启动或恢复,这就是所谓的合作多任务。Async IO 为框架提供了一个以事件循环为中心的异步框架,它有效地处理应用程序与事件循环交互的输入/输出事件,它显式地注册要运行的代码,然后让事件循环调度程序在资源可用时对应用程序代码进行必要的调用。 因此,如果网络服务器打开套接字,然后注册它们,当输入事件发生在它们上面时,事件循环将在有新的传入连接或有数据要读取时通知服务器代码。 如果从套接字中读取的数据不多于服务器,则将控制权交还给事件循环。

协同例程是为并发操作而设计的一种语言结构。共同例程可以使用唤醒关键字和另一个共同例程暂停执行,当它暂停时,共同例程状态得到维护,允许它在一个共同例程停止的地方恢复,可以启动另一个共同例程,然后等待结果,这使任务更容易分解成可重用的部分。

import asyncio


loop = asyncio.get_event_loop()


async def outer():
print('in outer')
print('waiting for result 1')
result1 = await phase1()
print('waiting for result 2')
result2 = await phase2(result1)
return result1, result2




async def phase1():
print('in phase1')
return 'phase1 result'


async def phase2(arg):
print('in phase2')
return 'result2 derived from {}'.format(arg)


asyncio.run(outer())

产出:

in outer
waiting for result 1
in phase1
waiting for result 2
in phase2

此示例询问两个必须按顺序执行但可以与其他操作并发运行的阶段。使用 awake关键字而不是将新的协同例程添加到循环中,因为控制流已经在由循环管理的协同例程中。没有必要告诉循环来管理新的协同例程。

import asyncio
import requests


async def fetch_users():
response = requests.get('https://www.testjsonapi.com/users/')
users = response.json()
return users


async def print_users():
# create an asynchronous task to run concurrently
# which wont block executing print statement before it finishes
response = asyncio.create_task(fetch_users())
print("Fetching users ")
# wait to get users data from response before printing users
users = await response


for user in users:
print(f"name : {user['name']} email : {user['email']}")


asyncio.run(print_users())
print("All users printed in console")

输出将如下所示

Fetching users
name : Harjas Malhotra email : harjas@gmail.com
name : Alisha Paul email : alisha@gmail.com
name : Mart Right email : marrk9658@yahoo.com
name : Brad Pitter email : brad@gmail.com
name : Ervin Dugg email : Ervin69@gmail.com
name : Graham Bell email : Graham@bell.biz
name : James Rush email : james369@hotmail.com
name : Deepak Dev email : deepak@gmail.com
name : Ajay Rich email : therichposts@gmail.com
All users printed in console

让我们看看代码是如何工作的。首先,当 python 将调用 print_users()时,它不会让它下面的 print 语句被执行,直到它完成。因此,在进入 print_users()之后,将创建一个并发任务,以便它下面的语句可以与这里的 fetch_users()任务同时运行。当此任务将在此时间运行 Fetching users将打印在控制台。在此之后,python 将等待来自 fetch_users()的响应,因为在接收之前不应该打印用户。fetch_users()完成后,所有用户的姓名和电子邮件将打印在控制台。因此,在完成 print_users()打印语句下面它将被执行。

我不知道为什么,但是所有关于这个主题的解释都太复杂了,或者他们使用了无用的异步.sleep () ..。 到目前为止,我找到的最好的代码示例是这样的: < a href = “ https://codeflex.co/python3-sync-waiting-example/”rel = “ nofollow norefrer”> https://codeflex.co/python3-async-await-example/

似乎每个人都把注意力集中在将 time.sleep转换为 asyncio.sleep上,但在现实世界中,这总是不可能的。有时你需要做一个库调用,这可能是一个 API 调用(例如: 请求从谷歌签名的 URL)。

下面是仍然可以使用 time.sleep,但是以异步方式使用的方法:

import asyncio
import time
from concurrent.futures.thread import ThreadPoolExecutor


def sleep():
print(f'Time: {time.time() - start:.2f}')
time.sleep(1)


async def sum(name, numbers):
_executor = ThreadPoolExecutor(2)
total = 0
for number in numbers:
print(f'Task {name}: Computing {total}+{number}')
await loop.run_in_executor(_executor, sleep)
total += number
print(f'Task {name}: Sum = {total}\n')


start = time.time()


loop = asyncio.get_event_loop()
tasks = [
loop.create_task(sum("A", [1, 2])),
loop.create_task(sum("B", [1, 2, 3])),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()


end = time.time()
print(f'Time: {end-start:.2f} sec')

产出:

Task A: Computing 0+1
Time: 0.00
Task B: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task B: Computing 1+2
Time: 1.00
Task A: Sum = 3


Task B: Computing 3+3
Time: 2.01
Task B: Sum = 6


Time: 3.01 sec

简单. . 甜蜜. . 棒极了. . something

  import asyncio
import time
import random


async def eat():
wait = random.randint(0,3)
await asyncio.sleep(wait)
print("Done With Eating")


async def sleep():
wait = random.randint(0,3)
await asyncio.sleep(wait)
print("Done With Sleeping")


async def repeat():
wait = random.randint(0,3)
await asyncio.sleep(wait)
print("Done With Repeating")


async def main():
for x in range(5):
await asyncio.gather(eat(),sleep(),repeat())
time.sleep(2)
print("+","-"*20)


if __name__ == "__main__":
t = time.perf_counter()
asyncio.run(main())
t2 = time.perf_counter()


print(f'Total time elapsed: {t2:0.2f} seconds')

尽管上面的答案有点抽象

from datetime import datetime
import asyncio








async def time_taking(max_val,task_no):
print("**TASK STARTING TO EXECUTE CONCURRENT TASk NO {} ***".format(task_no))


await asyncio.sleep(2)
value_list = []
for i in range(0,max_val):
value_list.append(i)


print("****FINSIHING UP TASk NO {}  **".format(task_no))
return value_list






async def test2(task_no):
await asyncio.sleep(5)
print("**TASK STARTING TO EXECUTE CONCURRENT TASk NO {} ***".format(task_no))
await asyncio.sleep(5)
print("****FINSIHING UP  TASk NO {}  **".format(task_no))


async def function(value = None):
tasks = []
start_time = datetime.now()
    

# CONCURRENT TASKS
tasks.append(asyncio.create_task(time_taking(20,1)))
tasks.append(asyncio.create_task(time_taking(43,2)))
tasks.append(asyncio.create_task(test2(3)))
    

# concurrent execution
lists = await asyncio.gather(*tasks)
end_time = datetime.now()
    

time_taken = end_time - start_time
return lists,time_taken




# run inside event loop
res,time_taken = asyncio.run(function())


print(res,time_taken)