## 概要

Kotlin 协程以结构化并发确保生命周期与错误传播可控;配合 Flow/StateFlow 能在高并发与 UI/服务端场景中实现可验证的数据流与背压策略。本文以可复制代码与测试说明 `coroutineScope`/`supervisorScope` 的差异、取消与超时、`Dispatchers` 的选择,以及 `buffer`、`conflate`、`collectLatest` 的背压权衡。


## 环境校验

  • Kotlin/JVM: 1.9+(或 2.0+,以项目 Gradle 为准)
  • `kotlinx-coroutines-core`: 1.8.x 及以上
  • (可选)`kotlinx-coroutines-test`: 1.8.x 及以上

Gradle(Kotlin DSL)示例:

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1")
    testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1")
}

## 结构化并发与错误传播

`coroutineScope` 会在任一子协程失败时取消整个作用域;`supervisorScope` 则隔离失败,其他子协程可继续运行。根据需求选择传播或隔离。


import kotlinx.coroutines.*

fun main() = runBlocking {
    println("-- coroutineScope 失败传播 --")
    try {
        coroutineScope {
            val a = launch { delay(100); println("A 完成") }
            val b = launch { throw RuntimeException("B 出错") }
            a.join(); b.join()
        }
    } catch (e: Throwable) {
        println("捕获: ${e.message}")
    }

    println("-- supervisorScope 失败隔离 --")
    supervisorScope {
        val handler = CoroutineExceptionHandler { _, e -> println("处理异常: ${e.message}") }
        val a = launch(handler) { delay(100); println("A 完成") }
        val b = launch(handler) { throw RuntimeException("B 出错") }
        a.join(); b.join()
        println("作用域继续运行")
    }
}

要点:

  • 需要“全部成功或全部失败”时选用 `coroutineScope`;需要“局部容错”时选用 `supervisorScope`。
  • 对可能抛错的子协程结合 `CoroutineExceptionHandler` 明确日志与恢复策略。

## 取消与超时

使用 `withTimeout` 或 `withTimeoutOrNull` 强制上限,避免任务悬挂;在协程体内使用 `isActive`/`ensureActive()` 响应取消。


import kotlinx.coroutines.*

suspend fun fetchWithTimeout(): String? = withTimeoutOrNull(300) {
    repeat(10) { i ->
        delay(50)
        ensureActive() // 响应取消
        println("step $i")
    }
    "OK"
}

fun main() = runBlocking {
    val r = fetchWithTimeout()
    println("结果: $r") // 可能为 null(超时)或 "OK"
}

要点:

  • 在上游设定超时,确保资源可释放;下游协程体需“可取消”。
  • 结合 `NonCancellable` 仅在必须的清理阶段使用,避免掩盖取消。

## Dispatcher 策略

  • `Dispatchers.Default`:CPU 密集;`Dispatchers.IO`:阻塞 IO;避免在 `Main`/`Default` 中进行长阻塞。
  • 使用 `withContext(Dispatchers.IO)` 包裹数据库/网络调用;在热路径保持最小切换开销。

import kotlinx.coroutines.*

suspend fun readFile(): String = withContext(Dispatchers.IO) {
    // 伪代码:真实项目中替换为阻塞 IO 操作
    Thread.sleep(50)
    "data"
}

fun main() = runBlocking {
    val t = kotlin.system.measureTimeMillis { readFile() }
    println("IO 用时: $t ms")
}

## Flow 背压与冷热流

通过 `buffer`/`conflate`/`collectLatest` 控制生产-消费速率;`stateIn`/`shareIn` 用于将冷流推广为可复用的热流。


import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun fastProducer(): Flow<Int> = flow {
    repeat(10) { i ->
        emit(i)
        delay(10) // 生产很快
    }
}

suspend fun demoBackpressure() = coroutineScope {
    println("-- buffer --")
    fastProducer()
        .buffer(capacity = 64) // 允许积压,提升吞吐
        .collect { delay(50); println("buffer 收到: $it") }

    println("-- conflate --")
    fastProducer()
        .conflate() // 跳过中间值,仅保留最新
        .collect { delay(50); println("conflate 收到: $it") }

    println("-- collectLatest --")
    fastProducer()
        .collectLatest { v ->
            println("开始处理 $v")
            delay(50) // 处理耗时
            println("完成 $v")
        }
}

fun main() = runBlocking { demoBackpressure() }

状态建模(`StateFlow`):

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class Repo(private val scope: CoroutineScope) {
    private val _counter = MutableStateFlow(0)
    val counter: StateFlow<Int> = _counter

    fun inc() { _counter.value++ }

    val hotComputed: StateFlow<String> = flow {
        emit("init")
        _counter.collect { c -> emit("count=$c") }
    }.stateIn(scope, SharingStarted.Eagerly, "init")
}

fun main() = runBlocking {
    val repo = Repo(this)
    val job = launch { repo.hotComputed.collect { println(it) } }
    repeat(3) { repo.inc(); delay(20) }
    job.cancel()
}

要点:

  • `buffer` 提升吞吐但占用内存;`conflate`/`collectLatest` 强调时效性,适用于 UI 交互与流式结果。
  • 将冷流 `flow {}` 通过 `stateIn`/`shareIn` 转为热流,便于跨组件复用与订阅。

## 测试与验证(kotlinx-coroutines-test)

在无真实 Dispatcher 的环境中用 `runTest` 可确定性验证取消与超时逻辑。


import kotlinx.coroutines.*
import kotlinx.coroutines.test.*
import kotlin.test.*

class TimeoutTest {
    @Test
    fun `withTimeoutOrNull 返回 null 表示超时`() = runTest {
        val r = withTimeoutOrNull(100) { delay(200); "OK" }
        assertNull(r)
    }
}

## 性能建议

  • 明确目标:吞吐、尾延迟、响应性不可兼得;通过场景选择 `buffer` 或时效策略。
  • 在服务端以压测(并发、消息速率)验证背压策略;在客户端以交互延迟与抖动衡量体验。

## 结论与边界

  • 结构化并发让错误与取消行为可预期;在需要隔离时使用 `supervisorScope`。
  • 背压策略需按场景选择,过度缓冲或过度丢弃都会损伤指标。
  • 测试与度量是落地协程设计的关键:使用 `runTest` 与基准工具验证。

## 参考

  • Kotlin Coroutines 官方文档
  • kotlinx-coroutines-test 指南

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论
立即
投稿

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部