## 概要

Flow 提供丰富的操作符用于控制速率与时效。本文通过 `buffer`、`debounce`、`sample`、`collectLatest` 和 `flatMapLatest` 等演示在 UI 与服务端场景下的可验证背压策略与折衷。


## 环境校验

  • Kotlin/JVM: 1.9+(或 2.0+)
  • `kotlinx-coroutines-core`: 1.8.x+

Gradle:

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1")
}

## buffer 提升吞吐

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun fast(): Flow<Int> = flow {
    repeat(10) { i -> emit(i); delay(10) }
}

fun main() = runBlocking {
    fast().buffer(64).collect { delay(50); println("buffer: $it") }
}

要点:允许积压以提升吞吐,但会占用内存;对时效性要求不高的批处理适用。


## debounce 输入防抖

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun typing(events: List<String>): Flow<String> = flow {
    for (e in events) { emit(e); delay(30) }
}

fun main() = runBlocking {
    typing(listOf("k","ko","kot","kotl","kotlin"))
        .debounce(100)
        .collect { println("search: $it") }
}

要点:只有在暂停超过阈值后才发射最后一项,适合搜索框输入。


## sample 周期采样

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun burst(): Flow<Int> = flow {
    repeat(20) { i -> emit(i); delay(20) }
}

fun main() = runBlocking {
    burst().sample(100).collect { println("sample: $it") }
}

要点:固定周期取样,避免处理过于频繁的事件序列。


## collectLatest 与 flatMapLatest 取最新

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun queries(): Flow<String> = flow {
    val q = listOf("a","ab","abc","abcd")
    for (s in q) { emit(s); delay(50) }
}

suspend fun search(q: String): String { delay(120); return "result($q)" }

fun main() = runBlocking {
    // collectLatest:处理新值时取消旧处理
    queries().collectLatest { q ->
        println("start $q")
        val r = search(q)
        println("done $q -> $r")
    }

    // flatMapLatest:映射为内部流并仅收集最新
    queries().flatMapLatest { q -> flow { emit(search(q)) } }
        .collect { println("latest: $it") }
}

要点:强调时效性,适合 UI 请求或增量搜索;旧任务会被取消以节省资源。


## StateFlow 建模最新状态

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class VM(scope: CoroutineScope) {
    private val _q = MutableStateFlow("")
    val q: StateFlow<String> = _q

    val results: StateFlow<String> = q
        .debounce(100)
        .flatMapLatest { s -> flow { emit("res($s)") } }
        .stateIn(scope, SharingStarted.Eagerly, "")

    fun type(s: String) { _q.value = s }
}

fun main() = runBlocking {
    val vm = VM(this)
    val job = launch { vm.results.collect { println(it) } }
    listOf("k","ko","kot","kotlin").forEach { vm.type(it); delay(30) }
    delay(300)
    job.cancel()
}

## 选择策略建议

  • 追求吞吐:`buffer`
  • 追求时效:`collectLatest`/`flatMapLatest`/`conflate`
  • 用户输入:`debounce`;频繁事件:`sample`

## 结论

  • 背压与时间操作符需按场景选择,过度缓冲或过度丢弃都会影响体验与资源利用。
  • `StateFlow` 适合表示最新状态并在多个订阅者间共享。

## 参考

  • Kotlin Flow 官方文档与操作符说明

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论
立即
投稿

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部