源码分析-RxJava

一、前言

Rxjava是用Java实现的ReactiveX框架开源库,ReactiveX就是大名鼎鼎的响应式编程。而响应式编程和观察者模式又是紧紧相关联的。

二、观察者模式

1)基本原理

观察者模式时对象的行为模式,又叫发布-订阅模式,即让多个观察者对象同时监听某一个主题对象。这个主题对象在状态上发生变化时,会通知所有观察者对象,是它们能够自动更新自己。

2)jdk实现版本

在JDK中已有对观察者模式的封装:Observable.java/Observer.java。在JDK的实现版本中,Obserbable是被观察者,有一个Vector数组来存放观察者,当被观察者事件改变时,调用notifyObservers来通知所有的观察者,每个观察者(Observer)调用自己的update来做相应的更新操作,在这里面我们可以思考两个地方:

1、观察者数组为什么使用Vector而不是List?

由于观察者数组必须考虑到线程安全,所以JDK版本中在对Vector进行操作的时候,都会加上synchronized并在通知Observer的时候反向遍历数组,以此保证线程安全。而使用List在反向遍历会更加复杂。

2、被观察者通知每个观察者时是否需要加上try-catch?

加上try-catch虽然可以保证每个观察者都能做更新操作,但是在这里设置的话观察者的异常将抛给被观察者(被观察者表示无辜躺枪,毕竟是你的异常,你自己不处理反而叫我处理?)。所以try-catch应该由观察者的update操作自己实现。

三、响应式编程

1)特点:
  • 观察者模式时Obserable-Observer,如果我们要在观察者做操作前对数据做一些其他处理怎么办?观察者模式无法解耦,只能在observer的update操作中实现。而响应式编程是Observable-Operator1-OperatorN-Observer,可以很好的解耦控制数据流操作。
  • 响应式编程通过链式调用,让用户在代码流程上能够更加清晰的掌控(即异步操作)。
  • 观察者模式无法处理观察者的异常,需要用户自己加try-catch结构。而响应式编程提供了另一种方案,用户不需要使用try-catch只需实现错误处理方法就可以做到。
  • 观察者模式若需要使用多线程,需要用户在observer实现多线程操作,也就是将观察者和任务调度糅杂在一起。而响应式编程对观察者和任务调度解耦,可以通过Schedulers来处理线程调度的问题。
  • 观察者模式中观察者无法处理自己的调用超时问题,而响应式编程则可以设定观察者的调度超时机制。
  • 响应式编程提供阻塞式(BlockingObservable)和非阻塞式Observable的调用

四、线程控制——Scheduler

在不指定线程的情况下,RxJava遵循的是线程不变的原则,即:在哪个线程调用subscribe(),就在哪个线程生产事件,也就同样在哪个线程消费事件。如果需要切换线程,就需要用到Scheduler(调度器)。

1)Scheduler的API

在RxJava中,Shceduler相当于线程控制器,RxJava通过它来指定每段代码应该运行在什么样的线程。主要API有:

  • Schedulers.immediate():直接在当前线程运行,相当于不指定线程。这是默认的Scheduler。
  • Schedulers.newThread():启用新线程,并在新线程执行操作。
  • Schedulers.io():I/O操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。
  • Schedulers.computation():CPU密集型计算所使用的Scheduler。
  • AndroidSchedulers.mainThread():在Android主线程运行,即UI线程。

在RxJava中,我们通过对Observable使用subscribeOn()和observeOn()两个方法来对线程进行控制:

  • subscribeOn():指定Observable.OnSubscribe被激活时所处的线程,也叫做事件产生线程。
  • observeOn():指定Subscriber所运行的线程,也叫事件消费线程。

1
2
3
4
5
6
7
8
9
Observable.just(1,2)
.subscribeOn(Schedulers.io()) // 指定subscribe的内容发生在io线程
.observeOn(AndroidSchedulers.mainThread()) // 指定Subscribe的回调发送在主线程
.subscribe(new Action1<>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});

我们一般将这两个过程设置为“后台线程取数据,主线程显示”的程序策略。

2)Scheduler的原理

subscribeOn()和observerOn()的内部实现,也是通过lift(),也就是算这个过程中也会产生新的Observable和新的Subscriber,整条链式调用的过程和使用map一样。以observerOn()为例:

1
2
3
4
5
6
public final Observable<T> observeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler));
}

主要看 lift(new OperatorObserveOn(scheduler)) 这句,返回了一个新的Observable,和map的一样,再看看OperatorObserveOn这个方法

1
2
3
public OperatorObserveOn(Scheduler scheduler) {
this.scheduler = scheduler;
}

这里传入了一个Scheduler,也就是我们在最外层指定的运行线程,在其内部有一个ObserveOnSubscriber这个类,在它创建时会根据scheduler生成一个Scheduler.Worker,看看其构造方法:

1
2
3
4
5
6
7
8
9
10
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
if (UnsafeAccess.isUnsafeAvailable()) {
queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
} else {
queue = new SynchronizedQueue<Object>(RxRingBuffer.SIZE);
}
this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler);
}

主要是这句:recursiveScheduler = scheduler.createWorker();

再看看它主要实现的三个回调方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public void onNext(final T t) {
......// 高度省略
schedule();
}
@Override
public void onCompleted() {
......// 高度省略
schedule();
}
@Override
public void onError(final Throwable e) {
......// 高度省略
unsubscribe();
......
// polling thread should skip any onNext still in the queue
schedule();
}
protected void schedule() {
if(counter.getAndIncrement == 0) {
recursiveScheduler.schedule(action);
}
}

三个回调都指向了schedule()这个方法,在schedule()中我们通过recursiveScheduler实现了线程的调度。由此这个lift()返回的新Observable就运行在这个我们指定的线程中了,而后面再订阅这个Observable的也都运行在这个线程了。

五、总结

通过map和observeOn两个方法,我们可以看到都是调用lift传入Operator然后返回一个新的Observable。Operator是Observable中定义的接口,即用户的逻辑操作,RxJava框架调用lift方法将Operator包装成为Observable,在Observable调用其自身的onSubscriber时即执行Operator的Call方法,也就完成用户的具体逻辑操作,包括将数据转型(new Func()复写实现方法)和线程调度。

如此通过一系列的Observable实现用户具体的Operator,再传入下一个Observable,直到最终传到我们Observer观察者身上,完成整个流水式操作。

再提一下,如果RxJava和Retrofit一起使用的话,RxJava中最原始的Observable是由RxJavaCallAdapterFactory创建的,而创建时传入的onSubscriber的主要工作就是发起OkHttp.Call的网络请求。