## 概要
Flow 提供丰富的操作符用于控制速率与时效。本文通过 `buffer`、`debounce`、`sample`、`collectLatest` 和 `flatMapLatest` 等演示在 UI 与服务端场景下的可验证背压策略与折衷。
## 环境校验
- Kotlin/JVM: 1.9+(或 2.0+)
- `kotlinx-coroutines-core`: 1.8.x+
Gradle:
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1")
}
## buffer 提升吞吐
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun fast(): Flow<Int> = flow {
repeat(10) { i -> emit(i); delay(10) }
}
fun main() = runBlocking {
fast().buffer(64).collect { delay(50); println("buffer: $it") }
}
要点:允许积压以提升吞吐,但会占用内存;对时效性要求不高的批处理适用。
## debounce 输入防抖
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun typing(events: List<String>): Flow<String> = flow {
for (e in events) { emit(e); delay(30) }
}
fun main() = runBlocking {
typing(listOf("k","ko","kot","kotl","kotlin"))
.debounce(100)
.collect { println("search: $it") }
}
要点:只有在暂停超过阈值后才发射最后一项,适合搜索框输入。
## sample 周期采样
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun burst(): Flow<Int> = flow {
repeat(20) { i -> emit(i); delay(20) }
}
fun main() = runBlocking {
burst().sample(100).collect { println("sample: $it") }
}
要点:固定周期取样,避免处理过于频繁的事件序列。
## collectLatest 与 flatMapLatest 取最新
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun queries(): Flow<String> = flow {
val q = listOf("a","ab","abc","abcd")
for (s in q) { emit(s); delay(50) }
}
suspend fun search(q: String): String { delay(120); return "result($q)" }
fun main() = runBlocking {
// collectLatest:处理新值时取消旧处理
queries().collectLatest { q ->
println("start $q")
val r = search(q)
println("done $q -> $r")
}
// flatMapLatest:映射为内部流并仅收集最新
queries().flatMapLatest { q -> flow { emit(search(q)) } }
.collect { println("latest: $it") }
}
要点:强调时效性,适合 UI 请求或增量搜索;旧任务会被取消以节省资源。
## StateFlow 建模最新状态
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
class VM(scope: CoroutineScope) {
private val _q = MutableStateFlow("")
val q: StateFlow<String> = _q
val results: StateFlow<String> = q
.debounce(100)
.flatMapLatest { s -> flow { emit("res($s)") } }
.stateIn(scope, SharingStarted.Eagerly, "")
fun type(s: String) { _q.value = s }
}
fun main() = runBlocking {
val vm = VM(this)
val job = launch { vm.results.collect { println(it) } }
listOf("k","ko","kot","kotlin").forEach { vm.type(it); delay(30) }
delay(300)
job.cancel()
}
## 选择策略建议
- 追求吞吐:`buffer`
- 追求时效:`collectLatest`/`flatMapLatest`/`conflate`
- 用户输入:`debounce`;频繁事件:`sample`
## 结论
- 背压与时间操作符需按场景选择,过度缓冲或过度丢弃都会影响体验与资源利用。
- `StateFlow` 适合表示最新状态并在多个订阅者间共享。
## 参考
- Kotlin Flow 官方文档与操作符说明

发表评论 取消回复