什么是协程

协程 (Coroutine), 也被称为微线程, 是一种比线程更轻量级的存在。简单来说, 协程是一种用户态的轻量级线程, 协程的调度完全由用户控制。

协程 vs 线程 vs 进程

特性进程线程协程
切换开销最大中等最小
内存占用最大中等最小
并发能力多进程并行多线程并发单线程内并发
数据共享IPC复杂共享内存, 需要锁无需锁
创建数量少量中量大量

协程的核心优势:

  • 极低的上下文切换成本
  • 单线程即可实现高并发
  • 避免了多线程的锁竞争问题
  • 代码结构清晰, 逻辑同步

协程的基本概念

生成器 (Generator)

在Python 3.5之前, 协程是通过生成器实现的。生成器是一种特殊的迭代器, 可以在执行过程中暂停和恢复。

def simple_generator():
    print("开始执行")
    yield 1
    print("继续执行")
    yield 2
    print("结束执行")

# 创建生成器
gen = simple_generator()

# 手动迭代
print(next(gen))  # 输出: 开始执行, 1
print(next(gen))  # 输出: 继续执行, 2
print(next(gen))  # StopIteration

yield 关键字

  • yield 类似于 return, 但不会结束函数
  • 每次调用 next() 时, 生成器执行到 yield 并返回值
  • 下次调用 next() 时, 从上次的 yield 继续执行
def countdown(n):
    print("开始倒计时")
    while n > 0:
        yield n
        n -= 1

for i in countdown(5):
    print(i)

# 输出:
# 开始倒计时
# 5
# 4
# 3
# 2
# 1

send() 方法

生成器的 send() 方法可以向生成器内部发送数据:

def accumulator():
    total = 0
    while True:
        value = yield total
        if value is not None:
            total += value

acc = accumulator()
next(acc)  # 启动生成器 (预激活)
print(acc.send(10))  # 10
print(acc.send(20))  # 30
print(acc.send(30))  # 60

async/await 语法

Python 3.5 引入了 asyncawait 关键字, 大大简化了协程的编写。

async def 定义协程

import asyncio

async def hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# 运行协程
asyncio.run(hello())

# 输出:
# Hello
# (等待1秒)
# World

await 暂停协程

await 用于等待一个可等待对象 (如协程、Task、Future), 在等待期间, 事件循环可以执行其他协程。

import asyncio
import time

async def count():
    print("One")
    await asyncio.sleep(1)
    print("Two")

async def main():
    await asyncio.gather(count(), count(), count())

start = time.time()
asyncio.run(main())
end = time.time()

print(f"总耗时: {end - start:.2f}秒")

# 输出:
# One
# One
# One
# Two
# Two
# Two
# 总耗时: 1.00秒

注意: 如果使用同步的 time.sleep(), 则会是3秒!

asyncio 模块详解

asyncio 是Python的标准库, 提供了完整的异步编程支持。

事件循环 (Event Loop)

事件循环是 asyncio 的核心, 负责调度和执行所有协程。

import asyncio

async def task(name, delay):
    print(f"任务 {name} 开始")
    await asyncio.sleep(delay)
    print(f"任务 {name} 完成")
    return f"{name}的结果"

async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    print(f"事件循环: {loop}")
    
    # 创建多个任务
    task1 = asyncio.create_task(task("A", 1))
    task2 = asyncio.create_task(task("B", 2))
    task3 = asyncio.create_task(task("C", 1.5))
    
    # 等待所有任务完成
    results = await asyncio.gather(task1, task2, task3)
    print(f"结果: {results}")

asyncio.run(main())

创建任务 (Tasks)

import asyncio

async def fetch_data(id):
    await asyncio.sleep(id)
    return f"数据{id}"

async def main():
    # 方式1: 使用 asyncio.create_task
    task1 = asyncio.create_task(fetch_data(1))
    task2 = asyncio.create_task(fetch_data(2))
    
    # 等待任务完成
    result1 = await task1
    result2 = await task2
    print(result1, result2)
    
    # 方式2: 使用 asyncio.gather
    tasks = [fetch_data(i) for i in range(3, 6)]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())

超时控制

