一、前言
Rxjava是用Java实现的ReactiveX框架开源库,ReactiveX就是大名鼎鼎的响应式编程。而响应式编程和观察者模式又是紧紧相关联的。
二、观察者模式
1)基本原理
观察者模式时对象的行为模式,又叫发布-订阅模式,即让多个观察者对象同时监听某一个主题对象。这个主题对象在状态上发生变化时,会通知所有观察者对象,是它们能够自动更新自己。
2)jdk实现版本
在JDK中已有对观察者模式的封装:Observable.java/Observer.java。在JDK的实现版本中,Obserbable是被观察者,有一个Vector数组来存放观察者,当被观察者事件改变时,调用notifyObservers来通知所有的观察者,每个观察者(Observer)调用自己的update来做相应的更新操作,在这里面我们可以思考两个地方:
1、观察者数组为什么使用Vector而不是List?
由于观察者数组必须考虑到线程安全,所以JDK版本中在对Vector进行操作的时候,都会加上synchronized并在通知Observer的时候反向遍历数组,以此保证线程安全。而使用List在反向遍历会更加复杂。
2、被观察者通知每个观察者时是否需要加上try-catch?
加上try-catch虽然可以保证每个观察者都能做更新操作,但是在这里设置的话观察者的异常将抛给被观察者(被观察者表示无辜躺枪,毕竟是你的异常,你自己不处理反而叫我处理?)。所以try-catch应该由观察者的update操作自己实现。
三、响应式编程
1)特点:
- 观察者模式时Obserable-Observer,如果我们要在观察者做操作前对数据做一些其他处理怎么办?观察者模式无法解耦,只能在observer的update操作中实现。而响应式编程是Observable-Operator1-OperatorN-Observer,可以很好的解耦控制数据流操作。
- 响应式编程通过链式调用,让用户在代码流程上能够更加清晰的掌控(即异步操作)。
- 观察者模式无法处理观察者的异常,需要用户自己加try-catch结构。而响应式编程提供了另一种方案,用户不需要使用try-catch只需实现错误处理方法就可以做到。
- 观察者模式若需要使用多线程,需要用户在observer实现多线程操作,也就是将观察者和任务调度糅杂在一起。而响应式编程对观察者和任务调度解耦,可以通过Schedulers来处理线程调度的问题。
- 观察者模式中观察者无法处理自己的调用超时问题,而响应式编程则可以设定观察者的调度超时机制。
- 响应式编程提供阻塞式(BlockingObservable)和非阻塞式Observable的调用
四、线程控制——Scheduler
在不指定线程的情况下,RxJava遵循的是线程不变的原则,即:在哪个线程调用subscribe(),就在哪个线程生产事件,也就同样在哪个线程消费事件。如果需要切换线程,就需要用到Scheduler(调度器)。
1)Scheduler的API
在RxJava中,Shceduler相当于线程控制器,RxJava通过它来指定每段代码应该运行在什么样的线程。主要API有:
- Schedulers.immediate():直接在当前线程运行,相当于不指定线程。这是默认的Scheduler。
- Schedulers.newThread():启用新线程,并在新线程执行操作。
- Schedulers.io():I/O操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。
- Schedulers.computation():CPU密集型计算所使用的Scheduler。
- AndroidSchedulers.mainThread():在Android主线程运行,即UI线程。
在RxJava中,我们通过对Observable使用subscribeOn()和observeOn()两个方法来对线程进行控制:
- subscribeOn():指定Observable.OnSubscribe被激活时所处的线程,也叫做事件产生线程。
- observeOn():指定Subscriber所运行的线程,也叫事件消费线程。
故
|
|
我们一般将这两个过程设置为“后台线程取数据,主线程显示”的程序策略。
2)Scheduler的原理
subscribeOn()和observerOn()的内部实现,也是通过lift(),也就是算这个过程中也会产生新的Observable和新的Subscriber,整条链式调用的过程和使用map一样。以observerOn()为例:
|
|
主要看 lift(new OperatorObserveOn
|
|
这里传入了一个Scheduler,也就是我们在最外层指定的运行线程,在其内部有一个ObserveOnSubscriber这个类,在它创建时会根据scheduler生成一个Scheduler.Worker,看看其构造方法:
|
|
主要是这句:recursiveScheduler = scheduler.createWorker();
再看看它主要实现的三个回调方法:
|
|
三个回调都指向了schedule()这个方法,在schedule()中我们通过recursiveScheduler实现了线程的调度。由此这个lift()返回的新Observable就运行在这个我们指定的线程中了,而后面再订阅这个Observable的也都运行在这个线程了。
五、总结
通过map和observeOn两个方法,我们可以看到都是调用lift传入Operator然后返回一个新的Observable。Operator是Observable中定义的接口,即用户的逻辑操作,RxJava框架调用lift方法将Operator包装成为Observable,在Observable调用其自身的onSubscriber时即执行Operator的Call方法,也就完成用户的具体逻辑操作,包括将数据转型(new Func()复写实现方法)和线程调度。
如此通过一系列的Observable实现用户具体的Operator,再传入下一个Observable,直到最终传到我们Observer观察者身上,完成整个流水式操作。
再提一下,如果RxJava和Retrofit一起使用的话,RxJava中最原始的Observable是由RxJavaCallAdapterFactory创建的,而创建时传入的onSubscriber的主要工作就是发起OkHttp.Call的网络请求。