协程 (Coroutine), 是一种比线程更轻量级的存在。它是 单线程里的并发,靠的是 协作式调度,不是操作系统强行切。
为什么需要协程?
想象一个典型场景:
请求 A:等数据库
请求 B:等网络
请求 C:等磁盘
如果你用 同步代码:
handle(A) # 卡住
handle(B)
handle(C)
CPU 大量时间在 等 I/O,很浪费。
协程的思想是: 当我遇到 I/O 等待时,我主动让出 CPU,让别人先跑。
协程 vs 线程 vs 进程
| 特性 | 进程 | 线程 | 协程 |
|---|---|---|---|
| 切换开销 | 最大 | 中等 | 最小 |
| 内存占用 | 最大 | 中等 | 最小 |
| 并发能力 | 多进程并行 | 多线程并发 | 单线程内并发 |
| 数据共享 | IPC复杂 | 共享内存, 需要锁 | 无需锁 |
| 创建数量 | 少量 | 中量 | 大量 |
入门
使用 async/await 语法声明的 协程 是编写 asyncio应用程序 的首选方式。
术语 “协程” 可用于表示两个概念:
一个协程函数:一个async def函数;
一个协程对象:调用 协程函数后返回的对象。
asyncio.run
例如,以下代码片段会打印“hello”,等待1秒,然后打印“world”:
import asyncio
async def main():
print('hello')
await asyncio.sleep(1)
print('world')
asyncio.run(main())
注意,简单地调用协程并不会安排它执行:
>>> main()
<coroutine object main at 0x1053bb7c8>
await
要实际运行协程,asyncio 提供了以下机制:
asyncio.run()函数用于运行顶级入口点“main()”函数(上面已经演示过)await一个协程。以下代码片段将在等待1秒后打印“hello”,然后在再等待2秒后打印“world”
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
print(f"started at {time.strftime('%X')}")
await say_after(1, 'hello')
await say_after(2, 'world')
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
时间花费 3 秒, 输出:
started at 17:13:52
hello
world
finished at 17:13:55
asyncio.create_task
asyncio.create_task() 函数用于将协程作为 asyncio Tasks 并发运行。
并发运行两个 say_after 协程:
async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
# Wait until both tasks are completed (should take around 2 seconds.)
await task1
await task2
print(f"finished at {time.strftime('%X')}")
时间花费 2 秒(比之前快 1 秒), 输出:
started at 17:14:32
hello
world
finished at 17:14:34
asyncio.TaskGroup
asyncio.TaskGroup 类提供了一种比create_task()更现代的替代方案。使用此API,最后一个示例变为:
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(
say_after(1, 'hello'))
task2 = tg.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
# The await is implicit when the context manager exits.
print(f"finished at {time.strftime('%X')}")
时间花费也是 2 秒。
asyncio 模块详解
asyncio 是Python的标准库, 提供了完整的异步编程支持。
可等待对象
如果一个对象可以在 await 表达式中使用,那么它就是一个可等待对象。许多 asyncio API 被设计为接受 可等待对象(awaitable object)。
可等待对象主要有三种类型:
- coroutine(协程)
- Task(任务)
- Future
协程 是可等待对象,因此可以从其他协程中等待:
import asyncio
async def nested():
return 42
async def main():
# Nothing happens if we just call "nested()".
# A coroutine object is created but not awaited,
# so it *won't run at all*.
nested() # will raise a "RuntimeWarning".
# Let's do it differently now and await it:
print(await nested()) # will print "42".
asyncio.run(main())
任务 用于 并发地调度 协程。
当 coroutine 通过 asyncio.create_task() 之类的函数包装成 task 时,该 coroutine 会被自动安排尽快运行:
import asyncio
async def nested():
return 42
async def main():
# Schedule nested() to run soon concurrently
# with "main()".
task = asyncio.create_task(nested())
# "task" can now be used to cancel "nested()", or
# can simply be awaited to wait until it is complete:
await task
asyncio.run(main())
Future 是一种 更底层的(low-level) 可等待对象,它表示异步操作的 最终结果。
asyncio中的Future对象用于允许基于回调的代码与async/await一起使用。
通常情况下,在 应用级代码中 没有必要创建 Future 对象。
事件循环 (Event Loop)
事件循环是 asyncio 的核心, 负责调度和执行所有协程。
import asyncio
async def task(name, delay):
print(f"任务 {name} 开始")
await asyncio.sleep(delay)
print(f"任务 {name} 完成")
return f"{name}的结果"
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
print(f"事件循环: {loop}")
# 创建多个任务
task1 = asyncio.create_task(task("A", 1))
task2 = asyncio.create_task(task("B", 2))
task3 = asyncio.create_task(task("C", 1.5))
# 等待所有任务完成
results = await asyncio.gather(task1, task2, task3)
print(f"结果: {results}")
asyncio.run(main())
gather 和 as_completed
asyncio.gather(*aws) 并发运行 可等待对象序列(aws, awaitable objects) 中的内容。
如果 aws 中的任何可等待对象是 协程,它会被自动调度为一个 任务。
import asyncio
async def fetch_data(id):
await asyncio.sleep(id)
return f"数据{id}"
async def main():
# 方式1: 使用 asyncio.create_task
task1 = asyncio.create_task(fetch_data(1))
task2 = asyncio.create_task(fetch_data(2))
# 等待任务完成
result1 = await task1
result2 = await task2
print(result1, result2)
# 方式2: 使用 asyncio.gather
tasks = [fetch_data(i) for i in range(3, 6)]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
asyncio.as_completed(aws) 会返回一个迭代器,按 完成顺序 产出协程结果,而不是提交顺序。as_completed(aws) 中的 aws 是 可等待对象列表。
普通 gather(按提交顺序返回):
import asyncio
async def task(name, delay):
await asyncio.sleep(delay)
return f"{name} done"
async def main():
results = await asyncio.gather(
task("A", 2),
task("B", 1),
task("C", 3),
)
print(results)
asyncio.run(main())
# 输出(固定顺序):
# ['A done', 'B done', 'C done']
使用 as_completed(按完成顺序处理):
import asyncio
async def task(name, delay):
await asyncio.sleep(delay)
return f"{name} done"
async def main():
tasks = [
task("A", 2),
task("B", 1),
task("C", 3),
]
for coro in asyncio.as_completed(tasks):
result = await coro
print(result)
asyncio.run(main())
超时控制
import asyncio
async def long_operation():
print("开始长时间操作...")
await asyncio.sleep(5)
print("操作完成")
return "结果"
async def main():
try:
# 设置超时时间为2秒
result = await asyncio.wait_for(long_operation(), timeout=2.0)
print(result)
except asyncio.TimeoutError:
print("操作超时!")
asyncio.run(main())
除了 wait_for, 还有 timeout_at 和 wait 也是类似功能.
asyncio.timeout(delay) 返回一个异步上下文管理器,它可用于限制等待某事物所花费的时间。
async def main():
try:
async with asyncio.timeout(10):
await long_running_task()
except TimeoutError:
print("The long operation timed out, but we've handled it.")
print("This statement will run regardless.")
实际应用示例
异步HTTP请求
import asyncio
import aiohttp
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
"https://www.example.com",
"https://www.python.org",
"https://www.github.com"
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for url, content in zip(urls, results):
print(f"{url}: {len(content)} 字符")
asyncio.run(main())
异步文件操作
import asyncio
import aiofiles
async def read_file(filename):
async with aiofiles.open(filename, mode='r') as f:
return await f.read()
async def write_file(filename, content):
async with aiofiles.open(filename, mode='w') as f:
await f.write(content)
async def main():
# 写入文件
await write_file("test.txt", "Hello, asyncio!")
# 读取文件
content = await read_file("test.txt")
print(content)
asyncio.run(main())
异步数据库操作
import asyncio
import aiomysql
async def query_database():
conn = await aiomysql.connect(
host='127.0.0.1',
port=3306,
user='root',
password='password',
db='test'
)
try:
async with conn.cursor() as cur:
await cur.execute("SELECT * FROM users")
result = await cur.fetchall()
print(result)
finally:
conn.close()
asyncio.run(query_database())
异步编程最佳实践
1. 不要在协程中使用阻塞操作
# ❌ 错误: 阻塞整个事件循环
import time
async def bad_example():
time.sleep(5) # 同步阻塞!
# ✅ 正确: 使用异步版本
import asyncio
async def good_example():
await asyncio.sleep(5) # 非阻塞
2. 合理使用 gather 和 wait
import asyncio
async def task(name, delay):
await asyncio.sleep(delay)
print(f"{name} 完成")
async def main():
# gather: 等待所有任务完成
await asyncio.gather(
task("A", 1),
task("B", 2),
task("C", 3)
)
# wait: 更精细的控制
done, pending = await asyncio.wait([
task("X", 1),
task("Y", 2),
task("Z", 3)
], timeout=2.5)
print(f"完成的任务: {len(done)}")
print(f"未完成的任务: {len(pending)}")
# 取消未完成的任务
for task in pending:
task.cancel()
asyncio.run(main())
3. 错误处理
import asyncio
async def might_fail(should_fail=False):
await asyncio.sleep(1)
if should_fail:
raise ValueError("任务失败")
return "成功"
async def main():
# 方式1: try-except
try:
result = await might_fail(should_fail=True)
print(result)
except ValueError as e:
print(f"捕获到错误: {e}")
# 方式2: return_exceptions
results = await asyncio.gather(
might_fail(should_fail=False),
might_fail(should_fail=True),
might_fail(should_fail=False),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"任务{i+1} 失败: {result}")
else:
print(f"任务{i+1} 成功: {result}")
asyncio.run(main())
4. 避免过度使用协程
协程不是万能的, 在以下情况不需要使用协程:
- CPU密集型任务 (使用多进程)
- 简单的顺序任务
- 不涉及IO的操作
# ❌ 不适合: CPU密集型
async def cpu_intensive():
result = sum(i**2 for i in range(10000000)) # 阻塞CPU
return result
# ✅ 适合: IO密集型
async def io_intensive():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com') as response:
return await response.json()