import asyncio

async def long_operation():
    print("开始长时间操作...")
    await asyncio.sleep(5)
    print("操作完成")
    return "结果"

async def main():
    try:
        # 设置超时时间为2秒
        result = await asyncio.wait_for(long_operation(), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("操作超时!")

asyncio.run(main())

并发与并行

import asyncio

async def worker(name, seconds):
    print(f"{name} 开始工作")
    await asyncio.sleep(seconds)
    print(f"{name} 完成工作")
    return f"{name}耗时{seconds}秒"

async def main():
    print("=== 并发执行 ===")
    start = asyncio.get_event_loop().time()
    
    # 并发执行多个协程
    results = await asyncio.gather(
        worker("任务1", 2),
        worker("任务2", 3),
        worker("任务3", 1)
    )
    
    end = asyncio.get_event_loop().time()
    print(f"总耗时: {end - start:.2f}秒")
    print(f"结果: {results}")

asyncio.run(main())

实际应用示例

异步HTTP请求

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://www.example.com",
        "https://www.python.org",
        "https://www.github.com"
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        for url, content in zip(urls, results):
            print(f"{url}: {len(content)} 字符")

asyncio.run(main())

异步文件操作

import asyncio
import aiofiles

async def read_file(filename):
    async with aiofiles.open(filename, mode='r') as f:
        return await f.read()

async def write_file(filename, content):
    async with aiofiles.open(filename, mode='w') as f:
        await f.write(content)

async def main():
    # 写入文件
    await write_file("test.txt", "Hello, asyncio!")
    
    # 读取文件
    content = await read_file("test.txt")
    print(content)

asyncio.run(main())

异步数据库操作

import asyncio
import aiomysql

async def query_database():
    conn = await aiomysql.connect(
        host='127.0.0.1',
        port=3306,
        user='root',
        password='password',
        db='test'
    )
    
    try:
        async with conn.cursor() as cur:
            await cur.execute("SELECT * FROM users")
            result = await cur.fetchall()
            print(result)
    finally:
        conn.close()

asyncio.run(query_database())

协程的高级用法

协程链

import asyncio

async def compute(x, y):
    print(f"计算 {x} + {y}")
    await asyncio.sleep(1)
    return x + y

async def main():
    # 协程链: 一个协程等待另一个协程的结果
    result1 = await compute(1, 2)
    result2 = await compute(result1, 3)
    result3 = await compute(result2, 4)
    
    print(f"最终结果: {result3}")

asyncio.run(main())

异步上下文管理器

import asyncio

class AsyncContextManager:
    async def __aenter__(self):
        print("进入上下文")
        await asyncio.sleep(0.5)
        return self
    
    async def __aexit__(self, exc_type, exc, tb):
        print("退出上下文")
        await asyncio.sleep(0.5)
    
    async def do_something(self):
        print("执行操作")

async def main():
    async with AsyncContextManager() as manager:
        await manager.do_something()

asyncio.run(main())

异步迭代器

import asyncio

class AsyncIterator:
    def __init__(self, n):
        self.n = n
        self.current = 0
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.current >= self.n:
            raise StopAsyncIteration
        await asyncio.sleep(0.5)
        self.current += 1
        return self.current

async def main():
    async for i in AsyncIterator(5):
        print(f"迭代: {i}")

asyncio.run(main())

信号量 (Semaphore)

信号量用于限制并发数量:

import asyncio

async def worker(semaphore, worker_id):
    async with semaphore:
        print(f"Worker {worker_id} 获取信号量")
        await asyncio.sleep(2)
        print(f"Worker {worker_id} 释放信号量")

async def main():
    # 限制最多3个并发
    semaphore = asyncio.Semaphore(3)
    
    # 创建10个任务
    tasks = [worker(semaphore, i) for i in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(main())

异步编程最佳实践

1. 不要在协程中使用阻塞操作

# ❌ 错误: 阻塞整个事件循环
import time

async def bad_example():
    time.sleep(5)  # 同步阻塞!

# ✅ 正确: 使用异步版本
import asyncio

async def good_example():
    await asyncio.sleep(5)  # 非阻塞

2. 合理使用 gather 和 wait

import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    print(f"{name} 完成")

async def main():
    # gather: 等待所有任务完成
    await asyncio.gather(
        task("A", 1),
        task("B", 2),
        task("C", 3)
    )
    
    # wait: 更精细的控制
    done, pending = await asyncio.wait([
        task("X", 1),
        task("Y", 2),
        task("Z", 3)
    ], timeout=2.5)
    
    print(f"完成的任务: {len(done)}")
    print(f"未完成的任务: {len(pending)}")
    
    # 取消未完成的任务
    for task in pending:
        task.cancel()

asyncio.run(main())

3. 错误处理

import asyncio

async def might_fail(should_fail=False):
    await asyncio.sleep(1)
    if should_fail:
        raise ValueError("任务失败")
    return "成功"

async def main():
    # 方式1: try-except
    try:
        result = await might_fail(should_fail=True)
        print(result)
    except ValueError as e:
        print(f"捕获到错误: {e}")
    
    # 方式2: return_exceptions
    results = await asyncio.gather(
        might_fail(should_fail=False),
        might_fail(should_fail=True),
        might_fail(should_fail=False),
        return_exceptions=True
    )
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"任务{i+1} 失败: {result}")
        else:
            print(f"任务{i+1} 成功: {result}")

asyncio.run(main())

4. 避免过度使用协程

协程不是万能的, 在以下情况不需要使用协程:

  • CPU密集型任务 (使用多进程)
  • 简单的顺序任务
  • 不涉及IO的操作
# ❌ 不适合: CPU密集型
async def cpu_intensive():
    result = sum(i**2 for i in range(10000000))  # 阻塞CPU
    return result

# ✅ 适合: IO密集型
async def io_intensive():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com') as response:
            return await response.json()

性能对比

让我们对比一下同步和异步的性能差异:

import asyncio
import time

# 同步版本
def sync_fetch(url):
    time.sleep(1)  # 模拟IO等待
    return f"同步获取 {url}"

def sync_main():
    urls = [f"url{i}" for i in range(10)]
    start = time.time()
    
    results = [sync_fetch(url) for url in urls]
    
    end = time.time()
    print(f"同步总耗时: {end - start:.2f}秒")
    return results

# 异步版本
async def async_fetch(url):
    await asyncio.sleep(1)  # 模拟IO等待
    return f"异步获取 {url}"

async def async_main():
    urls = [f"url{i}" for i in range(10)]
    start = time.time()
    
    tasks = [async_fetch(url) for url in urls]
    results = await asyncio.gather(*tasks)
    
    end = time.time()
    print(f"异步总耗时: {end - start:.2f}秒")
    return results

# 执行对比
print("=== 同步执行 ===")
sync_main()

print("\n=== 异步执行 ===")
asyncio.run(async_main())

输出:

=== 同步执行 ===
同步总耗时: 10.00秒

=== 异步执行 ===
异步总耗时: 1.00秒

可以看到, 在IO密集型任务中, 异步编程可以显著提升性能!

总结

协程的核心要点

  1. 协程是轻量级的: 切换成本远低于线程
  2. 单线程并发: 通过事件循环调度多个协程
  3. 非阻塞IO: 使用 async/await 实现异步操作
  4. 适合IO密集型: 网络请求、文件操作等
  5. 不适合CPU密集型: 这类任务应该用多进程

学习路径建议

  1. 基础: 理解生成器和 yield
  2. 入门: 学习 async/await 语法
  3. 进阶: 掌握 asyncio 模块
  4. 实战: 使用 aiohttp、aiofiles 等异步库
  5. 深入: 学习事件循环原理和性能优化

常用异步库

  • asyncio: 标准库异步框架
  • aiohttp: 异步HTTP客户端/服务器
  • aiofiles: 异步文件操作
  • aiomysql: 异步MySQL客户端
  • aioredis: 异步Redis客户端
  • asyncpg: 异步PostgreSQL客户端

异步编程是现代Python开发的重要技能, 特别在Web开发、爬虫、微服务等领域有着广泛的应用。掌握协程和asyncio, 可以让你的代码更加高效、优雅!