协程 (Coroutine), 是一种比线程更轻量级的存在。它是 单线程里的并发,靠的是 协作式调度,不是操作系统强行切。

为什么需要协程?

想象一个典型场景:

请求 A:等数据库
请求 B:等网络
请求 C:等磁盘

如果你用 同步代码:

handle(A)  # 卡住
handle(B)
handle(C)

CPU 大量时间在 等 I/O,很浪费。

协程的思想是: 当我遇到 I/O 等待时,我主动让出 CPU,让别人先跑。

协程 vs 线程 vs 进程

特性进程线程协程
切换开销最大中等最小
内存占用最大中等最小
并发能力多进程并行多线程并发单线程内并发
数据共享IPC复杂共享内存, 需要锁无需锁
创建数量少量中量大量

入门

使用 async/await 语法声明的 协程 是编写 asyncio应用程序 的首选方式。

术语 “协程” 可用于表示两个概念:
一个协程函数:一个 async def 函数;
一个协程对象:调用 协程函数后返回的对象。

asyncio.run

例如,以下代码片段会打印“hello”,等待1秒,然后打印“world”:

import asyncio

async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')

asyncio.run(main())

注意,简单地调用协程并不会安排它执行:

>>> main()
<coroutine object main at 0x1053bb7c8>

await

要实际运行协程,asyncio 提供了以下机制:

  1. asyncio.run() 函数用于运行顶级入口点“main()”函数(上面已经演示过)
  2. await 一个协程。以下代码片段将在等待1秒后打印“hello”,然后在再等待2秒后打印“world”
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

时间花费 3 秒, 输出:

started at 17:13:52
hello
world
finished at 17:13:55

asyncio.create_task

asyncio.create_task() 函数用于将协程作为 asyncio Tasks 并发运行。

并发运行两个 say_after 协程:

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

时间花费 2 秒(比之前快 1 秒), 输出:

started at 17:14:32
hello
world
finished at 17:14:34

asyncio.TaskGroup

asyncio.TaskGroup 类提供了一种比create_task()更现代的替代方案。使用此API,最后一个示例变为:

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(
            say_after(1, 'hello'))

        task2 = tg.create_task(
            say_after(2, 'world'))

        print(f"started at {time.strftime('%X')}")

    # The await is implicit when the context manager exits.

    print(f"finished at {time.strftime('%X')}")

时间花费也是 2 秒。

asyncio 模块详解

asyncio 是Python的标准库, 提供了完整的异步编程支持。

可等待对象

如果一个对象可以在 await 表达式中使用,那么它就是一个可等待对象。许多 asyncio API 被设计为接受 可等待对象(awaitable object)。

可等待对象主要有三种类型:

  • coroutine(协程)
  • Task(任务)
  • Future

协程 是可等待对象,因此可以从其他协程中等待:

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()  # will raise a "RuntimeWarning".

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

任务 用于 并发地调度 协程。
当 coroutine 通过 asyncio.create_task() 之类的函数包装成 task 时,该 coroutine 会被自动安排尽快运行:

import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

Future 是一种 更底层的(low-level) 可等待对象,它表示异步操作的 最终结果。
asyncio中的Future对象用于允许基于回调的代码与async/await一起使用。
通常情况下,在 应用级代码中 没有必要创建 Future 对象。

事件循环 (Event Loop)

事件循环是 asyncio 的核心, 负责调度和执行所有协程。

import asyncio

async def task(name, delay):
    print(f"任务 {name} 开始")
    await asyncio.sleep(delay)
    print(f"任务 {name} 完成")
    return f"{name}的结果"

async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    print(f"事件循环: {loop}")
    
    # 创建多个任务
    task1 = asyncio.create_task(task("A", 1))
    task2 = asyncio.create_task(task("B", 2))
    task3 = asyncio.create_task(task("C", 1.5))
    
    # 等待所有任务完成
    results = await asyncio.gather(task1, task2, task3)
    print(f"结果: {results}")

asyncio.run(main())

gather 和 as_completed

asyncio.gather(*aws) 并发运行 可等待对象序列(aws, awaitable objects) 中的内容。
如果 aws 中的任何可等待对象是 协程,它会被自动调度为一个 任务。

import asyncio

async def fetch_data(id):
    await asyncio.sleep(id)
    return f"数据{id}"

async def main():
    # 方式1: 使用 asyncio.create_task
    task1 = asyncio.create_task(fetch_data(1))
    task2 = asyncio.create_task(fetch_data(2))
    
    # 等待任务完成
    result1 = await task1
    result2 = await task2
    print(result1, result2)
    
    # 方式2: 使用 asyncio.gather
    tasks = [fetch_data(i) for i in range(3, 6)]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())

