RxJava 是如何实现线程切换的(上)

论坛 期权论坛     
选择匿名的用户   2021-5-30 00:19   801   0
<div class="content" id="articleContent">
<div class="ad-wrap">
</div>
<span id="OSC_h2_1"></span>
<h2>前言</h2>
<p>通过前一篇的<a href="https://link.jianshu.com/?t&#61;https%3A%2F%2Fjuejin.im%2Fpost%2F5a521d68f265da3e4e25750e">从观察者模式出发,聊聊RxJava</a>,我们大致理解了RxJava的实现原理,在RxJava中可以非常方便的实现不同线程间的切换。subscribeOn 用于指定上游线程,observeOn 用于指定下游线程,多次用 subscribeOn 指定上游线程只有第一次有效,多次用 observeOn 指定下次线程,每次都有效;简直太方便了,比直接使用Handler省了不少力气,同时也不用去关注内存泄漏的问题了。本篇就来看看在RxJava中上游是如何实现线程切换。</p>
<span id="OSC_h2_2"></span>
<h2>RxJava 基础原理</h2>
<p>为了方便后面的叙述,这里通过下面的UML图简单回顾一下上一篇的内容。</p>
<p><em>此图并没有完整的展现图中各个接口和类之间的各种关系,因为那样会导致整个图错综复杂,不便于查看,这里只绘制出了RxJava各个类之间核心关系网络</em></p>
<p>从上面的UML图中可以看出,具体的实现类只有ObservableCreate和CreateEmitter。CreateEmitter是ObservableCreate的内部类(PlantUML 怎么绘制内部类,没搞懂,玩的转的同学请赐教呀(<em>▽</em>))。</p>
<p>上篇说过Observable创建的过程,可以简化如下:</p>
<pre class="blockcode"><code>  Observable mObservable&#61;new ObservableCreate(new ObservableOnSubscribe())
</code></pre>
<p>结合图可以更直观的体现出这一点。ObservableCreate 内部持有ObservableOnSubscribe的引用。</p>
<p>当观察者订阅主题后:</p>
<pre class="blockcode"><code>mObservable.subscribe(mObserver);
</code></pre>
<p>ObservableCreate 中的subscribeActual()方法就会执行,</p>
<pre class="blockcode"><code>    protected void subscribeActual(Observer&lt;? super T&gt; observer) {
        CreateEmitter&lt;T&gt; parent &#61; new CreateEmitter&lt;T&gt;(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

</code></pre>
<p>在这个过程中会创建CreateEmitter 的实例,而这个CreateEmitter实现了Emitter和Disposable接口,同时又持有Observer的引用(当然这个引用是ObservableCreate传递给他的)。<strong>接着就会执行ObservableOnSubscribe的subscribe 方法</strong>,方法的参数即为刚刚创建的CreateEmitter 的实例,接着一系列连锁反应,Emitter 接口中的方法(onNext,onComplete等)开始执行,在CreateEmitter内部,Observer接口中对应的方法依次执行,这样就实现了一次从主题(上游)到观察者(下游)的事件传递。</p>
<p><strong>source.subscribe(parent)</strong></p>
<p>这里的 source 是ObservableOnSubscribe的实例,parent是CreateEmitter的实例。上面加粗文本叙述的内容,就是这行代码,可以说这是整个订阅过程最核心的实现。</p>
<p>好了,回顾完基础知识后,马上进入正题,看看RxJava是如何实现线程切换的。</p>
<span id="OSC_h2_3"></span>
<h2>RxJava 之 subscribeOn</h2>
<p>我们知道正常情况下,所有的内容都是在主线程执行,既然这里提到了线程切换,那么必然是切换到了子线程,因此,这里需要关注线程的问题,我们就带着下面这几个问题去阅读代码。</p>
<ul><li>1.<strong>是哪个对象在什么时候创建了子线程,是一种怎样的方式创建的?</strong></li><li>2.<strong>子线程又是如何启动的?</strong></li><li>3.<strong>上游事件是怎么跑到子线程里执行的?</strong></li><li>4.<strong>多次用 subscribeOn 指定上游线程为什么只有第一次有效 ?</strong></li></ul>
<span id="OSC_h3_4"></span>
<h3>示例</h3>
<p>首先看一下,日常开发中实现线程切换的具体实现</p>
<pre class="blockcode"><code>
    private void multiThread() {
        Observable.create(new ObservableOnSubscribe&lt;String&gt;() {
            &#64;Override
            public void subscribe(ObservableEmitter&lt;String&gt; e) throws Exception {
                e.onNext(&#34;This msg from work thread :&#34; &#43; Thread.currentThread().getName());
                sb.append(&#34;\nsubscribe: currentThreadName&#61;&#61;&#34; &#43; Thread.currentThread().getName());
            }
        })
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer&lt;String&gt;() {
                    &#64;Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, &#34;accept: s&#61; &#34; &#43; s);
                    }
                });
    }

</code></pre>
<p>这段代码,使用过RxJava的同学再
分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:3875789
帖子:775174
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP