Web

RXJS 学习

Posted by Kerwen Blog on July 21, 2019

简介

RxJS 是 Reactive Extensions for JavaScript 的缩写,起源于 Reactive Extensions,是一个基于可观测数据流在异步编程应用中的库。RxJS 是 Reactive Extensions 在 JavaScript 上的实现。

前置知识点

响应式编程

响应式编程(RP — Reactive Programming)是一种面向数据流和变化传播的编程范式。在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。

  1. 响应式编程是使用异步数据流进行编程。用包含这些事件在内的任何东西创建数据流(Data stream),监听他并作出响应。
  2. 只关注业务逻辑互相依赖的事件而不是实现细节
  3. 适用于大量和数据有关的事件交互,特别是高实时性要求

一个流就是一个不间断的按照时间排序的序列。它产生三种不同类型事件: 值,错误,完成的信号。对这三个定义事件处理函数,就可以异步的捕获这些事件.

观察者模式

即发布-订阅模式. 流是被观察的,某个函数订阅流的某个事件,该函数是观察者。当流的某个事件产生了,对应的函数就会被执行。

核心知识点

Rxjs的内容可以概括为一个核心三个重点,核心就是Observable和Operators,三个重点分别是:

1
2
3
observer
Subject
schedulers

Observable

Rxjs是观察者 + 迭代器模式的结合,Observable作为被观察者,是一个值或事件的流集合。就像是一个序列,裡面的元素会随着时间推送。

1
2
3
4
5
6
7
8
9
10
var observable = Observable
    .create(function(observer) {
        observer.next('Jerry'); // RxJS 4.x 以前的版本用 onNext
        observer.next('Anna');
    })

// 订阅 observable    
observable.subscribe(function(value) {
    console.log(value);
})

通过Observable身上的create方法可以创建一个Observable,参数中的回调函数设置这个Observable将会如何传递值,然后通过subscribe订阅这个Observable。
这里值得一提的是rxjs的subscribe是同步执行的
Observable不同于观察者模式中的被观察者,他没有一份需要维护订阅者的清单,他只是一个函数。想要订阅他只需要传进回调函数observer就好。并且,Observable 可以同时处理同步和异步操作!

Operator

image 操作Observable的函数就是操作符。他会接受传入的Observable,但是会返回新的Observable。用map举例说明。

1
2
3
4
Rx.Observable.of(2)
         .map(v => v * 2) 
         .subscribe(v => console.log('output:' + v));
// output:4

创造observable类

create

1
2
3
4
5
6
7
8
9
10
11
const observable = Observable.create((observe) => {
    observe.next('value')
})
observable.subscribe({
    next:() => {
    },
    complete: () => {
    },
    error: () => {
    }
}

of
感觉of类似于一个迭代器,将参数迭代然后发出。

1
2
3
4
5
6
7
8
9
10
11
12
13
var source = of('Jerry', 'Anna');

source.subscribe({
    next: function(value) {
        console.log(value)
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
        console.log(error)
    }
});

from
from的参数必须是一个类数组(set,iterator等),其他和of一样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var arr = ['Jerry', 'Anna', 2016, 2017, '30 days'] 
var source = from(arr);

source.subscribe({
    next: function(value) {
        console.log(value)
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
        console.log(error)
    }
});

Empty,never和throw
empty 會給我們一個空的 observable,如果我們订阅这个 observable , 它会立即响应complete 函数。
throw,它也只做一件事就是拋出错误。

Interval和timer
interval和setInterval一样,几秒钟发送一个值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var source = interval(1000);    //参数为设定多少毫秒钟发送一个值。

source.subscribe({
    next: function(value) {
        console.log(value)
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
    console.log('Throw Error: ' + error)
    }
});
// 0
// 1
// 2
// ...

选择器类

take
有的时候我门希望获取Observable前几个数然后结束(执行complete方法)

1
2
3
4
5
6
7
8
9
10
11
12
var source = interval(1000);
var example = source.pipe(take(3));

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// complete

first
取第一个数然后结束,和take(1)效果一样

takeLast,last
takeLast和take用法一样,区别是该方法是取后边几个值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var source = interval(1000).pipe(take(6), takeLast(2));

source.subscribe({
next: value => {
    console.log(value);
},
error: err => {
    console.log("Error: " + err);
},
complete: () => {
    console.log("complete");
}
});
// 4
// 5
// complete

控制数据流类

takeUntil
参数为一个Observable,当参数Observable订阅发生,终止takeUntil绑定的observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
const click = fromEvent(document.body, "click");
const source = interval(1000).pipe(takeUntil(click));

source.subscribe({
next: value => {
    console.log(value);
},
error: err => {
    console.log("Error: " + err);
},
complete: () => {
    console.log("complete");
}
});
// 0
// 1
// 2
// 3
// complete 当点击body

skip
使用方式类似take,take是取前几个,skip的意思是跳过前几个,取后边几个。

startWith
塞一个初始值给Observable

concat
concat和concatAll效果是一样的,区别在于 concat要传递参数,参数必须是Observable类型。
concat 将多个observable串接起来前一个完成好了,再执行下一个。

delay和delayWhen
delay会将observable第一次发出订阅的时间延迟

数据操作类

map
和JavaScript中的map一样

Observer

和迭代器模式一一对应,提供三个方法,next、error、complete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var arr = ['Jerry', 'Anna', 2016, 2017, '30 days'] 
var source = from(arr);

source.subscribe({
    next: function(value) {
        console.log(value)
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
        console.log(error)
    }
});

subject

RxJS 与前端框架结合

Angular 自身引用了 RxJS,如 http 和 animation 都使用了 Observable,状态管理可以使用 ngrx。

Vue 官方有与 RxJS 集成的 vue-rx。

Reference

https://juejin.im/post/597fe587518825563e037bd3
https://www.jianshu.com/p/16be96d69143

RxJS v6 学习指南

Angular + RxJs