RxJava2源码解析

论坛 期权论坛 脚本     
匿名技术用户   2021-1-5 23:16   11   0

本篇文章的目的:
①了解RxJava的基本流程
②了解RxJava中线程调度的实现
③了解了上面那些,其他的操作符对你来说就不是问题了

RxJava基本流程

我们从基本的使用作为入口:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("hey");
                e.onComplete();
            }
        }).observeOn(AndroidSchedulers.mainThread())
          .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe() : d = " + d );
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "onNext() : value = " + value );
            } 

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError() : e = " + e );
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete()");
            }
        });

同时,结合下面这张图食用效果更佳哦!!

Observable.create()

我们以create方法作为入口,它接受的参数是一个ObservableOnSubscribe对象,ObservableOnSubscribe是一个接口,里面只有一个subscribe方法我们需要实现:

public interface ObservableOnSubscribe<T> {
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}

create方法接受一个ObservableOnSubscribe对象,并包装成一个ObservableCreate对象,并调用RxJavaPlugins.onAssembly方法:

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

这里调用RxJavaPlugins.onAssembly方法,里面调用相关的hook方法,这里不详细讲,我们只要知道它返回了原对象。

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

也就是说create方法返回了一个包装了我们ObservableOnSubscribe对象的ObservableCreate对象,它继承自Observable

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
}

该类的构造函数只是做了 source 参数的保存,该 source是我们调用create 方法时传入的 ObservableOnSubscribe,也就是说,用ObservableCreate对ObservableOnSubscribe进行包装(即装饰者模式),这里也是第一次包装,接下来我们看observeOn方法:

observeOn()

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
    return observeOn(scheduler, delayError, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
    

可以看到,这里的 observeOn(Scheduler) 最终调用三个参数的 observeOn(Scheduler,boolean,int) ,
三个参数的 observeOn方法里面把我们当前包装好的 Observable 对象(this) 和新传入的 Scheduler 对象 再进行一层包装,得到ObservableObserveOn 对象,它也是继承自Observable的

//ObservableObserveOn类
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
}

//AbstractObservableWithUpstream类
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }
    ......
}

到这里,我们做个小结:

首先create方法把我们传入的 ObservableOnSubscribe 包装成一个 ObservableCreate(继承自 Observable);
然后调用上面的 ObservableCreate 的 observeOn 方法,observeOn方法里面把我们当前包装好的 ObservableCreate 对象和新传入的 Scheduler 对象 再进行一层包装,得到ObservableObserveOn 对象,它也是继承自Observable的

接下来就是subscribe(Observer)方法了

subscribe()

.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe() : d = " + d );
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "onNext() : value = " + value );
            } 
           ...
        });

我们看一下observable的subscribe方法:

public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            ...
        }
    }

RxJavaPlugins.onSubscribe同样是hook方法,最后调用了自己的subscribeActual(Observer),也就是说:subscribeActual方法是在我们完成订阅 即调用subscribe(Observer)的时候 被调用的,参数就是从下游传递上来的Observer对象。
从上面的分析我们知道,我们当前的对象是ObservableObserveOn 对象,那么我们直接看ObservableObserveOn 的subscribeActual方法:

    /**
    * ObservableObserveOn 的subscribeActual方法
    */ 
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

这里的 scheduler 对象就是我们之前传入的 AndroidSchedulers.mainThread() 方法得到的 Scheduler对象

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

MAIN_THREAD 是个静态的对象,它的初始化是在 RxAndroidPlugins.initMainThreadScheduler() 方法中,且传入一个 call 回调,我们看看在 RxAndroidPlugins.initMainThreadScheduler() 方法中它是怎么生成的,

在下面的方法中,initMainThreadScheduler 方法里面最终会调用 callRequireNonNull 方法,callRequireNonNull方法会回调到call 方法

    public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
        if (scheduler == null) {
            throw new NullPointerException("scheduler == null");
        }
        Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler;
        if (f == null) {
            return callRequireNonNull(scheduler);
        }
        return applyRequireNonNull(f, scheduler);
    }

    static Scheduler callRequireNonNull(Callable<Scheduler> s) {
        try {
            Scheduler scheduler = s.call();
            if (scheduler == null) {
                throw new NullPointerException("Scheduler Callable returned null");
            }
            return scheduler;
        } catch (Throwable ex) {
            throw Exceptions.propagate(ex);
        }
    }

我们看看 call 方法

    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

call 方法会返回 MainHolder 类的静态属性 DEFAULT,即 HandlerScheduler 对象,HandlerScheduler 构造函数传入了和我们主线程的 Looper 绑定的 Handler,这里也建立了我们在主线程执行观察者方法的基础

final class HandlerScheduler extends Scheduler {
    private final Handler handler;

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }
    .......
}

