RxJava is no more essential

kotlin里的flow API,和RxJava神似,大体一样的操作符和线程(flow里是协程)调度,链式API , 都是cold stream, 等等,flow可以作为kotlin中RxJava的替代,而且可能做得更好。

Flow in Kotlin

Flow是cold stream ,和reactive stream一样,它有 map, filter, take, zip等中间操作符(Intermediate operators),也有 collect, single, reduce, toList等终止操作符(Terminal operators)。flow上游调的中间操作符操作符内所做的transform只有在调到终止操作符之后才会真正开始执行,所以称作cold stream。

Flow builders

flow builder主要有一下几个:

  • flowOf(…) functions to create a flow from a fixed set of values.
  • asFlow() extension functions on various types to convert them into flows.
  • flow { … } builder function to construct arbitrary flows from sequential calls to emit function.
  • channelFlow { … } builder function to construct arbitrary flows from potentially concurrent calls to the send function.

Flow transform

flow的操作符见官方文档,和reactive stream的操作符大同小异,不一一赘述。flow的transform可以执行suspend函数,这个是和sequence的一个区别。flow中的transform接受的函数都是suspend的,包括collect在内,所以flow数据的发射是串行的,只有上一个数据被消费后下一个数据才会发射。这也就是为什么flow天然支持backpressure。RxJava不一样,RxJava处理backpressure的策略有buffer/drop/latest,处理backpressure让这些操作符内部实现变得更复杂。我们可使用buffer操作符让上游在下游消费的时候继续发射数据而不是等待。(conflate操作符相当于大小为1的buffer操作,并且只保存latest元素。combine和zip操作符都可以讲两个flow合并成一个,但是combine的话其中任意一个flow产生新值的时候都会和另一个flow最近的值一起重新计算,发射一个新数据。)

当flow执行到可被取消的suspend函数的时候可以被cancel,这也是Rxjava不具备的。

Reactive Stream在多个编程语言上都有实现,自然有一套统一的规范,用其中自带的操作符不太可能出问题,但是如果我们自己想实现一个操作符呢?这就不太友好了。flow的话想自己实现一个操作符就简单多了,不用考虑backpressure,不用大量的代码,举个例子:

1
2
3
4
5
6
fun <T> Flow<T>.delayASecond() = flow {
collect { value -> // collect from the original flow
delay(1000) // delay 1 second
emit(value) // emit value to the resulting flow
}
}

flow上没有onComplete和onEror callback 但是从上面也可以看出,其实我们自定义操作符也可以实现这俩的功能了,例如:

1
2
3
4
5
6
fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
// reemit all values from the original flow
collect { value -> emit(value) }
// this code runs only after the normal completion
action()
}

官方的实现见:onCompletion

1
2
3
@ExperimentalCoroutinesApi fun <T> Flow<T>.onCompletion(
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> (source)

以及 catch

1
2
3
@ExperimentalCoroutinesApi fun <T> Flow<T>.catch(
action: suspend FlowCollector<T>.(cause: Throwable) -> Unit
): Flow<T> (source)

还记得Rxjava的subscribe会返回一个subscription吗?subscription必须被unsubscribe 否则会内存泄漏,因为subscribe接受的是匿名内部类 会持有对外部类对象的引用,然而flow里没这个问题,其collect接受的是suspend function。

Flow scheduler

flow的调度是针对协程的,RxJava针对线程。flowOn类似subscribeOn,改变上游运行的coroutineContext,不用flowOn而是用withContext等手段让上游发射和下游消费发生在不同协程的话会抛出异常。


参考: