1.Rxjs基本概念 从接触Angular2开始,一直在使用Rxjs但没有深究,最近抽空整体了解下Rxjs,略作整理。Rxjs可以理解为一个管理事件或任务序列(异步)的库,它提供了一个核心类型Observable可观察对象,卫星类型(围绕于Observable): Observer、Scheduler、Subject和操作符。这几个概念可以用下图来举例说明(windows画图工具随便画的,粗糙了些,示意还是清晰的): 
Observable 可观察对象:是一串事件或消息的集合,在图中列举了一个需要发送一串数字序列的可观察对象。 Observer 观察者:是一组回调函数,用来处理Observable中的内容。在图中它接收了一串数字序列。 Subscription 订阅:Observable的执行,功能好比一个开关,可以订阅或取消订阅。在图中,如果订阅则它下游的Observer将收到数字;取消订阅,下游的Observer将收不到任何消息。一个Observable可以被多个Observer订阅。
Subject 主题:将Observable中的内容广播到多个Observer。在图中相当于一个多分支开关,如果主题被订阅则所有观察者都将收到同样的一串数字序列。 Scheduler 调度者:决定何时启动订阅和何时发送通知。如图所示,调度者不是必须的。在图中数字序列从Observable发出,经过调度,序列的顺序被改变了。 文中示例的Angular及Rxjs版本: "@angular/core": "~4.0.0", "rxjs": "5.0.1" 2.Rxjs简单示例import {Observable}from'rxjs/Rx';//引入可观察对象 public testRx1(){ //1.产生一个可观察对象,包括即时发送和延时发送 letmyobservable=Observable.create((observer:any)=>{ observer.next(1);//发送数字1 observer.next(2); observer.next(3); setTimeout(()=>{observer.next(4);},1000);//1秒后发送数字4 }); //2.创建一个观察者仅包含next操作 letmyobserver= {next:(x:any)=>console.log('receive data:',x)}; console.log('before subscribe'); //3.对可观察对象进行订阅 letmysubscripton=myobservable.subscribe(myobserver); //4.可以取消订阅 //mysubscripton.unsubscribe(); console.log('after subscribe'); } 运行得到结果 3.Observable可观察对象 可观察对象是一串消息或事件的集合,可以发送多值。在“Rxjs简单示例“中可以看到,使用Observale.create创造了一个可观察对象myobservable,在里面首先发送1,2,3三个值,间隔1s后发送数字4。但是创造一个可观察对象的方法是多种多样的。可以通过操作符创建和事件流产生,如下例所示: public createObservable(){ let myobserver= {next:(x:any)=>console.log(x)}; //通过操作符产生observable let myobservable1=Observable.from([1,2,3,4]).subscribe(myobserver);//运行结果:1,2,3 let myobservable2=Observable.of('a','b','c').subscribe(myobserver);//运行结果:a,b,c let myobservable3=Observable.interval(1000).subscribe(myobserver);//运行结果:0,1,2,3,4,5,6,...... //通过事件流产生observable let button=document.getElementById('mybtn'); let myobservable=Observable.fromEvent(button,'click') .throttleTime(1000)//间隔一秒才允许响应click事件 .scan((count:number)=>count+1,0)//初始值为0开始累加 .subscribe((count:number)=>console.log(`clicked${count} times`));//ES6模板字符串 //疯狂点击按钮,运行结果: //clicked 1 times //---间隔1秒--- //clicked 2 times //---间隔1秒--- //clicked 3 times ...... } 4.Obsever观察者 所谓观察者,实质上是一组回调函数,当它订阅可观察对象时,提供三种类型回调函数(next/error/complete)。在“Rxjs简单示例“中可以看到,最简化情况下,仅仅提供一个next类型回调就可以。完整的情况如下例所示:
public createObserver(){ let myobserver= {next:(x:any)=>console.log(x), error:(error:any)=>console.log('something wrong:',error), complete:()=>console.log('finished') }; let myobservable1=Observable.from([1,2,3,4]).subscribe(myobserver);//运行结果:1,2,3,4,finished } 也可以将三个回调函数分别作为参数传递给subscribe函数,如果只提供一个函数,则默认为next函数,如下例所示:
let myobservable1=Observable.from([1,2,3,4]) .subscribe( (x:any)=>console.log(x), (error:any)=>console.log('error:',error), ()=>console.log('finished') ); //运行结果:1,2,3,4,finished 仔细观察“Rxjs简单示例”可以发现使用Observale.create创造可观察对象时,里面有个参数observer,该observer就是一个观察者,拥有next方法。 5.Subject主题主题既是可观察对象,可以被订阅,例如:Subject.subscribe({next:(x)=>……})。 主题又是观察者对象,拥有next()/error()/complete()方法,调用Subject.next(1),则数据1将会被多播至Subject的观察者们。同时Subject可以观察其他的可观察对象,如下图所示: import {Subject}from'rxjs/Rx'; public CreateSub(){ let subject=newSubject(); let obsrver1= {next:(x:any)=>console.log('observer1:',x)} let obsrver2= {next:(x:any)=>console.log('observer2:',x)} subject.subscribe(obsrver1); subject.subscribe(obsrver2); subject.next(1); subject.next('a'); subject.next(2); } 运行结果: Subject的扩展很丰富,例如:BehaviorSubject,ReplaySubject,AsyncSub等,后续章节再详细讲解。 6.Scheduler调度者 调度者控制着何时启动一个订阅和何时发送通知。它由三个组件构成:
1.数据结构:决定可观察对象中的任务或事件根据什么标准排列和存储。 2.执行上下文:决定何时任务被执行或事件发生。 3.虚拟时钟:可观察对象中的任务或事件将仅遵循该时钟表示的时间。 下面的例子使用observeOn操作符指定调度程序,注意与“Rxjs简单示例”的运行结果相比较
public testScheduler(){ //1.产生一个可观察对象,包括即时发送和延时发送 let myobservable=Observable.create((observer:any)=>{ observer.next(1);//发送数字1 observer.next(2); observer.next(3); setTimeout(()=>{observer.next(4);},1000);//1秒后发送数字4 }).observeOn(Scheduler.async);//调度者 //2.创建一个观察者仅包含next操作 let myobserver= {next:(x:any)=>console.log('receive data:',x)}; console.log('before subscribe'); //3.对可观察对象进行订阅 var mysubscripton=myobservable.subscribe(myobserver); //4.可以取消订阅 //mysubscripton.unsubscribe(); console.log('after subscribe'); } 本例的运行结果 对比下“Rxjs简单示例”的运行结果 为何时序完全不同呢?前面提到过Observable.create里面有一个观察者,调度者通过observeOn对该观察者进行了调度, 让消息延时发送,才出现数据在“after subscribe“之后才被next出来的现象。 RxJS提供的内置调度器有三种,示例中使用的async调度器是其中之一。可能平时感觉没有使用到调度者,其实所有的操作 符处理并发时都可以选择调度器,例如 from([10, 20, 30], Scheduler.async)。如果不明确指明调度器则系统自动选择最合适的。
| 调度器 | 目的 |
|---|
null | 不传递任何调度器的话,会以同步递归的方式发送通知。用于定时操作或尾递归操作。 | Rx.Scheduler.queue | 当前事件帧中的队列调度(蹦床调度器)。用于迭代操作。 | Rx.Scheduler.asap | 微任务的队列调度,它使用可用的最快速的传输机制,比如 Node.js 的 process.nextTick() 或 Web Worker 的 MessageChannel 或 setTimeout 或其他。用于异步转换。 | Rx.Scheduler.async | 使用 setInterval 的调度。用于基于时间的操作符。 |
7.操作符Rxjs强大的原因,源自于它丰富的操作符。前面例子中提到的from就是操作符。操作符按调用方法不同分为静态操作符和实例操 作符。示例:
let source = Observable.interval(1000)//产生从0开始的整数序列,每个数值间隔一秒 .take(10);//允许发送十个值后停止 let subscribe = source.subscribe( (x:any)=>{this.count++;console.log('ret x:',x)} ); 
示例代码中,interval()就是典型的静态操作符,定义的Observable类上,需要通过Observable调用。输入一个非可观察对象的参数, 得到一个全新的可观察对象,假设为newObsv。代码中的take()就是典型的实例操作符,通过实例newObsv调用,产生新的可观察对 象,不且不会改变当前的newObsv。
|