## 概要
Kotlin 协程以结构化并发确保生命周期与错误传播可控;配合 Flow/StateFlow 能在高并发与 UI/服务端场景中实现可验证的数据流与背压策略。本文以可复制代码与测试说明 `coroutineScope`/`supervisorScope` 的差异、取消与超时、`Dispatchers` 的选择,以及 `buffer`、`conflate`、`collectLatest` 的背压权衡。
## 环境校验
- Kotlin/JVM: 1.9+(或 2.0+,以项目 Gradle 为准)
- `kotlinx-coroutines-core`: 1.8.x 及以上
- (可选)`kotlinx-coroutines-test`: 1.8.x 及以上
Gradle(Kotlin DSL)示例:
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1")
}
## 结构化并发与错误传播
`coroutineScope` 会在任一子协程失败时取消整个作用域;`supervisorScope` 则隔离失败,其他子协程可继续运行。根据需求选择传播或隔离。
import kotlinx.coroutines.*
fun main() = runBlocking {
println("-- coroutineScope 失败传播 --")
try {
coroutineScope {
val a = launch { delay(100); println("A 完成") }
val b = launch { throw RuntimeException("B 出错") }
a.join(); b.join()
}
} catch (e: Throwable) {
println("捕获: ${e.message}")
}
println("-- supervisorScope 失败隔离 --")
supervisorScope {
val handler = CoroutineExceptionHandler { _, e -> println("处理异常: ${e.message}") }
val a = launch(handler) { delay(100); println("A 完成") }
val b = launch(handler) { throw RuntimeException("B 出错") }
a.join(); b.join()
println("作用域继续运行")
}
}
要点:
- 需要“全部成功或全部失败”时选用 `coroutineScope`;需要“局部容错”时选用 `supervisorScope`。
- 对可能抛错的子协程结合 `CoroutineExceptionHandler` 明确日志与恢复策略。
## 取消与超时
使用 `withTimeout` 或 `withTimeoutOrNull` 强制上限,避免任务悬挂;在协程体内使用 `isActive`/`ensureActive()` 响应取消。
import kotlinx.coroutines.*
suspend fun fetchWithTimeout(): String? = withTimeoutOrNull(300) {
repeat(10) { i ->
delay(50)
ensureActive() // 响应取消
println("step $i")
}
"OK"
}
fun main() = runBlocking {
val r = fetchWithTimeout()
println("结果: $r") // 可能为 null(超时)或 "OK"
}
要点:
- 在上游设定超时,确保资源可释放;下游协程体需“可取消”。
- 结合 `NonCancellable` 仅在必须的清理阶段使用,避免掩盖取消。
## Dispatcher 策略
- `Dispatchers.Default`:CPU 密集;`Dispatchers.IO`:阻塞 IO;避免在 `Main`/`Default` 中进行长阻塞。
- 使用 `withContext(Dispatchers.IO)` 包裹数据库/网络调用;在热路径保持最小切换开销。
import kotlinx.coroutines.*
suspend fun readFile(): String = withContext(Dispatchers.IO) {
// 伪代码:真实项目中替换为阻塞 IO 操作
Thread.sleep(50)
"data"
}
fun main() = runBlocking {
val t = kotlin.system.measureTimeMillis { readFile() }
println("IO 用时: $t ms")
}
## Flow 背压与冷热流
通过 `buffer`/`conflate`/`collectLatest` 控制生产-消费速率;`stateIn`/`shareIn` 用于将冷流推广为可复用的热流。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun fastProducer(): Flow<Int> = flow {
repeat(10) { i ->
emit(i)
delay(10) // 生产很快
}
}
suspend fun demoBackpressure() = coroutineScope {
println("-- buffer --")
fastProducer()
.buffer(capacity = 64) // 允许积压,提升吞吐
.collect { delay(50); println("buffer 收到: $it") }
println("-- conflate --")
fastProducer()
.conflate() // 跳过中间值,仅保留最新
.collect { delay(50); println("conflate 收到: $it") }
println("-- collectLatest --")
fastProducer()
.collectLatest { v ->
println("开始处理 $v")
delay(50) // 处理耗时
println("完成 $v")
}
}
fun main() = runBlocking { demoBackpressure() }
状态建模(`StateFlow`):
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
class Repo(private val scope: CoroutineScope) {
private val _counter = MutableStateFlow(0)
val counter: StateFlow<Int> = _counter
fun inc() { _counter.value++ }
val hotComputed: StateFlow<String> = flow {
emit("init")
_counter.collect { c -> emit("count=$c") }
}.stateIn(scope, SharingStarted.Eagerly, "init")
}
fun main() = runBlocking {
val repo = Repo(this)
val job = launch { repo.hotComputed.collect { println(it) } }
repeat(3) { repo.inc(); delay(20) }
job.cancel()
}
要点:
- `buffer` 提升吞吐但占用内存;`conflate`/`collectLatest` 强调时效性,适用于 UI 交互与流式结果。
- 将冷流 `flow {}` 通过 `stateIn`/`shareIn` 转为热流,便于跨组件复用与订阅。
## 测试与验证(kotlinx-coroutines-test)
在无真实 Dispatcher 的环境中用 `runTest` 可确定性验证取消与超时逻辑。
import kotlinx.coroutines.*
import kotlinx.coroutines.test.*
import kotlin.test.*
class TimeoutTest {
@Test
fun `withTimeoutOrNull 返回 null 表示超时`() = runTest {
val r = withTimeoutOrNull(100) { delay(200); "OK" }
assertNull(r)
}
}
## 性能建议
- 明确目标:吞吐、尾延迟、响应性不可兼得;通过场景选择 `buffer` 或时效策略。
- 在服务端以压测(并发、消息速率)验证背压策略;在客户端以交互延迟与抖动衡量体验。
## 结论与边界
- 结构化并发让错误与取消行为可预期;在需要隔离时使用 `supervisorScope`。
- 背压策略需按场景选择,过度缓冲或过度丢弃都会损伤指标。
- 测试与度量是落地协程设计的关键:使用 `runTest` 与基准工具验证。
## 参考
- Kotlin Coroutines 官方文档
- kotlinx-coroutines-test 指南

发表评论 取消回复