好,我们回到 ObservableObserveOn 的subscribeActual方法,现在我们知道 scheduler 就是 HandlerScheduler 了,接下来调用它得 createWorker 方法,

    /**
    * ObservableObserveOn 的subscribeActual方法
    */ 
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

HandlerScheduler 的 createWorker 方法返回 它的 静态内部类 HandlerWorker

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }

然后下一步就很关键了,我们知道 ObservableObserveOn 对象包装了我们的 ObservableCreate(也就是source) 和 Scheduler ,在得到 Worker 后,调用了这行代码

source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));

这行代码将 观察者 和 Worker 包装成 ObserveOnObserver 对象,然后调用 source.subscribe 方法

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
           ......
        }
    }

这里的source即我们的ObservableCreate 对象,所以会调用 ObservableCreate 类的 subscribeActual 方法

    protected void subscribeActual(Observer<? super T> observer) {
        //用observer构造一个CreateEmitter对象
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //调用observer的onSubscribe方法
        observer.onSubscribe(parent);

        try {
            //调用被观察者source的subscribe方法,传入CreateEmitter
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

可以看到里面用一个CreateEmitter 包装下游的observer,然后调用observer的onSubscribe方法,传入 CreateEmitter对象,
这里也说明了比较重要的一点:onSubscribe方法是在订阅的时候被调用的,且在被观察着source 的 subscribe方法之前调用。

经过上面的分析,这里的 observer 是 ObserveOnObserver 对象,我们看看他的 onSubscribe 方法:

        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        actual.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        actual.onSubscribe(this);
                        return;
                    }
                }

                queue = new SpscLinkedArrayQueue<T>(bufferSize);

                actual.onSubscribe(this);
            }
        }

经过条件判断,这个方法最终会执行 actual.onSubscribe 方法,也就是我们原始观察者的 onSubscribe,这里不再赘述= =

回到 ObservableCreate 类的 subscribeActual 方法,接下来调用上游对象 source(即ObservableOnSubscribe)的 subscribe 方法,并传入CreateEmitter,即调用我们一开始写的

    Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("hey");
                e.onComplete();
            }

这里使用我们传入的 CreateEmitter ,调用onNext 等一些列方法,我们来看看比较重要的 onNext 方法:

public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

里面的 observer 是我们的在 CreateEmitter的构造函数中传入的 ObserveOnObserver对象,我们看看ObserveOnObserver的onNext方法:

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

我们设置了在主线程调用,所以这里经过判断,会执行 queue.offer(t)代码,即把我们的任务放入队列中,然后调用 scheduler,

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

经过上面分析,这里的worker 是 HandlerWorker对象,它的schedule 方法的参数是Runnable,而我们的ObserveOnObserver 也实现了 Runnbale接口,调用 scheduler的时候传入了this:

        @NonNull
        public Disposable schedule(@NonNull Runnable run) {
            return schedule(run, 0L, TimeUnit.NANOSECONDS);
        }

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

这里面很简单,将传入的 Runnbale 打包成 Message,然后通过 handler发送到主线程。

我们回过头看 run 方法的实现,即 ObserveOnObserver 中的 run 方法

        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

        void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

这里面也很简单,就是从队列中取出元素,然后交给我们的观察者的 onNext方法。- -

至此,整个调度流程就完成了!!!

这里对流程做一个总结:

首先顺序执行我们的程序,自上而下每个操作符都对上游对象进行包装,subscribe(Observer)方法调用后,自下而上一层层解包装,同时,将观察者自下而上进行包装,最终在subscribe(ObservableEmitter e)调用e.onNext等方法,这里的e就是从下游传递上来的Emitter。

RxJava线程调度

知道了RxJava采用装饰者模式后,理解其他操作符就不难了,subscribeOn操作符是用来决定被观察者执行的线程,

我们直接看subscribeOn方法,返回一个ObservableSubscribeOn对象:

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //调用observer的onSubscribe,说明onSubscribe是在订阅的线程执行的
        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
}

可以看到又是一次包装,在subscribeActual中直接调用了Observer的onSubscribe方法,说明onSubscribe方法执行的线程和订阅的线程是一致的。
接下来这条常常的代码,我们先看SubscribeTask:

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

它是ObservableSubscribeOn的内部类,实现Runnable类,说明它可以被执行,run方法中正是调用了source.subscribe()方法,这里注意一点,这个source会一层层调用上游的代码,也就是说在subscribeOn操作符之上的操作都会被影响

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

接着是scheduler.scheduleDirect方法,假设scheduler是Schedulers.IO,也就是在子线程执行source.subscribe,
Schedulers.IO最终得到IoScheduler:

public final class IoScheduler extends Scheduler {

    public IoScheduler() {
        this(WORKER_THREAD_FACTORY);
    }

    public IoScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }
}

内部配置了线程池等,由它来执行run方法。

至此,线程调度就解析到这里。

欢迎纠正,喜欢点个赞。

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:7942463
帖子:1588486
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP