在之前的系列中,我吧sequence翻译成了序列,题目我就不翻译了,感觉翻译过来的有点失去了点啥。其他好多地方都是用stream(流)来比喻和形容。
可视化Observable
在Rxjs编程中你已经学了一些关于使用最频繁的操作符了。讨论什么是sequence的操作符感觉有点抽象。为了帮助开发者更容易的理解操作符,我们使用marble diagrams(弹子图?翻译估计有问题)来标准可视化。他们很形象的代表了异步的数据流,并且你可以在其他的rxjs资料里面找到。
让我们看下range操作符,它返回一个发射特定范围内整数的Observable:如下图所示:
这个长箭头代表着这个Observable,x轴代表者时间,每一个圆圈代表着内部调用onNext()时Observable发射的值。在产生第三个值之后,range调用了上图中用垂直线表示的onCompleted函数。
让我们看下包含若干Observable的例子。merge操作符操作两个不同的Observable,并且返回合并值的新的Observable。
interval操作符返回一个在规定的毫秒间隔时间内产生增加数的Observable。
在下面的例子中,我们将会合并两个使用了interval的不同的Observable,这两个Observable都是在不同的时间间隔而产生值:
var a = Rx. Observable. interval(200 ).map(function (i) {
return 'A' + i ;
});
var b = Rx. Observable. interval(100 ).map(function (i) {
return 'B' + i ;
});
Rx. Observable. merge(a, b).subscribe(function (x) {
console. log (x);
});
结果如下:
》B0, A0, B1, B2, A1, B3, B4…
merge操作符的示意图如下:
这里,沿着y轴虚线箭头指向了A和B序列中的每个元素变化之后的最终结果。C序列代表着作为结果的Observable,它包含了A和B序列合并后的元素。如果不同的序列的元素同时发射,合并序列的元素顺序是随机的。
基础的序列(sequence)操作符
几乎在Rxjs转变Observable的数十个操作符里,也有在任何一门语言里使用最多的有收集-处理能力的如:map、filter、reduce。在JavaScrpit中,你可以在数组的实例中找到这些操作符(函数)。
Rxjs遵循着JavaScript的约定,你有机会可以找到几乎和数函数作一样的操作符,实际上,我们将会展示数组和Observable的真实使用,用以呈现这两者的API是有多么的相似。
map
map是用的最多的序列转化操作符。它需要一个Observable和一个函数,并且把函数应用于源Observable的每一个值。它返回一个新的转化后值的Observable。
在上面两种情况,src都是不会改变的。
这个代码,和下面的代码,使用logValue定义:
var logValue = function(val){console.log(val)};
他可能是我们传递给map去做一些异步计算改变那个值的函数。在某些状况下,map可能不会如期盼的那样工作,对于这些状况,更好的办法是使用flatMap操作符。
filter
filter需要一个Observable和一个函数,并且它会使用这个函数去测试Observable中的每一个元素。它将会返回一个序列,这个序列中所的元素都是那个函数返回true的值。
reduce
reduce(也做fold)需要一个Observable并且返回一个新的只包含一个单个item的Observable,这个单个的item是某个函数应用到每一元素的的结果。这个函数接受当前元素和这个函数上一次调用的结果。
reduce是一个处理某个序列很强大的操作符。实际上它是一种被称为聚合操作符(aggregate operators)的一个完整子集的基础实现。
Aggregate Operators
聚合操作符处理某个序列并且返回单个的值。例如:Rx.Observable.first需要一个Observable和一个可选的断言函数并且返回第一个满足断言函数的元素。
计算某个序列的平均值也是一个聚合操作中。Rxjs提供了average操作的实例,但是有这个选着的缘故,我们使用reduce来实现下。每一个聚合操作都能被仅仅使用reduce来实现。
var avg = Rx. Observable. range(0 , 5 )
.reduce(function (prev, cur) {
return {
sum : prev. sum + cur,
count: prev. count + 1
};
}, { sum : 0 , count: 0 })
.map(function (o) {
return o. sum / o. count;
});
var subscription = avg. subscribe(function (x) {
console. log('Average is: ' , x);
});
》Average is: 2
面代码的解释,我就不翻译了,直接看那就能看懂。设想下,如若现在我们要计算一个步行者的平均速度,用reduce很好实现,但是假设,在时间轴上,步行者永远走下去,那么像reduce一样的聚合操作符将永远不会调用它的观察者(obeserver)的onNext函数。
很高兴的是,Rxjs团队已经想到这种情况了,并且给我们提供了一个scan操作符,它扮演着像reduce的角色,但是它会发送每一个中间的结果。代码如下:
var avg = Rx. Observable. interval(1000 )
.scan(function (prev, cur) {
return {
sum : prev. sum + cur,
count: prev. count + 1
};
}, { sum : 0 , count: 0 })
.map(function (o) {
return o. sum / o. count;
});
var subscription = avg. subscribe( function (x) {
console. log(x);
});
使用如上方式,我们就可以聚合某个消耗时间长或者没有时间限制的序列了。在上面的代码中,我们每秒产生了一个增量的整数,并且用scan取代了先前的reduce。我们就以每秒为间隔取得了到目前为止所有的平均值。
- flatMap
- 如果你又一个Observable它的结果是许多嵌套的Observable将则怎么做?大部分的时间,你想到的就是统一这些嵌套的Observable的元素到一个单个的序列中。这也是flatMap所要明确干的事。
- flatMap操作符需要一个Observable参数,这个参数的元素也是Observable,并且返回只有一个孩子Observable的平坦值的Observable。
我们可以看到到每一在A(A1,A2,A3)中的元素也是Observable序列,一旦我们对A应用flatMap转化功能,我们将获得一个Observable,它包含所A所有不同孩子的所有元素。
flatMap是一个强大的操作符,但是它比起我们目前学到的其他操作符都要难理解些,把它认为是这些Observable的concatAll()函数。
contcatAll是需要一个数组的数组函数,并且返回一个平坦的单一的数组,这个数组包含所有子数组的值,而不是这些子数组本身。我们可以使用reduce去建一个这样的函数:
function concatAll (source) {
return source.reduce(function (a, b) {
return a.concat(b);
});
}
我们可以这样使用它:
concatAll([[0, 1, 2], [3, 4, 5], [6, 7, 8]] );
// [0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 ]
flatMap做同样的事,但不是平坦的arrays而是observable……
Canceling Sequences
在Rxjs中,我们可以取消正在运行的Observable。相比起回调函数或者promise(有部分promise实现支持取消)这些异步交互的方式,这就是优势。
有隐式或者是明确的这两者主要的取消Observable的方式。
1)明确的取消:dispose
Observable本身并没有取消方法。当我们订阅到一个Observable的时候我么就是获得了一个Disposable对象,这个dispose对象代表这个特殊的订阅。这样我们就可以在这个对象中调用dispose方法了,之后这个特殊的订阅就会停止从Observable中接受通知。
在接下来的例子中,我们订阅了两个counter的Observable,counter每秒发射一个自增的整数。两秒后我们取消第二个订阅(subscription),之后我们看到它的输出停止了,但是第一个订阅者的输出还是在进行的。
var counter = Rx. Observable. interval(1000 );
var subscription1 = counter. subscribe(function (i) {
console. log ('Subscription 1:' , i );
});
var subscription2 = counter. subscribe(function (i) {
console. log ('Subscription 2:' , i );
});
setTimeout(function () {
console. log ('Canceling subscription2!' );
subscription2. dispose();
}, 2000 );
结果将会如下:
》Subscription 1: 0
Subscription 2: 0
Subscription 1: 1
Subscription 2: 1
Canceling subscription2!
Subscription 1: 2
Subscription 1: 3
Subscription 1: 4
…
2)通过Operator的隐式取消
大部分的时候,操作符将会为你自动取消subscription。像range或者take,当序列结束或者满足某个操作的条件时,他们将会取消订阅。更高级的如withLastestFrom或者flatMapLastest等操作符,当它们动态创建Observable时,将会在需要订阅的时候内部自动地创建和销毁。
当我们使用Observable包装外部没有提供取消功能的API接口时,当取消Observable时,Observable将会停止发送通知,但是里面的API将不会被注销。例如你使用一个包装着promise的Observable,当取消时Observable会停止发射,但是里面的promise不会停止。
如下代码,我们试图取消关联到某个包装了promise Observable的订阅,同时,我们也把promise按照传统的方式了进行一个操作。这个promise将会在5秒后被执行,但是取消订阅会立马被执行。
var p = new Promise(function (resolve, reject) {
window.setTimeout(resolve, 5000 );
});
p.then(function () {
console.log('Potential side effect!' );
});
var subscription = Rx.Observable.fromPromise(p).subscribe(function (msg) {
console.log('Observable resolved!' );
});
subscription.dispose();
5秒之后我们将会看到:
》Potential side effect!
如果我们取消到Observable的订阅,它将会很有效的停止重Observable接受通知。但是这个promise的then方法还会继续执行,这展示给我们:取消Observable并不会取消它内部的promise。
因此,在Observable内部使用外部的API接口必须知道里面的细节是很重要的。你可以想象:你已经取消了一个序列,但是它内部的一些api任然在运行,并且给你的程序带来一些副作用,这些错误真的很难被捕获。
Handling Errors
在回调函数中,我们之所以可以使用传统的try/catch机制,是因为它是同步的。由于它运行在任何异步代码之前,所以它将捕获不到任何错误。
在回调函数中的解决方式是:传递这个错误作为回调函数的一个参数,这样虽然可以起到作用,但是会使代码变得相当脆弱。
下面让我们看看Observable是如和捕获错误:
The onError Handler
当我们说起observer时候,必须记住三个函数:onNext、onCompleted、onError。onError是Observable中有效处理错误的关键。
为了展示它如何工作,如下将会有一个简单的函数,它需要json串的数组并且返回一个Observable,这个Observable发送用JSON.parse转化那些json串后的对象。
function getJSON (arr) {
return Rx.Observable.from(arr).map(function (str) {
var parsedJSON = JSON .parse(str);
return parsedJSON;
});
}
通过getJSON我们将会传递三个json串,第二个串会包含一个语法错误,因此JSON.parse无法解析它。接着我们通过提供onNext和onError处理器来订阅那个结果:
getJSON([
'{"1": 1, "2": 2}' ,
'{"success: true}' , // Invalid JSON string
'{"enabled": true}'
] ).subscribe(
function (json) {
console. log ('Parsed JSON: ' , json);
},
function (err) {
console. log (err. message);
}
);
结果如下:
》Parsed JSON: { 1: 1, 2: 2 }
JSON.parse: unterminated string at line 1 column 8 of the JSON data
针对数组的第一个结果 Observable发射了一个JSON转译对象,但是第二个会抛出一个异常,onError处理器将会捕获这个异常,并打印它。默认的行为是,无论什么时候,只要异常一发生,Observable将会停止发射,并且onCompleted将不会被调用。
捕获异常
到目前为止我们已经知道如何侦测一个异常,并且去做些什么。但是我们还没没能响应我们接着要做的事。Observable实例提供了一个catch操作符,它允许我们对一个Observable的错误进行响应之后而继续进行其他Observable。
catch操作符需要一个以异常为入参的Observable或者函数,它返回另外一个Observable。在我们的例子中,由于在原始的Observable中有错误,我们想Observable发射一个包含异常属性的JSON对象。
function getJSON (arr) {
return Rx.Observable.from(arr).map(function (str) {
var parsedJSON = JSON.parse(str);
return parsedJSON;
});
}
var caught = getJSON(['{"1": 1, "2": 2}' , '{"1: 1}' ]).catch (
Rx.Observable.return ({
error: 'There was an error parsing JSON'
})
);
caught.subscribe(
function (json) {
console.log('Parsed JSON: ' , json);
},
function (e) {
console.log('ERROR' , e.message);
}
);
上述代码中,我们创建了一个叫做caught的新的Observable,它使用catch操作符捕获初始Observable里的异常。如果有异常,它将继续这个序列通过使用仅仅发射一个异常属性item的Observable来描述这个错误,结果如下:
》Parsed JSON: Object { 1: 1, 2: 2 }
Parsed JSON: Object { error: “There was an error parsing JSON” }
下面就是catch操作符的marble diagram(弹子图):
注意到“X”在序列上的,它代表着一个异常(错误)。不同的Observable值的形状:三角形代表着这些值来自另外一个Observable,在这里,那个Observable是我们异常情况下返回的Observable。
catch对序列中的异常的交互来说非常有用,并且它的好多行为跟传统的try-catch块是很相似的。在好多情况下,忽略序列中某个项发生的异常并且让这个序列继续下去是很方便的。在这些情况下,我们可以使用retry操作符。
Retrying Sequences
有些时候仅仅是发生错误而不需要我们去做些什么。例如,一个由于用户零星Internet链接或者远程服务器宕机而导致的远程数据请求的超时,在这种情况下,这将会是一个很好的办法,如果我们一直请求我们需要的数据直到成功为止。那个retry做了如下操作:
Rx.DOM.get ('/products' ).retry(5 )
.subscribe(
function (xhr) { console.log(xhr); },
function (err) { console.error('ERROR: ' , err); }
);
在上面的代码中,我们创建了一个函数,它返回了一个使用XMLHttpRequest向一个url重复获取内容的Observable。由于我们的链接有可能会不靠谱,我们在订阅之前增加了retry(5),确保在异常的情况下,在Observable挂起和报错之前会尝试5次。
当我们使用retry的时候,有两件事很重要:
1)如果我们不传任何参数,它将会无限次尝试,直到Observable结束并且没有异常。这对程序来说是非常危险的,一旦Observable一直报错的话。如果我们使用同步的多个Observable,它将会有同样无限循环的结果。
2)retry将会重新retry整个的Observable,即使某些项没有异常。因为每次retry,它都会重新运行,当我们处理的某些项的时候,将会导致意外的结果,这一点也很重要。