## 为什么需要正确的并发原语

  • I/O 密集型(网络/磁盘)使用异步或线程池最优;CPU 密集型使用进程池规避 GIL。
  • 结构化并发(TaskGroup)让协程的生命周期可控:一个子任务失败会按规则取消/传播,避免“僵尸任务”。

## 适用版本与前置要求

  • `asyncio.TaskGroup`:Python 3.11+(PEP 654,结构化并发)。
  • `asyncio.to_thread`:Python 3.9+,在事件循环内把阻塞函数丢给线程执行。
  • `concurrent.futures.ThreadPoolExecutor/ProcessPoolExecutor`:Python 标准库,3.2+稳定可用。
  • 支持 Windows/Linux/macOS;示例全部使用标准库,便于验证与落地。

## 场景划分与选型

  • I/O 密集(网络请求、文件读写):优先 `asyncio`;阻塞库用 `to_thread` 包装;批量阻塞可用 `ThreadPoolExecutor`。
  • CPU 密集(压缩、图像处理、加密、解析):优先 `ProcessPoolExecutor`,多进程并行规避 GIL。
  • 控制协程生命周期:批量异步任务建议用 `TaskGroup`,比裸 `create_task` 更安全。

## asyncio.TaskGroup:结构化并发实战(Python 3.11+)

下面示例演示并发执行多个异步任务、结果汇总与异常传播行为:


import asyncio

async def io_job(i: int) -> int:
    # 模拟 I/O 等待
    await asyncio.sleep(0.2)
    return i * 2

async def main():
    results = []
    tasks = []
    try:
        async with asyncio.TaskGroup() as tg:
            for i in range(5):
                # create_task 返回 Task,可在退出 TaskGroup 后读取 result()
                t = tg.create_task(io_job(i))
                tasks.append(t)
    except* Exception as eg:  # Python 3.11 的异常分组语法(PEP 654)
        # 任一子任务抛错会取消兄弟任务并把异常打包成 ExceptionGroup
        print("异常组:", eg)
        raise

    for t in tasks:
        results.append(t.result())
    print("并发结果:", results)

if __name__ == "__main__":
    asyncio.run(main())

要点:

  • `async with TaskGroup()` 保证作用域内任务要么全部成功,要么整体按异常语义清理并传播。
  • `except*` 捕获异常组(多个子任务可能同时失败)。
  • 退出 `TaskGroup` 后读取每个 `Task.result()` 汇总结果。

## 处理阻塞 I/O:asyncio.to_thread(Python 3.9+)

当你必须调用阻塞函数(如使用不支持异步的库),用 `to_thread` 把它丢给线程避免卡死事件循环:


import asyncio
import time

def blocking_read(path: str) -> str:
    time.sleep(0.3)  # 模拟阻塞
    with open(path, "r", encoding="utf-8") as f:
        return f.read()

async def main():
    # 在事件循环内,把阻塞函数放到后台线程执行
    content = await asyncio.to_thread(blocking_read, "README.md")
    print("读取长度:", len(content))

if __name__ == "__main__":
    asyncio.run(main())

要点:

  • `to_thread` 保持协程语义,避免手工管理 `run_in_executor`;适合偶发、轻量的阻塞调用。

## 线程池与进程池:批量并行

对于大量阻塞任务或 CPU 密集任务,使用 `concurrent.futures` 更合适:


from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def cpu_heavy(n: int) -> int:
    # 纯 Python 计算示例(CPU 密集)
    s = 0
    for i in range(n):
        s += i * i
    return s

def io_heavy(i: int) -> int:
    time.sleep(0.2)  # 模拟阻塞 I/O
    return i

if __name__ == "__main__":
    # I/O:线程池
    with ThreadPoolExecutor(max_workers=8) as tp:
        io_results = list(tp.map(io_heavy, range(10)))
    print("线程池 I/O 结果:", io_results)

    # CPU:进程池(规避 GIL)
    with ProcessPoolExecutor() as pp:
        cpu_results = list(pp.map(cpu_heavy, [100_0000] * 4))
    print("进程池 CPU 结果(部分):", cpu_results[:2])

要点:

  • 线程池适合 I/O 阻塞;进程池适合 CPU 密集,避免 GIL 限制。
  • `map` 批量提交任务;`max_workers` 需要结合机器核心数与任务特性调优。

## 在 asyncio 中使用线程/进程池

如果整体架构是异步,但内部需要批量阻塞或计算,可在事件循环中桥接到执行器:


import asyncio
from concurrent.futures import ThreadPoolExecutor

def blocking_io(i: int) -> int:
    import time
    time.sleep(0.2)
    return i * 10

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as tp:
        # 提交多个阻塞任务到线程池,并在协程中等待
        futures = [loop.run_in_executor(tp, blocking_io, i) for i in range(6)]
        results = await asyncio.gather(*futures)
        print("桥接结果:", results)

if __name__ == "__main__":
    asyncio.run(main())

要点:

  • `run_in_executor` 用于与已有线程池集成;适合批量阻塞任务且希望复用线程池。

## 异常与取消:保证任务群体一致性

`TaskGroup` 在一个任务失败时会取消其他兄弟任务,确保作用域内要么全部成功,要么整体失败:


import asyncio

async def ok():
    await asyncio.sleep(0.1)
    return "OK"

async def boom():
    await asyncio.sleep(0.05)
    raise RuntimeError("boom")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            t1 = tg.create_task(ok())
            t2 = tg.create_task(boom())
            t3 = tg.create_task(ok())
    except* Exception as eg:
        print("捕获异常组:", eg)

    # t1/t3 很可能已被取消
    print("t1 已取消:", t1.cancelled())
    print("t3 已取消:", t3.cancelled())

if __name__ == "__main__":
    asyncio.run(main())

实践建议:

  • 在 `TaskGroup` 作用域外进行结果汇总与错误分类处理。
  • 把可取消的资源操作(如网络、文件)设计为幂等或可重试,配合外部超时控制。

## 性能与调优注意事项

  • `max_workers` 并非越大越好:I/O 任务受限于服务端/网络;CPU 任务受限于核心数与任务粒度。
  • 线程上下文切换开销存在;少量阻塞优先 `to_thread`,批量阻塞再考虑线程池。
  • 进程池有序列化开销;计算任务需足够重才能体现收益。
  • 使用 `asyncio.wait_for` 为长任务设置超时,避免资源泄漏。

## 结论与落地

  • I/O 优先异步;阻塞用 `to_thread` 或线程池;CPU 用进程池。
  • 批量协程用 `TaskGroup` 提升健壮性与可维护性,异常按组传播、兄弟任务自动取消。
  • 所有示例均基于标准库,跨平台可运行,便于工程落地与验证。


点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论
立即
投稿

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部