asyncio.as_completed(aws) 会返回一个迭代器,按 完成顺序 产出协程结果,而不是提交顺序。
as_completed(aws) 中的 aws 是 可等待对象列表。

普通 gather(按提交顺序返回):

import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    results = await asyncio.gather(
        task("A", 2),
        task("B", 1),
        task("C", 3),
    )
    print(results)

asyncio.run(main())

# 输出(固定顺序):
# ['A done', 'B done', 'C done']

使用 as_completed(按完成顺序处理):

import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    tasks = [
        task("A", 2),
        task("B", 1),
        task("C", 3),
    ]

    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(result)

asyncio.run(main())

超时控制

import asyncio

async def long_operation():
    print("开始长时间操作...")
    await asyncio.sleep(5)
    print("操作完成")
    return "结果"

async def main():
    try:
        # 设置超时时间为2秒
        result = await asyncio.wait_for(long_operation(), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("操作超时!")

asyncio.run(main())

除了 wait_for, 还有 timeout_atwait 也是类似功能.

asyncio.timeout(delay) 返回一个异步上下文管理器,它可用于限制等待某事物所花费的时间。

async def main():
    try:
        async with asyncio.timeout(10):
            await long_running_task()
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")

    print("This statement will run regardless.")

实际应用示例

异步HTTP请求

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://www.example.com",
        "https://www.python.org",
        "https://www.github.com"
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        for url, content in zip(urls, results):
            print(f"{url}: {len(content)} 字符")

asyncio.run(main())

异步文件操作

import asyncio
import aiofiles

async def read_file(filename):
    async with aiofiles.open(filename, mode='r') as f:
        return await f.read()

async def write_file(filename, content):
    async with aiofiles.open(filename, mode='w') as f:
        await f.write(content)

async def main():
    # 写入文件
    await write_file("test.txt", "Hello, asyncio!")
    
    # 读取文件
    content = await read_file("test.txt")
    print(content)

asyncio.run(main())

异步数据库操作

import asyncio
import aiomysql

async def query_database():
    conn = await aiomysql.connect(
        host='127.0.0.1',
        port=3306,
        user='root',
        password='password',
        db='test'
    )
    
    try:
        async with conn.cursor() as cur:
            await cur.execute("SELECT * FROM users")
            result = await cur.fetchall()
            print(result)
    finally:
        conn.close()

asyncio.run(query_database())

异步编程最佳实践

1. 不要在协程中使用阻塞操作

# ❌ 错误: 阻塞整个事件循环
import time

async def bad_example():
    time.sleep(5)  # 同步阻塞!

# ✅ 正确: 使用异步版本
import asyncio

async def good_example():
    await asyncio.sleep(5)  # 非阻塞

2. 合理使用 gather 和 wait

import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    print(f"{name} 完成")

async def main():
    # gather: 等待所有任务完成
    await asyncio.gather(
        task("A", 1),
        task("B", 2),
        task("C", 3)
    )
    
    # wait: 更精细的控制
    done, pending = await asyncio.wait([
        task("X", 1),
        task("Y", 2),
        task("Z", 3)
    ], timeout=2.5)
    
    print(f"完成的任务: {len(done)}")
    print(f"未完成的任务: {len(pending)}")
    
    # 取消未完成的任务
    for task in pending:
        task.cancel()

asyncio.run(main())

3. 错误处理

import asyncio

async def might_fail(should_fail=False):
    await asyncio.sleep(1)
    if should_fail:
        raise ValueError("任务失败")
    return "成功"

async def main():
    # 方式1: try-except
    try:
        result = await might_fail(should_fail=True)
        print(result)
    except ValueError as e:
        print(f"捕获到错误: {e}")
    
    # 方式2: return_exceptions
    results = await asyncio.gather(
        might_fail(should_fail=False),
        might_fail(should_fail=True),
        might_fail(should_fail=False),
        return_exceptions=True
    )
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"任务{i+1} 失败: {result}")
        else:
            print(f"任务{i+1} 成功: {result}")

asyncio.run(main())

4. 避免过度使用协程

协程不是万能的, 在以下情况不需要使用协程:

  • CPU密集型任务 (使用多进程)
  • 简单的顺序任务
  • 不涉及IO的操作
# ❌ 不适合: CPU密集型
async def cpu_intensive():
    result = sum(i**2 for i in range(10000000))  # 阻塞CPU
    return result

# ✅ 适合: IO密集型
async def io_intensive():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com') as response:
            return await response.json()

参考

  1. Coroutines and Tasks