## 为什么需要正确的并发原语
- I/O 密集型(网络/磁盘)使用异步或线程池最优;CPU 密集型使用进程池规避 GIL。
- 结构化并发(TaskGroup)让协程的生命周期可控:一个子任务失败会按规则取消/传播,避免“僵尸任务”。
## 适用版本与前置要求
- `asyncio.TaskGroup`:Python 3.11+(PEP 654,结构化并发)。
- `asyncio.to_thread`:Python 3.9+,在事件循环内把阻塞函数丢给线程执行。
- `concurrent.futures.ThreadPoolExecutor/ProcessPoolExecutor`:Python 标准库,3.2+稳定可用。
- 支持 Windows/Linux/macOS;示例全部使用标准库,便于验证与落地。
## 场景划分与选型
- I/O 密集(网络请求、文件读写):优先 `asyncio`;阻塞库用 `to_thread` 包装;批量阻塞可用 `ThreadPoolExecutor`。
- CPU 密集(压缩、图像处理、加密、解析):优先 `ProcessPoolExecutor`,多进程并行规避 GIL。
- 控制协程生命周期:批量异步任务建议用 `TaskGroup`,比裸 `create_task` 更安全。
## asyncio.TaskGroup:结构化并发实战(Python 3.11+)
下面示例演示并发执行多个异步任务、结果汇总与异常传播行为:
import asyncio
async def io_job(i: int) -> int:
# 模拟 I/O 等待
await asyncio.sleep(0.2)
return i * 2
async def main():
results = []
tasks = []
try:
async with asyncio.TaskGroup() as tg:
for i in range(5):
# create_task 返回 Task,可在退出 TaskGroup 后读取 result()
t = tg.create_task(io_job(i))
tasks.append(t)
except* Exception as eg: # Python 3.11 的异常分组语法(PEP 654)
# 任一子任务抛错会取消兄弟任务并把异常打包成 ExceptionGroup
print("异常组:", eg)
raise
for t in tasks:
results.append(t.result())
print("并发结果:", results)
if __name__ == "__main__":
asyncio.run(main())
要点:
- `async with TaskGroup()` 保证作用域内任务要么全部成功,要么整体按异常语义清理并传播。
- `except*` 捕获异常组(多个子任务可能同时失败)。
- 退出 `TaskGroup` 后读取每个 `Task.result()` 汇总结果。
## 处理阻塞 I/O:asyncio.to_thread(Python 3.9+)
当你必须调用阻塞函数(如使用不支持异步的库),用 `to_thread` 把它丢给线程避免卡死事件循环:
import asyncio
import time
def blocking_read(path: str) -> str:
time.sleep(0.3) # 模拟阻塞
with open(path, "r", encoding="utf-8") as f:
return f.read()
async def main():
# 在事件循环内,把阻塞函数放到后台线程执行
content = await asyncio.to_thread(blocking_read, "README.md")
print("读取长度:", len(content))
if __name__ == "__main__":
asyncio.run(main())
要点:
- `to_thread` 保持协程语义,避免手工管理 `run_in_executor`;适合偶发、轻量的阻塞调用。
## 线程池与进程池:批量并行
对于大量阻塞任务或 CPU 密集任务,使用 `concurrent.futures` 更合适:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
def cpu_heavy(n: int) -> int:
# 纯 Python 计算示例(CPU 密集)
s = 0
for i in range(n):
s += i * i
return s
def io_heavy(i: int) -> int:
time.sleep(0.2) # 模拟阻塞 I/O
return i
if __name__ == "__main__":
# I/O:线程池
with ThreadPoolExecutor(max_workers=8) as tp:
io_results = list(tp.map(io_heavy, range(10)))
print("线程池 I/O 结果:", io_results)
# CPU:进程池(规避 GIL)
with ProcessPoolExecutor() as pp:
cpu_results = list(pp.map(cpu_heavy, [100_0000] * 4))
print("进程池 CPU 结果(部分):", cpu_results[:2])
要点:
- 线程池适合 I/O 阻塞;进程池适合 CPU 密集,避免 GIL 限制。
- `map` 批量提交任务;`max_workers` 需要结合机器核心数与任务特性调优。
## 在 asyncio 中使用线程/进程池
如果整体架构是异步,但内部需要批量阻塞或计算,可在事件循环中桥接到执行器:
import asyncio
from concurrent.futures import ThreadPoolExecutor
def blocking_io(i: int) -> int:
import time
time.sleep(0.2)
return i * 10
async def main():
loop = asyncio.get_running_loop()
with ThreadPoolExecutor(max_workers=4) as tp:
# 提交多个阻塞任务到线程池,并在协程中等待
futures = [loop.run_in_executor(tp, blocking_io, i) for i in range(6)]
results = await asyncio.gather(*futures)
print("桥接结果:", results)
if __name__ == "__main__":
asyncio.run(main())
要点:
- `run_in_executor` 用于与已有线程池集成;适合批量阻塞任务且希望复用线程池。
## 异常与取消:保证任务群体一致性
`TaskGroup` 在一个任务失败时会取消其他兄弟任务,确保作用域内要么全部成功,要么整体失败:
import asyncio
async def ok():
await asyncio.sleep(0.1)
return "OK"
async def boom():
await asyncio.sleep(0.05)
raise RuntimeError("boom")
async def main():
try:
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(ok())
t2 = tg.create_task(boom())
t3 = tg.create_task(ok())
except* Exception as eg:
print("捕获异常组:", eg)
# t1/t3 很可能已被取消
print("t1 已取消:", t1.cancelled())
print("t3 已取消:", t3.cancelled())
if __name__ == "__main__":
asyncio.run(main())
实践建议:
- 在 `TaskGroup` 作用域外进行结果汇总与错误分类处理。
- 把可取消的资源操作(如网络、文件)设计为幂等或可重试,配合外部超时控制。
## 性能与调优注意事项
- `max_workers` 并非越大越好:I/O 任务受限于服务端/网络;CPU 任务受限于核心数与任务粒度。
- 线程上下文切换开销存在;少量阻塞优先 `to_thread`,批量阻塞再考虑线程池。
- 进程池有序列化开销;计算任务需足够重才能体现收益。
- 使用 `asyncio.wait_for` 为长任务设置超时,避免资源泄漏。
## 结论与落地
- I/O 优先异步;阻塞用 `to_thread` 或线程池;CPU 用进程池。
- 批量协程用 `TaskGroup` 提升健壮性与可维护性,异常按组传播、兄弟任务自动取消。
- 所有示例均基于标准库,跨平台可运行,便于工程落地与验证。

发表评论 取消回复