|
本篇文章的目的: ①了解RxJava的基本流程 ②了解RxJava中线程调度的实现 ③了解了上面那些,其他的操作符对你来说就不是问题了
RxJava基本流程
我们从基本的使用作为入口:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hey");
e.onComplete();
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe() : d = " + d );
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext() : value = " + value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError() : e = " + e );
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete()");
}
});
同时,结合下面这张图食用效果更佳哦!!
Observable.create()
我们以create方法作为入口,它接受的参数是一个ObservableOnSubscribe对象,ObservableOnSubscribe是一个接口,里面只有一个subscribe方法我们需要实现:
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}
create方法接受一个ObservableOnSubscribe对象,并包装成一个ObservableCreate对象,并调用RxJavaPlugins.onAssembly方法:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
这里调用RxJavaPlugins.onAssembly方法,里面调用相关的hook方法,这里不详细讲,我们只要知道它返回了原对象。
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
也就是说create方法返回了一个包装了我们ObservableOnSubscribe对象的ObservableCreate对象,它继承自Observable:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
}
该类的构造函数只是做了 source 参数的保存,该 source是我们调用create 方法时传入的 ObservableOnSubscribe,也就是说,用ObservableCreate对ObservableOnSubscribe进行包装(即装饰者模式),这里也是第一次包装,接下来我们看observeOn方法:
observeOn()
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
return observeOn(scheduler, delayError, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
可以看到,这里的 observeOn(Scheduler) 最终调用三个参数的 observeOn(Scheduler,boolean,int) , 三个参数的 observeOn方法里面把我们当前包装好的 Observable 对象(this) 和新传入的 Scheduler 对象 再进行一层包装,得到ObservableObserveOn 对象,它也是继承自Observable的
//ObservableObserveOn类
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
}
//AbstractObservableWithUpstream类
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
......
}
到这里,我们做个小结:
首先create方法把我们传入的 ObservableOnSubscribe 包装成一个 ObservableCreate(继承自 Observable); 然后调用上面的 ObservableCreate 的 observeOn 方法,observeOn方法里面把我们当前包装好的 ObservableCreate 对象和新传入的 Scheduler 对象 再进行一层包装,得到ObservableObserveOn 对象,它也是继承自Observable的
接下来就是subscribe(Observer)方法了
subscribe()
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe() : d = " + d );
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext() : value = " + value );
}
...
});
我们看一下observable的subscribe方法:
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
...
}
}
RxJavaPlugins.onSubscribe同样是hook方法,最后调用了自己的subscribeActual(Observer),也就是说:subscribeActual方法是在我们完成订阅 即调用subscribe(Observer)的时候 被调用的,参数就是从下游传递上来的Observer对象。 从上面的分析我们知道,我们当前的对象是ObservableObserveOn 对象,那么我们直接看ObservableObserveOn 的subscribeActual方法:
/**
* ObservableObserveOn 的subscribeActual方法
*/
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
这里的 scheduler 对象就是我们之前传入的 AndroidSchedulers.mainThread() 方法得到的 Scheduler对象
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
MAIN_THREAD 是个静态的对象,它的初始化是在 RxAndroidPlugins.initMainThreadScheduler() 方法中,且传入一个 call 回调,我们看看在 RxAndroidPlugins.initMainThreadScheduler() 方法中它是怎么生成的,
在下面的方法中,initMainThreadScheduler 方法里面最终会调用 callRequireNonNull 方法,callRequireNonNull方法会回调到call 方法
public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
if (scheduler == null) {
throw new NullPointerException("scheduler == null");
}
Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler;
if (f == null) {
return callRequireNonNull(scheduler);
}
return applyRequireNonNull(f, scheduler);
}
static Scheduler callRequireNonNull(Callable<Scheduler> s) {
try {
Scheduler scheduler = s.call();
if (scheduler == null) {
throw new NullPointerException("Scheduler Callable returned null");
}
return scheduler;
} catch (Throwable ex) {
throw Exceptions.propagate(ex);
}
}
我们看看 call 方法
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
call 方法会返回 MainHolder 类的静态属性 DEFAULT,即 HandlerScheduler 对象,HandlerScheduler 构造函数传入了和我们主线程的 Looper 绑定的 Handler,这里也建立了我们在主线程执行观察者方法的基础
final class HandlerScheduler extends Scheduler {
private final Handler handler;
HandlerScheduler(Handler handler) {
this.handler = handler;
}
.......
}
好,我们回到 ObservableObserveOn 的subscribeActual方法,现在我们知道 scheduler 就是 HandlerScheduler 了,接下来调用它得 createWorker 方法,
/**
* ObservableObserveOn 的subscribeActual方法
*/
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
HandlerScheduler 的 createWorker 方法返回 它的 静态内部类 HandlerWorker
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
然后下一步就很关键了,我们知道 ObservableObserveOn 对象包装了我们的 ObservableCreate(也就是source) 和 Scheduler ,在得到 Worker 后,调用了这行代码
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
这行代码将 观察者 和 Worker 包装成 ObserveOnObserver 对象,然后调用 source.subscribe 方法
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
......
}
}
这里的source即我们的ObservableCreate 对象,所以会调用 ObservableCreate 类的 subscribeActual 方法
protected void subscribeActual(Observer<? super T> observer) {
//用observer构造一个CreateEmitter对象
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//调用observer的onSubscribe方法
observer.onSubscribe(parent);
try {
//调用被观察者source的subscribe方法,传入CreateEmitter
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
可以看到里面用一个CreateEmitter 包装下游的observer,然后调用observer的onSubscribe方法,传入 CreateEmitter对象, 这里也说明了比较重要的一点:onSubscribe方法是在订阅的时候被调用的,且在被观察着source 的 subscribe方法之前调用。
经过上面的分析,这里的 observer 是 ObserveOnObserver 对象,我们看看他的 onSubscribe 方法:
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
actual.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
经过条件判断,这个方法最终会执行 actual.onSubscribe 方法,也就是我们原始观察者的 onSubscribe,这里不再赘述= =
回到 ObservableCreate 类的 subscribeActual 方法,接下来调用上游对象 source(即ObservableOnSubscribe)的 subscribe 方法,并传入CreateEmitter,即调用我们一开始写的
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hey");
e.onComplete();
}
这里使用我们传入的 CreateEmitter ,调用onNext 等一些列方法,我们来看看比较重要的 onNext 方法:
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
里面的 observer 是我们的在 CreateEmitter的构造函数中传入的 ObserveOnObserver对象,我们看看ObserveOnObserver的onNext方法:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
我们设置了在主线程调用,所以这里经过判断,会执行 queue.offer(t)代码,即把我们的任务放入队列中,然后调用 scheduler,
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
经过上面分析,这里的worker 是 HandlerWorker对象,它的schedule 方法的参数是Runnable,而我们的ObserveOnObserver 也实现了 Runnbale接口,调用 scheduler的时候传入了this:
@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
这里面很简单,将传入的 Runnbale 打包成 Message,然后通过 handler发送到主线程。
我们回过头看 run 方法的实现,即 ObserveOnObserver 中的 run 方法
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
这里面也很简单,就是从队列中取出元素,然后交给我们的观察者的 onNext方法。- -
至此,整个调度流程就完成了!!!
这里对流程做一个总结:
首先顺序执行我们的程序,自上而下每个操作符都对上游对象进行包装,subscribe(Observer)方法调用后,自下而上一层层解包装,同时,将观察者自下而上进行包装,最终在subscribe(ObservableEmitter e)调用e.onNext等方法,这里的e就是从下游传递上来的Emitter。
RxJava线程调度
知道了RxJava采用装饰者模式后,理解其他操作符就不难了,subscribeOn操作符是用来决定被观察者执行的线程,
我们直接看subscribeOn方法,返回一个ObservableSubscribeOn对象:
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//调用observer的onSubscribe,说明onSubscribe是在订阅的线程执行的
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
}
可以看到又是一次包装,在subscribeActual中直接调用了Observer的onSubscribe方法,说明onSubscribe方法执行的线程和订阅的线程是一致的。 接下来这条常常的代码,我们先看SubscribeTask:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
它是ObservableSubscribeOn的内部类,实现Runnable类,说明它可以被执行,run方法中正是调用了source.subscribe()方法,这里注意一点,这个source会一层层调用上游的代码,也就是说在subscribeOn操作符之上的操作都会被影响
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
接着是scheduler.scheduleDirect方法,假设scheduler是Schedulers.IO,也就是在子线程执行source.subscribe, Schedulers.IO最终得到IoScheduler:
public final class IoScheduler extends Scheduler {
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
}
内部配置了线程池等,由它来执行run方法。
至此,线程调度就解析到这里。
欢迎纠正,喜欢点个赞。 |