0%

asyncio浅析

Python asyncio机制

Event Loop

Python 的事件循环(Event Loop)是异步编程中的核心概念,通常与 asyncio 库(Python3.5引入)一起使用。事件循环负责管理和调度异步任务,确保这些任务在适当的时机运行,而不会阻塞主线程。事件循环是基于“协程”的模型,它允许 Python 程序在进行 I/O 操作或等待某些事件时不会阻塞其他操作,从而提高程序的效率。
event loop的基本流程:

  • 定义协程(coroutines): 协程是 Python 中的异步函数(Asynchronous Functions),通常用 async def 定义,执行时不会立即执行,而是返回一个 “awaitable” 对象。
  • 创建事件循环: asyncio.get_event_loop() 创建一个事件循环,它负责管理和调度所有异步任务。
  • 运行事件循环: 使用 loop.run_until_complete() 来启动事件循环,直到指定的任务完成。
  • 任务调度: 事件循环会根据任务的状态(是否可以执行)决定哪个任务应该被调度执行。

asyncio 的基本使用规范

异步函数是使用 async def 语法定义的,它们返回一个协程对象,而不是直接执行函数体。当你调用这个函数时,实际上返回的是一个尚未完成的协程对象,需要通过事件循环来执行。
定义一个异步函数如:

1
2
async def my_coroutine():
await asynico.sleep(2)

await 用于挂起协程的执行,等待一个异步操作完成后再继续执行。

asyncio 提供了事件循环来调度异步任务。事件循环负责执行异步任务,并管理任务的调度。

  • asyncio.run():用于运行一个顶层的异步函数。它创建一个事件循环并运行直到协程执行完成,最后关闭事件循环。通常用于程序的入口。
  • asyncio.run(main()):该方法会运行 main() 协程,并在执行完毕后关闭事件循环。
1
2
3
4
5
# 使用 asyncio.run() 启动事件循环
async def main():
await my_coroutine()

asyncio.run(main())
  • asyncio.gather():用于并发执行多个协程,等到所有协程都完成后,才继续执行后面的代码。
1
2
3
4
5
6
7
async def another_coroutine():
print("开始执行另一个协程")
await asyncio.sleep(1)
print("另一个协程执行结束")

async def main():
await asyncio.gather(my_coroutine(), another_coroutine()) # 并发执行

除了 asyncio.run(),还可以手动管理事件循环和任务,使用 loop.create_task()loop.run_until_complete() 来调度任务。

  • loop.create_task():用于创建一个协程任务,返回一个任务对象。
  • loop.run_until_complete():用于启动事件循环并直到某个任务完成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def task1():
print("任务 1 开始")
await asyncio.sleep(1)
print("任务 1 完成")

async def task2():
print("任务 2 开始")
await asyncio.sleep(2)
print("任务 2 完成")

# 手动创建事件循环并执行任务
loop = asyncio.get_event_loop()
tasks = [loop.create_task(task1()), loop.create_task(task2())]
loop.run_until_complete(asyncio.gather(*tasks))
  • 在协程中使用 try...except 来捕获异常。
  • 异常会在协程执行过程中抛出并传播,直到被捕获为止。
1
2
3
4
5
6
7
8
9
10
async def my_coroutine():
raise ValueError("发生错误")

async def main():
try:
await my_coroutine()
except ValueError as e:
print(f"捕获异常: {e}")

asyncio.run(main())
  • 事件循环会在程序执行完毕后自动关闭,但如果需要提前停止,可以调用 loop.stop()
1
2
3
4
5
6
7
8
async def main():
print("开始任务")
await asyncio.sleep(2)
print("结束任务")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.stop() # 关闭事件循环

简单的上手

以下代码请在python3.7及以上运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
import time
#这是一个coroutine
async def say_after(delay, what):
    #await将asyncio.sleep(delay)这个coroutine转化为task
    await asyncio.sleep(delay)
    print(what)
   
async def main():
    print(f"start at {time.strftime('%X')}")
    #await将say_after()这个coroutine转化为task,并告知Event Loop
    await say_after(1, "hello")
    await say_after(2, "world")
    print(f"finish at {time.strftime('%X')}")
   
asyncio.run(main())

输出为:

1
2
3
4
start at 15:46:50
hello
world
finish at 15:46:53

花费三秒。整个事件过程为:
asyncio.run将main()包装为了一个task,然后加入Event Loop。Event Loop中此时只有一个task即main,随即运行main(),控制权交付于main。
随后,main将控制权交给say_after,运行say_after(1, “hello”)这个coroutine function得到一个coroutine object(代码并不会在这里阻塞),而await将coroutine object转化为task,放到Event Loop中,并告知需要等待这个sleep,控制权交给sleep。
1s后,sleep这个task完成,Event Loop将控制权交还给main,main运行下一个task,重复上述步骤。

但是这样并没有意义,运行的事件并没有缩短。因此,我们改变策略为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
   
async def main():
task1 = asyncio.create_task(say_after(1, "hello"))
task2 = asyncio.create_task(say_after(2, "world"))

    print(f"start at {time.strftime('%X')}")
    await task1
    await task2
    print(f"finish at {time.strftime('%X')}")
   
asyncio.run(main())

输出为

1
2
3
4
start at 16:23:01
hello
world
finish at 16:23:03

这次,我们预先包装了两个task,在其放到Event Loop后,Event Loop并不会将控制权交由task,从而实现串行。

如何返回task的值?
最简单:

1
a = await task1

使用gather:

1
a = await asyncio.gather(task1, taske)

gather还有一个好处,可以不需要包装为task再输入,可以直接读取coroutine:

1
2
3
4
5
6
a = await asyncio.gather(
say_after(1, "hello"),
say_after(2, "world")
)

print(a)