kotlin里的flow API,和RxJava神似,大体一样的操作符和线程(flow里是协程)调度,链式API , 都是cold stream, 等等,flow可以作为kotlin中RxJava的替代,而且可能做得更好。
Flow in Kotlin
Flow是cold stream ,和reactive stream一样,它有 map, filter, take, zip等中间操作符(Intermediate operators),也有 collect, single, reduce, toList等终止操作符(Terminal operators)。flow上游调的中间操作符操作符内所做的transform只有在调到终止操作符之后才会真正开始执行,所以称作cold stream。
Flow builders
flow builder主要有一下几个:
- flowOf(…) functions to create a flow from a fixed set of values.
- asFlow() extension functions on various types to convert them into flows.
- flow { … } builder function to construct arbitrary flows from sequential calls to emit function.
- channelFlow { … } builder function to construct arbitrary flows from potentially concurrent calls to the send function.
Flow transform
flow的操作符见官方文档,和reactive stream的操作符大同小异,不一一赘述。flow的transform可以执行suspend函数,这个是和sequence的一个区别。flow中的transform接受的函数都是suspend的,包括collect在内,所以flow数据的发射是串行的,只有上一个数据被消费后下一个数据才会发射。这也就是为什么flow天然支持backpressure。RxJava不一样,RxJava处理backpressure的策略有buffer/drop/latest,处理backpressure让这些操作符内部实现变得更复杂。我们可使用buffer操作符让上游在下游消费的时候继续发射数据而不是等待。(conflate操作符相当于大小为1的buffer操作,并且只保存latest元素。combine和zip操作符都可以讲两个flow合并成一个,但是combine的话其中任意一个flow产生新值的时候都会和另一个flow最近的值一起重新计算,发射一个新数据。)
当flow执行到可被取消的suspend函数的时候可以被cancel,这也是Rxjava不具备的。
Reactive Stream在多个编程语言上都有实现,自然有一套统一的规范,用其中自带的操作符不太可能出问题,但是如果我们自己想实现一个操作符呢?这就不太友好了。flow的话想自己实现一个操作符就简单多了,不用考虑backpressure,不用大量的代码,举个例子:
1 | fun <T> Flow<T>.delayASecond() = flow { |
flow上没有onComplete和onEror callback 但是从上面也可以看出,其实我们自定义操作符也可以实现这俩的功能了,例如:
1 | fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow { |
官方的实现见:onCompletion
1 | fun <T> Flow<T>.onCompletion( |
以及 catch
1 | fun <T> Flow<T>.catch( |
还记得Rxjava的subscribe会返回一个subscription吗?subscription必须被unsubscribe 否则会内存泄漏,因为subscribe接受的是匿名内部类 会持有对外部类对象的引用,然而flow里没这个问题,其collect接受的是suspend function。
Flow scheduler
flow的调度是针对协程的,RxJava针对线程。flowOn类似subscribeOn,改变上游运行的coroutineContext,不用flowOn而是用withContext等手段让上游发射和下游消费发生在不同协程的话会抛出异常。
参考: