RxJava2源码分析

本文将按照如下3个部分组织,分别是:

  • basics
  • operators
  • schedulers

本文不会很具体沿调用链每个API都一行一行分析,主要了解其原理和设计。

Basics

所谓Basics,基本上都是官方文档上的一些不太好理解或者容易忽略的concept。之所以重复这些concept,是因为我发现很多人依然对RxJava不太了解,一提起RxJava,本能就说是函数式编程。其实Rxjava正确来说是响应式编程(reactive stream)在java上的实现。响应式编程和函数式编程其实是两个东西,所谓响应式,是观察者模式,迭代器模式和函数式编程的结合。而函数式编程,在Java8上的实现其实和RxJava看上去十分相似,简单来说就是stream API在不改变原始数据状态的情况下的链式调用,每一步链式调用的参数都是functional method。Rxjava2的Flowable API和上述十分相似,但是不同的是Rxjava的Flowable链式调用最终需要通过subscribe才会执行,而subscribe就是响应式中观察者模式的体现。

Backpressure

Backpressure简单来说就是生产者消费者问题,当上游emit数据的速度大于下游consume的速度的时候,必然会存在缓存区溢出的可能,如果不进行处理就直接MissingBackpressureException崩了。RxJava2引入了Flowable,和Observable的区别在于前者提供了onBackpressureXXX方法支持Backpressure,后者non-Backpressure。例如:

  • onBackpressureBuffer() 解决backPressure最常用的思路就是减少上游发送数据的速率,onBackpressureBuffer()就是再引入一个缓冲区,存储溢出的items。可传入capacity,不传入是unbounded。
  • onBackpressureDrop()将多余的items丢弃。可看作capacity为0,OverflowStrategy为ON_OVERFLOW_DROP_LATEST时候的onBackpressureBuffer()
  • onBackpressureLatest()用最新的item代替旧的undelivered item。可看作capacity为1,OverflowStrategy为ON_OVERFLOW_DROP_OLDEST时候的onBackpressureBuffer()

Hot Observable vs Cold Observable

Hot Observable 发数据不依赖有没有被订阅,有可能一创建就发。Cold Observable需要被订阅才会发数据。通常Observable都是cold的,可通过publish方法将cold observable转化为hot observable。Hot Observable例如ConnectableFlowable和Subject。例如输入事件,其实就是Hot Observable,Hot Observable的observer有可能是收不到所有data的。

Assembly time

observable后面接的一系列操作符在subscribe之前不会产生实际的影响,即不会执行操作符里的变换,原因后面通过源码分析。

subscribeOn和observeOn

subscribeOn决定了subscribe(注意不是subscribeOn)上游的操作在哪个线程执行,如果调用多次只有第一次会生效,observeOn决定了observeOn的下游操作在哪个线程执行,可调用多次,每次都会生效。

http://reactivex.io/documentation/operators/images/schedulers.png

operators

Rxjava中的操作符我们用的很多,但是里面是怎么实现的呢?以下面这个为例:

1
2
3
4
5
Flowable.range(1,10)
.take(5)
.filter(i -> {return i%2 == 0})
.map(i -> i*i)
.subscribe(i -> print(i))

Flowable.range用于产生一个flowable source,我们姑且称之为sourceFlowable。后面依次对这个sourceFlowable进行了take和filter,map操作。这几个操作符的方法实现几乎都是一致的:参数校验,执行hook,创建并返回对象。以take为例:

1
2
3
4
5
6
public final Flowable<T> take(long count) {
if (count < 0) {
throw new IllegalArgumentException("count >= 0 required but it was " + count);
}
return RxJavaPlugins.onAssembly(new FlowableTake<T>(this, count));
}

RxJavaPlugins这个类提供了很多用于hook RxJava操作的方法,onAssembly便是用于hook操作符的。实际上啥都没做,就返回了一个FlowableTake对象,传入了sourceFlowable。filter和map类似。我们看到整个流程等价于:

1
2
// 伪代码
new FlowableMap(new FlowableFilter(new FlowableTake(sourceflowable))).subscribe(onNext)

这一步其实已经解释了为什么在调用subscribe之前,这些操作符对应的变换实际上还没执行。

接下来看subscribe做了什么触发了一系列变换。

首先subscribe(onNext)方法最终会调到:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
}

......

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Subscription> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);

subscribe(ls);

return ls;
}

