什么是协程
协程 (Coroutine), 也被称为微线程, 是一种比线程更轻量级的存在。简单来说, 协程是一种用户态的轻量级线程, 协程的调度完全由用户控制。
协程 vs 线程 vs 进程
| 特性 | 进程 | 线程 | 协程 |
|---|---|---|---|
| 切换开销 | 最大 | 中等 | 最小 |
| 内存占用 | 最大 | 中等 | 最小 |
| 并发能力 | 多进程并行 | 多线程并发 | 单线程内并发 |
| 数据共享 | IPC复杂 | 共享内存, 需要锁 | 无需锁 |
| 创建数量 | 少量 | 中量 | 大量 |
协程的核心优势:
- 极低的上下文切换成本
- 单线程即可实现高并发
- 避免了多线程的锁竞争问题
- 代码结构清晰, 逻辑同步
协程的基本概念
生成器 (Generator)
在Python 3.5之前, 协程是通过生成器实现的。生成器是一种特殊的迭代器, 可以在执行过程中暂停和恢复。
def simple_generator():
print("开始执行")
yield 1
print("继续执行")
yield 2
print("结束执行")
# 创建生成器
gen = simple_generator()
# 手动迭代
print(next(gen)) # 输出: 开始执行, 1
print(next(gen)) # 输出: 继续执行, 2
print(next(gen)) # StopIteration
yield 关键字
yield类似于return, 但不会结束函数- 每次调用
next()时, 生成器执行到yield并返回值 - 下次调用
next()时, 从上次的yield继续执行
def countdown(n):
print("开始倒计时")
while n > 0:
yield n
n -= 1
for i in countdown(5):
print(i)
# 输出:
# 开始倒计时
# 5
# 4
# 3
# 2
# 1
send() 方法
生成器的 send() 方法可以向生成器内部发送数据:
def accumulator():
total = 0
while True:
value = yield total
if value is not None:
total += value
acc = accumulator()
next(acc) # 启动生成器 (预激活)
print(acc.send(10)) # 10
print(acc.send(20)) # 30
print(acc.send(30)) # 60
async/await 语法
Python 3.5 引入了 async 和 await 关键字, 大大简化了协程的编写。
async def 定义协程
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1)
print("World")
# 运行协程
asyncio.run(hello())
# 输出:
# Hello
# (等待1秒)
# World
await 暂停协程
await 用于等待一个可等待对象 (如协程、Task、Future), 在等待期间, 事件循环可以执行其他协程。
import asyncio
import time
async def count():
print("One")
await asyncio.sleep(1)
print("Two")
async def main():
await asyncio.gather(count(), count(), count())
start = time.time()
asyncio.run(main())
end = time.time()
print(f"总耗时: {end - start:.2f}秒")
# 输出:
# One
# One
# One
# Two
# Two
# Two
# 总耗时: 1.00秒
注意: 如果使用同步的 time.sleep(), 则会是3秒!
asyncio 模块详解
asyncio 是Python的标准库, 提供了完整的异步编程支持。
事件循环 (Event Loop)
事件循环是 asyncio 的核心, 负责调度和执行所有协程。
import asyncio
async def task(name, delay):
print(f"任务 {name} 开始")
await asyncio.sleep(delay)
print(f"任务 {name} 完成")
return f"{name}的结果"
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
print(f"事件循环: {loop}")
# 创建多个任务
task1 = asyncio.create_task(task("A", 1))
task2 = asyncio.create_task(task("B", 2))
task3 = asyncio.create_task(task("C", 1.5))
# 等待所有任务完成
results = await asyncio.gather(task1, task2, task3)
print(f"结果: {results}")
asyncio.run(main())
创建任务 (Tasks)
import asyncio
async def fetch_data(id):
await asyncio.sleep(id)
return f"数据{id}"
async def main():
# 方式1: 使用 asyncio.create_task
task1 = asyncio.create_task(fetch_data(1))
task2 = asyncio.create_task(fetch_data(2))
# 等待任务完成
result1 = await task1
result2 = await task2
print(result1, result2)
# 方式2: 使用 asyncio.gather
tasks = [fetch_data(i) for i in range(3, 6)]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
超时控制
import asyncio
async def long_operation():
print("开始长时间操作...")
await asyncio.sleep(5)
print("操作完成")
return "结果"
async def main():
try:
# 设置超时时间为2秒
result = await asyncio.wait_for(long_operation(), timeout=2.0)
print(result)
except asyncio.TimeoutError:
print("操作超时!")
asyncio.run(main())
并发与并行
import asyncio
async def worker(name, seconds):
print(f"{name} 开始工作")
await asyncio.sleep(seconds)
print(f"{name} 完成工作")
return f"{name}耗时{seconds}秒"
async def main():
print("=== 并发执行 ===")
start = asyncio.get_event_loop().time()
# 并发执行多个协程
results = await asyncio.gather(
worker("任务1", 2),
worker("任务2", 3),
worker("任务3", 1)
)
end = asyncio.get_event_loop().time()
print(f"总耗时: {end - start:.2f}秒")
print(f"结果: {results}")
asyncio.run(main())
实际应用示例
异步HTTP请求
import asyncio
import aiohttp
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
"https://www.example.com",
"https://www.python.org",
"https://www.github.com"
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for url, content in zip(urls, results):
print(f"{url}: {len(content)} 字符")
asyncio.run(main())
异步文件操作
import asyncio
import aiofiles
async def read_file(filename):
async with aiofiles.open(filename, mode='r') as f:
return await f.read()
async def write_file(filename, content):
async with aiofiles.open(filename, mode='w') as f:
await f.write(content)
async def main():
# 写入文件
await write_file("test.txt", "Hello, asyncio!")
# 读取文件
content = await read_file("test.txt")
print(content)
asyncio.run(main())
异步数据库操作
import asyncio
import aiomysql
async def query_database():
conn = await aiomysql.connect(
host='127.0.0.1',
port=3306,
user='root',
password='password',
db='test'
)
try:
async with conn.cursor() as cur:
await cur.execute("SELECT * FROM users")
result = await cur.fetchall()
print(result)
finally:
conn.close()
asyncio.run(query_database())
协程的高级用法
协程链
import asyncio
async def compute(x, y):
print(f"计算 {x} + {y}")
await asyncio.sleep(1)
return x + y
async def main():
# 协程链: 一个协程等待另一个协程的结果
result1 = await compute(1, 2)
result2 = await compute(result1, 3)
result3 = await compute(result2, 4)
print(f"最终结果: {result3}")
asyncio.run(main())
异步上下文管理器
import asyncio
class AsyncContextManager:
async def __aenter__(self):
print("进入上下文")
await asyncio.sleep(0.5)
return self
async def __aexit__(self, exc_type, exc, tb):
print("退出上下文")
await asyncio.sleep(0.5)
async def do_something(self):
print("执行操作")
async def main():
async with AsyncContextManager() as manager:
await manager.do_something()
asyncio.run(main())
异步迭代器
import asyncio
class AsyncIterator:
def __init__(self, n):
self.n = n
self.current = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.n:
raise StopAsyncIteration
await asyncio.sleep(0.5)
self.current += 1
return self.current
async def main():
async for i in AsyncIterator(5):
print(f"迭代: {i}")
asyncio.run(main())
信号量 (Semaphore)
信号量用于限制并发数量:
import asyncio
async def worker(semaphore, worker_id):
async with semaphore:
print(f"Worker {worker_id} 获取信号量")
await asyncio.sleep(2)
print(f"Worker {worker_id} 释放信号量")
async def main():
# 限制最多3个并发
semaphore = asyncio.Semaphore(3)
# 创建10个任务
tasks = [worker(semaphore, i) for i in range(10)]
await asyncio.gather(*tasks)
asyncio.run(main())
异步编程最佳实践
1. 不要在协程中使用阻塞操作
# ❌ 错误: 阻塞整个事件循环
import time
async def bad_example():
time.sleep(5) # 同步阻塞!
# ✅ 正确: 使用异步版本
import asyncio
async def good_example():
await asyncio.sleep(5) # 非阻塞
2. 合理使用 gather 和 wait
import asyncio
async def task(name, delay):
await asyncio.sleep(delay)
print(f"{name} 完成")
async def main():
# gather: 等待所有任务完成
await asyncio.gather(
task("A", 1),
task("B", 2),
task("C", 3)
)
# wait: 更精细的控制
done, pending = await asyncio.wait([
task("X", 1),
task("Y", 2),
task("Z", 3)
], timeout=2.5)
print(f"完成的任务: {len(done)}")
print(f"未完成的任务: {len(pending)}")
# 取消未完成的任务
for task in pending:
task.cancel()
asyncio.run(main())
3. 错误处理
import asyncio
async def might_fail(should_fail=False):
await asyncio.sleep(1)
if should_fail:
raise ValueError("任务失败")
return "成功"
async def main():
# 方式1: try-except
try:
result = await might_fail(should_fail=True)
print(result)
except ValueError as e:
print(f"捕获到错误: {e}")
# 方式2: return_exceptions
results = await asyncio.gather(
might_fail(should_fail=False),
might_fail(should_fail=True),
might_fail(should_fail=False),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"任务{i+1} 失败: {result}")
else:
print(f"任务{i+1} 成功: {result}")
asyncio.run(main())
4. 避免过度使用协程
协程不是万能的, 在以下情况不需要使用协程:
- CPU密集型任务 (使用多进程)
- 简单的顺序任务
- 不涉及IO的操作
# ❌ 不适合: CPU密集型
async def cpu_intensive():
result = sum(i**2 for i in range(10000000)) # 阻塞CPU
return result
# ✅ 适合: IO密集型
async def io_intensive():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com') as response:
return await response.json()
性能对比
让我们对比一下同步和异步的性能差异:
import asyncio
import time
# 同步版本
def sync_fetch(url):
time.sleep(1) # 模拟IO等待
return f"同步获取 {url}"
def sync_main():
urls = [f"url{i}" for i in range(10)]
start = time.time()
results = [sync_fetch(url) for url in urls]
end = time.time()
print(f"同步总耗时: {end - start:.2f}秒")
return results
# 异步版本
async def async_fetch(url):
await asyncio.sleep(1) # 模拟IO等待
return f"异步获取 {url}"
async def async_main():
urls = [f"url{i}" for i in range(10)]
start = time.time()
tasks = [async_fetch(url) for url in urls]
results = await asyncio.gather(*tasks)
end = time.time()
print(f"异步总耗时: {end - start:.2f}秒")
return results
# 执行对比
print("=== 同步执行 ===")
sync_main()
print("\n=== 异步执行 ===")
asyncio.run(async_main())
输出:
=== 同步执行 ===
同步总耗时: 10.00秒
=== 异步执行 ===
异步总耗时: 1.00秒
可以看到, 在IO密集型任务中, 异步编程可以显著提升性能!
总结
协程的核心要点
- 协程是轻量级的: 切换成本远低于线程
- 单线程并发: 通过事件循环调度多个协程
- 非阻塞IO: 使用
async/await实现异步操作 - 适合IO密集型: 网络请求、文件操作等
- 不适合CPU密集型: 这类任务应该用多进程
学习路径建议
- 基础: 理解生成器和 yield
- 入门: 学习 async/await 语法
- 进阶: 掌握 asyncio 模块
- 实战: 使用 aiohttp、aiofiles 等异步库
- 深入: 学习事件循环原理和性能优化
常用异步库
asyncio: 标准库异步框架aiohttp: 异步HTTP客户端/服务器aiofiles: 异步文件操作aiomysql: 异步MySQL客户端aioredis: 异步Redis客户端asyncpg: 异步PostgreSQL客户端
异步编程是现代Python开发的重要技能, 特别在Web开发、爬虫、微服务等领域有着广泛的应用。掌握协程和asyncio, 可以让你的代码更加高效、优雅!