RxJava之subscribeOn解惑
有一天,我在使用RxJava和Retrofit实现Android上面的网络请求。突然,遇到了一个坑,在过了这些坑之后得到一些经验,觉得需要和大家分享一二。
创新互联是一家集网站建设,安泽企业网站建设,安泽品牌网站建设,网站定制,安泽网站建设报价,网络营销,网络优化,安泽网站推广为一体的创新建站企业,帮助传统企业提升企业形象加强企业竞争力。可充分满足这一群体相比中小企业更为丰富、高端、多元的互联网需求。同时我们时刻保持专业、时尚、前沿,时刻以成就客户成长自我,坚持不断学习、思考、沉淀、净化自己,让我们为更多的企业打造出实用型网站。引子用Retrofit搭配RxJava的朋友应该都知道,一般实现代码最终大都长得一幅下面的样子。
public interface BeanApi { @GET("bean/{id}") Observable
上面的代码形式相信各位都写得很熟了,但是我实在烦透了每个api调用的地方都写一次subscribOn,observeOn。然后,我找到一篇不错的文章——Don\'t break the chain: use RxJava\'s compose() operator,里面提到了一个方法避免这种重复代码(其实作者本意是要体现“不要打破链式操作”,而非避免重复代码)。最后改进到的代码就长下面的样子了。
// This is a common method.
改进后的代码比原来的代码少了一行。但是写多几次之后,我还是烦透了这个applySchedulers()。于是我疯了,就自己实现一个Retrofit的CallAdapter.Factory,让Retrotfit在每次调用api的时候自动就给我封装好subscribeOn和observeOn这些重复的代码,具体实现可以参考我的另外一篇文章——通过委派模式包装一个RxJavaCallAdapterFactory。最后,我的代码就是长下面这个样子了。
api.getBean(1) .subscribe( bean -> { // do something on main thread. }, throwable -> { // do something on main thread. } );
所有的subscribeOn和observeOn不用再写了。因为每次调用api.getBean(1),Retrotfit就调用我自定义的CallAdapter.Factory把结果封装成Observable对象的时候就已经把subscribeOn和observeOn添加上去了。
问题好,用得很爽。但是问题问题比办法多,所以问题来了。有几个特殊的地方我需要网络加载和结果监听都在当前线程。相信理解了上面代码的朋友都已经看出来了,现在我通过api.getBean(1)获取到的Observable
// isAsync is a boolean variable indicate whether the request is a asynchronous or not. api.getBean(1) .subscribeOn(isAsync ? Schedulers.io() : Schedulers.immediate()) .observeOn(isAsync ? AndroidSchedulers.mainThread() : Schedulers.immediate()) .subscribe( bean -> { // do something on main thread. }, throwable -> { // do something on main thread. } );
上面的代码再结合我之前写的CallAdapter.Factory,其实就是相当于没有自定义CallAdapter.Factory之前显式调用两次subscribeOn和两次observeOn,就像下面的样子。
api.getBean(1) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(isAsync ? Schedulers.io() : Schedulers.immediate()) .observeOn(isAsync ? AndroidSchedulers.mainThread() : Schedulers.immediate()) .subscribe( bean -> { // do something on main thread. }, throwable -> { // do something on main thread. } ); 前凑
作为一名RxJava的标准菜鸟,我被验证了自己的确很菜,我天真的认为后面的subscribeOn和observeOn会覆盖之前的Scheduler,我理所当然的认为,当isAsync为true的时候,这次api的调用就会在当前线程执行网络访问和结果处理。于是,我被搞疯了。我就看了subscribeOn的源码,如下。
public final Observable
nest的代码如下,
public final Observable
意思就是新建一个Observable,并且只会向订阅者发送一个元素——原来api.getBean(1)获得的Observale
在看lift操作之前,我们稍微回顾一下Observable的创建方法,
final OnSubscribe
其他的什么from、just等创建方法最后都是把数据转化为一个OnSubscribe对象再通过上面的create方法创建。所以我们只关注这个create方法。上面代码的意思很简单,就是new一个Observable对象,并且设置onSubscribe。所以这里的关键是onSubscribe这个对象。这里我管它做数据源,即Observable对象会用它来产生数据,并且发布给订阅者。
看到这里,我们可以发现,Observable其实没有什么,只有两个关键点:1、装载着一个onSubscribe对象,2、有订阅者注册时,就调用用这个onSubscribe的call(Subscriber)方法。
这里我们要看一下这个call(Subscriber)方法。该方法接受一个参数Subscriber,即订阅者。当有订阅者注册到Observable对象时,Observable对象就调用onSubscribe的这个call方法,并且把当前当前注册的订阅者作为参数传递过去。所以在call方法的实现中就可以调用订阅者的onNext方法来发布数据或者做其他事(不一定是发布数据)。
lift操作说明先把lift操作的代码贴出来。
public final
从上面代码第2行的new Observable
在上面Observable结构一节中,我们知道每个Observable都持有一个onSubscribe对象作为数据源。通过lift方法派生所得的Observable也不例外,也有一个,就是上面代码第二行new OnSubscribe
小结:到这里为止,要记住很重要的一点,通过lift操作产生的派生Observable对象的数据源(onSubscribe)是不实际产生数据的,它做的事就只是把自己的订阅者包装成为一个父级Observable认可的订阅者,然后委派给父级的数据源。
lift操作实例分析那么被包装出来的这个订阅者是怎么处理父级数据源发布的数据呢?这里就要回到上面代码的第6行。那里通过一个我们调用lift操作时传进去的Operator把派生Observable认可的Subscriber包装成一个父级Observable认可的Subscriber。
下面我看一个lift操作的例子,用lift模拟了两次map操作。
代码视图1:
class Bean { int value; Bean(int v) { this.value = v; } } Observable.just(new Bean(1), new Bean(2), new Bean(3), new Bean(4)) .lift(new Observable.Operator
上面的代码中Observable的链式操作其实是等价于下面代码形式的,
代码视图2:
Observable
从代码视图2中,我们可以发现这一连串的操作下来,一共产生了3个Observable对象:o1、o2、o3。之前我们说过每个Observable对象都会持有一个onSubscribe对象作为数据源,用来向订阅者发布数据。我们以不同的标识来区分一下上面三个Observable对象对应的onSubscribe对象:o1 => onSubscribe1, o2 => onSubscribe2, o3 => onSubscribe3。
从上面lift操作说明一节,我们知道lift操作产生的Observable对象的数据源是不产生数据的,它做的事就只是把自己的订阅者包装成为一个父级Observable认可的订阅者,然后委派给父级的数据源。o3是一个通过lift操作产生的派生Observable,当订阅者subscriber3注册到o3时,o3的数据源onSubscribe3就会把subscriber3包装成一个父级(o2)可以的订阅者(这里命名为subscriber2),然后委派给父级数据源(onSubscribe2)。 现在请回头看代码视图1的第34-49行。这一段代码就显示了把subscriber3包装成为subscriber2的过程。可以发现,被包装出来的subscriber2只做一件事,就是把onSubscribe2发布给自己的数据转化为subscriber3可以消化的数据,然后就交给subscriber3,相当于充当了一个subscriber3和onSubscribe2之间的桥梁。
接着分析,onSubscribe2虽然说,通过subscriber2间接把数据发布到了subscriber3。但是实际上,作为数据源,它的持有者o2,也是通过lift操作产生的派生Observable,所以这个onSubscribe2也是不直接产生数据的。它也是把自己的订阅者(subscriber2)包装一个父级(o1)认可的订阅者(这里命名为subscriber1),然后委派给父级数据源(onSubscribe1)。 现在请再回头看代码视图1的第13-28行。这段代码显示了如何把subscriber2包装成为subscriber1的过程。同样,subscriber1也只做一件事,就是把onSubscribe1发布给自己的数据转化为subscriber2可以消化的数据,然后就交给subscriber2,相当于充当了一个subscriber2和onSubscribe1之间的桥梁。
最后,整个过程可以描述为下面的一个示意图,
subscribeOn与liftRxJava中,lift操作几乎贯穿了整个Observable,因为差不多所有所有的操作符都是通过lift操作来实现的。比如map操作符,其实就是通过lift操作产生的派生Observable而已。所以这个派生Observable的数据源也就如上面我所述,自己不产生数据,而是把自己的订阅者包装成一个父级认可的订阅者。怎么包装呢?上面的讲述中,这个包装过程其实是通过我们调用lift操作时传递的参数Operator来完成的。
我们再回顾subscribeOn操作符的源码。首先,通过nest操作产生一个Observable
public class OperatorSubscribeOn
从上面的分析中,我们知道subscribeOn操作其实是先通过nest操作产生一个Observable
小结,经过subscribeOn操作产生了一个派生Observable
代码视图3
Observable.just(new Bean(1), new Bean(2), new Bean(3), new Bean(4)) .subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.immediate()) .subscribe(bean -> System.out.println(bean.value));
看过subscribeOn的源码之后,我们应该知道上面的代码几乎等价于下面的写法,
代码视图4
Observable
上面代码的执行过程,可以表示成如下示意图,
通过上面示意图可以看出,最后整个整个订阅过程的运行线程是 currentThread -> immediateThread -> ioThread。
声名By 啪嗒科技 AtanL(atanl@padakeji.com)
©啪嗒科技版本所有,没有经作者允许,只能发表于padakeji.com相关域名下。
本文名称:RxJava之subscribeOn解惑
文章网址:http://pwwzsj.com/article/cjjdoo.html