@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Subscriber<? super T> s) {
if (s instanceof FlowableSubscriber) {
subscribe((FlowableSubscriber<? super T>)s);
} else {
ObjectHelper.requireNonNull(s, "s is null");
subscribe(new StrictSubscriber<T>(s));
}
}

也就是说其实执行的是:

1
2
// 伪代码
new FlowableMap(new FlowableFilter(new FlowableTake(sourceflowable))).subscribe(new LambdaSubscriber())

而subscribe(LambdaSubscriber ls)方法调用的其实是Flowable的abstract方法:subscribeActual(ls), subscribeActual由Flowable的各个子类重写。

而FlowableMap的subscribeActual如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public FlowableMap(Flowable<T> source, Function<? super T, ? extends U> mapper) {
super(source);
this.mapper = mapper;
}

@Override
protected void subscribeActual(Subscriber<? super U> s) {
if (s instanceof ConditionalSubscriber) {
source.subscribe(new MapConditionalSubscriber<T, U>((ConditionalSubscriber<? super U>)s, mapper));
} else {
source.subscribe(new MapSubscriber<T, U>(s, mapper));
}
}

如果我们用=>表示调用过程,那么可以知道:

1
2
3
4
5
6
// 伪代码
new FlowableMap(new FlowableFilter(new FlowableTake(sourceflowable))).subscribeActual(new LambdaSubscriber())

=> new FlowableFilter(new FlowableTake(sourceflowable)).subscribeActual(new MapSubscriber(new LambdaSubscriber(), mapper))
=> new FlowableTake(sourceflowable).subscribeActual(new FilterSubscriber(new MapSubscriber(new LambdaSubscriber(), mapper), predicate))
=> sourceflowable.subscribeActual(new TakeSubscriber(new FilterSubscriber(new MapSubscriber(new LambdaSubscriber(), mapper), predicate), limit))

我们来看sourceFlowable(FlowableRange)的subscribeActual方法:

1
2
3
4
5
6
7
8
9
@Override
public void subscribeActual(Subscriber<? super Integer> s) {
if (s instanceof ConditionalSubscriber) {
s.onSubscribe(new RangeConditionalSubscription(
(ConditionalSubscriber<? super Integer>)s, start, end));
} else {
s.onSubscribe(new RangeSubscription(s, start, end));
}
}

即:

1
2
3
4
flowableRange.subscribeActual(new TakeSubscriber(new FilterSubscriber(new MapSubscriber(new LambdaSubscriber(), mapper), predicate), limit))

=> s.onSubscribe(new RangeSubscription(s, start, end))
// 其中s代表new TakeSubscriber(new FilterSubscriber(new MapSubscriber(new LambdaSubscriber(), mapper), predicate), limit)

由于这些TakeSubscriber,FilterSubscriber,MapSubscriber的onSubscribe都是调用的downStream的onSubscribe,所以最终调用到的就是
lambdaSubscriber.onSubscribe(new RangeSubscription(s, start, end),进而调用到rangeSubscription的request方法,request中调用s的onNext,onComplete和onError。而s的onNext,onComplete和onError又逐层向内调用,依次调用到TakeSubscriber,FilterSubscriber,MapSubscriber,LambdaSubscriber的onNext,onComplete和onError。这些subscriber(除了LambdaSubscriber)的onNext中会执行对应的操作符的逻辑,最终LambdaSubscriber的onNext,onComplete和onError就是我们调用的时候传入的onNext,onComplete和onError。如此一同操作下来就实现了所谓的操作符。

整个流程有些绕。首先是每个操作符都会创建并返回一个Flowable对象,这个Flowable对象层层包裹上游的Flowable对象,其次是最终subscribe的时候每个操作符返回的Flowable对象都重写了subscribeActual方法,随着Flowable的subscribeActual层层向内调用,操作符对应的subscriber也层层包裹,层层包裹后的subscriber又层层向内调用onSubscribe,最终调到subscription的request方法,request逐层调subscriber的onNext, 完成对应的操作符变换。

到这里你也许会感受到,看起来很优雅的操作符其实内部实现也是非常的简单粗暴,就是不断的层层组合。

schedulers

subscribeOn

subscribeOn其实也是创建并返回了一个FlowableSubscribeOn对象,持有我们传入的scheduler。

1
2
3
4
5
6
7
8
9
10
11
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return subscribeOn(scheduler, !(this instanceof FlowableCreate));
}

......

public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn));
}

