一、示例代码
public static void create() {
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
Log.d(TAG, "Observable>>subscribe start");
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
Log.d(TAG, "Observable>>subscribe stop");
}
});
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "Observer>>onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "Observer>>onNext:" + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "Observer>>onError:" + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "Observer>>onComplete");
}
});
}
打印日志如下:
D/RxJavaTest: Observer>>onSubscribe
D/RxJavaTest: Observable>>subscribe start
D/RxJavaTest: Observer>>onNext:1
D/RxJavaTest: Observer>>onNext:2
D/RxJavaTest: Observer>>onNext:3
D/RxJavaTest: Observer>>onComplete
D/RxJavaTest: Observable>>subscribe stop
二、基本订阅流程分析
1、分析订阅方法(observable.subscribe())
subscribe源码如下:
public final void subscribe(Observer<? super T> observer) {
try {
......
subscribeActual(observer);
} catch (NullPointerException e) {
throw e;
}
......
}
subscribeActual()的源码如下:
protected abstract void subscribeActual(Observer<? super T> observer);
subscribeActual()是Observable的一个抽象方法,具体源码要看其实现类,那其实现类是什么呢?observable 对象是Observable.create()创建的,Observable.create()返回的就是Observable的实现类的实例。
2、分析创建被观察者对象(Observable.create())
create源码如下:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
就是返回一个ObservableCreate对象。那我们看看ObservableCreate中有没有实现subscribeActual()。
果然实现了,源码如下:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
上面代码中我们要明确两个对象具体是什么?
1、形参observer
就是observable.subscribe()时传入的对象,具体为:
new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "Observer>>onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "Observer>>onNext:" + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "Observer>>onError:" + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "Observer>>onComplete");
}
}
2、对象source
ObservableCreate中为source赋值的地方如下:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
}
即在ObservableCreate构造方法中传入的source。ObservableCreate构造方法在Observable.create()中调用,如下:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
传给ObservableCreate的source和Observable.create()传入的source是同一个。
Observable.create()调用方法如下:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
Log.d(TAG, "Observable>>subscribe start");
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
Log.d(TAG, "Observable>>subscribe stop");
}
});
即对象source实际就是:
new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
Log.d(TAG, "Observable>>subscribe start");
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
Log.d(TAG, "Observable>>subscribe stop");
}
}
1)、分析observer.onSubscribe(parent);
这里是回调Observer的onSubscribe方法,所以onSubscribe方法最先回调。
2)、分析source.subscribe(parent);
这里parent是CreateEmitter的实例,CreateEmitter源码如下:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
......
}
如上面分析,source.subscribe(parent)就是就是下面这个subscribe方法,CreateEmitter的实例parent就是下面的e。
public void subscribe(ObservableEmitter e) throws Exception {
Log.d(TAG, "Observable>>subscribe start");
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
Log.d(TAG, "Observable>>subscribe stop");
}
因此e.onNext();就会回调CreateEmitter中onNext方法,而CreateEmitter中onNext方法实际回调的是Observer的onNext方法。
Obsever被封装在ObservableEmitter 中,然后通过ObservableOnSubscribe的subscribe方法方法传递到Observable中。实际上上面e.next()就是调用的Obsever的onNext()方法。
因此被观察者中发送的事件就传递到观察者中了。
三、多个操作符的订阅分析
1、示例代码
public static void map() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "Observable>>subscribe start");
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
Log.d(TAG, "Observable>>subscribe start");
}
})
// 第一个Observable
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
Log.d(TAG, "map>>apply:" + integer);
return "map:" + integer;
}
})
// 第二个Observable
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "Observer>>onSubscribe");
}
@Override
public void onNext(String str) {
Log.d(TAG, "Observer>>onNext:" + str);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "Observer>>onError:" + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "Observer>>onComplete");
}
});
}
日志如下:
D/RxJavaTest: Observer>>onSubscribe
D/RxJavaTest: Observable>>subscribe start
D/RxJavaTest: map>>apply:1
D/RxJavaTest: Observer>>onNext:map:1
D/RxJavaTest: map>>apply:2
D/RxJavaTest: Observer>>onNext:map:2
D/RxJavaTest: map>>apply:3
D/RxJavaTest: Observer>>onNext:map:3
D/RxJavaTest: Observer>>onComplete
D/RxJavaTest: Observable>>subscribe start
使用map操作符将整型数据转换为字符串数据。
2、map操作符解密
map操作符其实也是一个Observable,不信我们点开map的源码如下:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
......
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
......
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
}
}
map操作符返回的是一个ObservableMap对象。如何实现map操作符转换数据类型的呢?
MapObserver的onNext方法调用了mapper.apply(t),即下面这个方法:
public String apply(Integer integer) throws Exception {
Log.d(TAG, "map>>apply:" + integer);
return "map:" + integer;
}
从而实现了数据转换,然后调用了actual.onNext(v);将转换后的数据传递下去。
3、订阅流程分析
第二个Observable的subscribe方法如下:
@Override
public final void subscribe(Observer<? super T> observer) {
try {
subscribeActual(observer);
} catch (NullPointerException e) {
throw e;
}
}
源码3.1
实际调用的是subscribeActual方法,因为subscribeActual是一个抽象方法,具体实现在其实现类中。这里第二个Observable是ObservableMap。所以实际调用的ObservableMap中的subscribeActual方法。源码如下:
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
调用了source的subscribe方法,source.subscribe源码如上面“源码3.1”,实际调用的还是实现类的subscribeActual。source是上游的Observable即第一个Observable,即Observable.create创建的ObservableCreate对象,那上面源码相当于调用ObservableCreate中的subscribeActual方法,源码如下:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
如第一章分析source.subscribe(parent);实际调用到
public void subscribe(ObservableEmitter e) throws Exception {
Log.d(TAG, "Observable>>subscribe start");
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
Log.d(TAG, "Observable>>subscribe stop");
}
总结:
订阅时调用subscribe时,会回调上游的subscribe,然后回调上游的上游的subscribe,直到回调到第一个Observable的subscribe。因为subscribe中都是调用subscribeActual。实际订阅时先调用自己的subscribeActual,然后回调上游的subscribeActual,然后回调上游的上游的subscribeActual,直到回调到第一个Observable的subscribeActual。
4、Observer是如何传递
1)、Observer向上游传递进行封装
Observer也是在subscribe时进行传递的。通过上面的分析我们明确一点:subscribe时会逐级调用上游的subscribe,subscribe会调用subscribeActual,订阅的逻辑实际在每个Observable的subscribeActual中实现。
第二个Observable调用subscribe时,实际调用ObservableMap的subscribeActual,源码如下:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
......
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
}
}
function就是map操作符传入的function。我们自己定义的Observer作为第一个参数传给了MapObserver。然后MapObserver又通过source.subscribe()传给了第一个Observable即ObservableCreate,最后传给ObservableCreate的subscribeActual方法。源码如下:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
}
MapObserver传给了CreateEmitter。CreateEmitter的onNext()方法实际调用的就是MapObserver的onNext。
总结:
上面的过程都是在Observable订阅的时候实现的,下游的Observer作为对象传到上游的Observer中,然后一直传到最上面的Observer。也就是最上面的Observer包含了下游所有Observer。
2)、Observer向下游传递进行调用
然后调用source.subscribe(parent);就可以回调到如下方法:
new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "Observable>>subscribe start");
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
Log.d(TAG, "Observable>>subscribe start");
}
}
e.onNext();即CreateEmitter.onNext()。即MapObserver.onNext(),先调用mapper.apply(t)进行数据转换,然后将结果传到下游的Observer。即我们自定义的Observer.onNext。 |