<div class="content" id="articleContent">
<div class="ad-wrap">
</div>
<span id="OSC_h2_1"></span>
<h2>前言</h2>
<p>通过前一篇的<a href="https://link.jianshu.com/?t=https%3A%2F%2Fjuejin.im%2Fpost%2F5a521d68f265da3e4e25750e">从观察者模式出发,聊聊RxJava</a>,我们大致理解了RxJava的实现原理,在RxJava中可以非常方便的实现不同线程间的切换。subscribeOn 用于指定上游线程,observeOn 用于指定下游线程,多次用 subscribeOn 指定上游线程只有第一次有效,多次用 observeOn 指定下次线程,每次都有效;简直太方便了,比直接使用Handler省了不少力气,同时也不用去关注内存泄漏的问题了。本篇就来看看在RxJava中上游是如何实现线程切换。</p>
<span id="OSC_h2_2"></span>
<h2>RxJava 基础原理</h2>
<p>为了方便后面的叙述,这里通过下面的UML图简单回顾一下上一篇的内容。</p>
<p><em>此图并没有完整的展现图中各个接口和类之间的各种关系,因为那样会导致整个图错综复杂,不便于查看,这里只绘制出了RxJava各个类之间核心关系网络</em></p>
<p>从上面的UML图中可以看出,具体的实现类只有ObservableCreate和CreateEmitter。CreateEmitter是ObservableCreate的内部类(PlantUML 怎么绘制内部类,没搞懂,玩的转的同学请赐教呀(<em>▽</em>))。</p>
<p>上篇说过Observable创建的过程,可以简化如下:</p>
<pre class="blockcode"><code> Observable mObservable=new ObservableCreate(new ObservableOnSubscribe())
</code></pre>
<p>结合图可以更直观的体现出这一点。ObservableCreate 内部持有ObservableOnSubscribe的引用。</p>
<p>当观察者订阅主题后:</p>
<pre class="blockcode"><code>mObservable.subscribe(mObserver);
</code></pre>
<p>ObservableCreate 中的subscribeActual()方法就会执行,</p>
<pre class="blockcode"><code> protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
</code></pre>
<p>在这个过程中会创建CreateEmitter 的实例,而这个CreateEmitter实现了Emitter和Disposable接口,同时又持有Observer的引用(当然这个引用是ObservableCreate传递给他的)。<strong>接着就会执行ObservableOnSubscribe的subscribe 方法</strong>,方法的参数即为刚刚创建的CreateEmitter 的实例,接着一系列连锁反应,Emitter 接口中的方法(onNext,onComplete等)开始执行,在CreateEmitter内部,Observer接口中对应的方法依次执行,这样就实现了一次从主题(上游)到观察者(下游)的事件传递。</p>
<p><strong>source.subscribe(parent)</strong></p>
<p>这里的 source 是ObservableOnSubscribe的实例,parent是CreateEmitter的实例。上面加粗文本叙述的内容,就是这行代码,可以说这是整个订阅过程最核心的实现。</p>
<p>好了,回顾完基础知识后,马上进入正题,看看RxJava是如何实现线程切换的。</p>
<span id="OSC_h2_3"></span>
<h2>RxJava 之 subscribeOn</h2>
<p>我们知道正常情况下,所有的内容都是在主线程执行,既然这里提到了线程切换,那么必然是切换到了子线程,因此,这里需要关注线程的问题,我们就带着下面这几个问题去阅读代码。</p>
<ul><li>1.<strong>是哪个对象在什么时候创建了子线程,是一种怎样的方式创建的?</strong></li><li>2.<strong>子线程又是如何启动的?</strong></li><li>3.<strong>上游事件是怎么跑到子线程里执行的?</strong></li><li>4.<strong>多次用 subscribeOn 指定上游线程为什么只有第一次有效 ?</strong></li></ul>
<span id="OSC_h3_4"></span>
<h3>示例</h3>
<p>首先看一下,日常开发中实现线程切换的具体实现</p>
<pre class="blockcode"><code>
private void multiThread() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("This msg from work thread :" + Thread.currentThread().getName());
sb.append("\nsubscribe: currentThreadName==" + Thread.currentThread().getName());
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept: s= " + s);
}
});
}
</code></pre>
<p>这段代码,使用过RxJava的同学再 |
|