而FlowableSubscribeOn的subscribeActual方法如下:

1
2
3
4
5
6
7
8
@Override
public void subscribeActual(final Subscriber<? super T> s) {
Scheduler.Worker w = scheduler.createWorker();
final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
s.onSubscribe(sos);

w.schedule(sos);
}

创建了一个worker,将SubscribeOnSubscriber(实现了Runnable接口)传入schedule方法。schedule方法一通操作会调用到scheduleActual:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}

Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}

return sr;
}

可以看出就是把SubscribeOnSubscriber抛到线程池执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public void run() {
lazySet(Thread.currentThread());
Publisher<T> src = source;
source = null;
src.subscribe(this);
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.setOnce(this.upstream, s)) {
long r = requested.getAndSet(0L);
if (r != 0L) {
requestUpstream(r, s);
}
}
}

void requestUpstream(final long n, final Subscription s) {
if (nonScheduledRequests || Thread.currentThread() == get()) {
s.request(n);
} else {
worker.schedule(new Request(s, n));
}
}

从之前对操作符的源码分析部分可以知道run方法里面调上游observable的subscribe方法订阅SubscribeOnSubscriber后,会调到SubscribeOnSubscriber的onSubscribe方法,而onSubscribe里会调到上游的request方法,结合之前对操作符的源码分析知道调上游subscription的request方法会层层调用直到调到最上游的subscription的request方法,而最上游subscription的request方法会层层向下调用下游observable的onNext方法。

所以说subscribeOn不管在哪个位置调用都会影响subscribe之前的所有变换所在的线程。

observeOn

observeOn同理会返回FlowableObserveOn对象,其subscribeActual方法如下:

1
2
3
4
5
6
7
8
9
10
11
@Override
public void subscribeActual(Subscriber<? super T> s) {
Worker worker = scheduler.createWorker();

if (s instanceof ConditionalSubscriber) {
source.subscribe(new ObserveOnConditionalSubscriber<T>(
(ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
} else {
source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));
}
}

而ObserveOnSubscriber的onSubscribe方法会调用下游的onSubscribe方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.upstream, s)) {
this.upstream = s;

if (s instanceof QueueSubscription) {
@SuppressWarnings("unchecked")
QueueSubscription<T> f = (QueueSubscription<T>) s;

int m = f.requestFusion(ANY | BOUNDARY);

if (m == SYNC) {
sourceMode = SYNC;
queue = f;
done = true;

downstream.onSubscribe(this);
return;
} else
if (m == ASYNC) {
sourceMode = ASYNC;
queue = f;

downstream.onSubscribe(this);

s.request(prefetch);

return;
}
}

queue = new SpscArrayQueue<T>(prefetch);

downstream.onSubscribe(this);

s.request(prefetch);
}
}

下游的onSubscribe方法层层调用最终调到最下游的lambdaSubscriber的onSubscribe方法,而lambdaSubscriber的onSubscribe方法会调到subscription的request方法,进而层层调到下游的onnext方法。

理解了的读者就会问了,线程切换在哪一步呢?来看ObserveOnSubscriber对象里的onNext方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public final void onNext(T t) {
if (done) {
return;
}
if (sourceMode == ASYNC) {
trySchedule();
return;
}
if (!queue.offer(t)) {
upstream.cancel();

error = new MissingBackpressureException("Queue is full?!");
done = true;
}
trySchedule();
}

可以看到不同于大部分subscriber的onNext方法,这里调用了trySchedule()切换线程再往后执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final void trySchedule() {
if (getAndIncrement() != 0) {
return;
}
worker.schedule(this);
}

@Override
public final void run() {
if (outputFused) {
runBackfused();
} else if (sourceMode == SYNC) {
runSync();
} else {
runAsync();
}
}

abstract void runBackfused();

abstract void runSync();

abstract void runAsync();

ObserveOnSubscriber的runBackfused/runSync/runAsync方法继续调下游的onNext方法,所以说observeOn只会改变调用observeOn的地方的下游操作所在线程,并且是可以多次调用生效的。


Rxjava操作符众多,本文没有一一研究其作用,主要是了解operators和schedulers背后的原理。可以看出RxJava原理并没有很复杂,但是只有了解其原理才能知道如何结合实际场景正确使用甚至造一个轮子。参照源码的实现我写了个简陋的RxJava,可以帮助读者更好的理解本文所述原理。地址:https://github.com/bboylin/RxJavaDemo

参考: