RxJS 入门及应用

原帖

RxJS 是 函数响应式编程的

前置知识点

1. 函数式编程

函数式编程其核心是: Purity 纯函数

  • 函数的执行过程完全由输入参数决定,不会受除参数之外的任何数据的影响。
  • 函数不会修改任何外部状态,比如修改全局变量或传入的参数对象。

2. 响应式编程(Reactive Programming)

wiki 百科中的解释:

在计算中,响应式编程反应式编程(Reactive programming)是一种面向数据流和变化传播的声明式编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。

什么是数据流?

数据流(data stream)是数据在系统内传播的路径,表示在一定时间范围内发生的一系列事件。

任何东西都可以是一个** Stream:变量、用户输入、网络响应、定时器、数据结构 **等等。

什么是变化传播?

在数据流传播的过程中,可能会有一些事件去组合、创建、过滤这些 Streams,从一个旧的 stream 映射成一个新的 stream。我们不需要去轮询变化,而是对事件进行监听,在执行一个事件后,会自动做出相应的响应,这就是变化传播。

3. 前端框架与 rxjs 的结合

  • 前端框架的职责(例如 react):数据与 UI 视图的同步,数据发生更新时,视图随之更新;
1
UI = f(data);
  • 响应式编程的职责(例如 rxjs):聚焦于数据,从数据的源头开始,到数据的处理变化,再到数据流的订阅,数据的消费;
1
data = g(origin data)
  • 两者关系看起并不冲突,并且在某些场景下结合使用可能会为我们带来便捷,前端框架可以作为响应式编程数据的一个消费者;
1
UI = f(g(origin data))

RxJS 是用来干什么的?

RxJS 是一个用于处理异步事件流的库,通过使用 observable 序列来编写异步和基于事件的程序,实际应用场景就是把请求封装成 observerable,通过一些基本操作符(map、filter 等等)将返回的数据处理并且 catch 错误,将异步事件作为集合来处理。RxJS 实际上是将开发过程中遇到的异步(多为异步,同步也可以)操作看为一个事件流,RxJS 内部封装了对一个事件流的操作符(创建、转换、组合、过滤、错误异常处理等),组合使用这些操作符来以更便利的方式来管理事件。

为什么用 RxJS,摘自知乎回答:

思考一下,异步的本质是什么?

异步操作和同步操作最大的区别就是异步有时序。

我们可以把同步操作理解为:数据+函数

那么异步操作就是:数据+函数+时序

Rx 就是把时序抽离成一根时间轴,在这根时间轴上进行同步操作,而异步相关的时序处理就交给 Rx 提供的各种 operator 操作符。

所以问题就很简单了,如果你的应用是一个时序密集的应用,那么使用 Rx 能帮你理清复杂的异步逻辑。反之,如果异步操作之间没有太多的联系,时序分散, 则不那么需要使用 Rx。

RxJS 中解决异步事件管理的基本概念

直接来例子。

  • observable 就像一个 function 函数

**Function**

1
2
3
4
5
6
7
8
9
10
function foo() {
console.log("Hello");
return "world";
}

const x = foo();
console.log(x);

const y = foo();
console.log(y);

**Observable**

1
2
3
4
5
6
7
8
9
10
11
const foo = Observable.create(function (observer) {
console.log("Hello");
observer.next("world");
});
// .subscribe()类似于调用函数
foo.subscribe(function (x) {
console.log(x);
});
foo.subscribe(function (y) {
console.log(y);
});
1
2
3
4
5
// 控制台输出是相同的:
"Hello";
"world";
"Hello";
"world";

  • Observable 和 function 的区别是什么?

Observable 可以随着时间推移返回(推送)多个值 ,这一点是函数做不到的。

**Function**

1
2
3
4
5
6
7
8
9
10
function foo() {
return "Hello";
return "world"; // 永远不会执行
}

const a = foo();
console.log(a);

//控制台输出
("Hello");

**Observable**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
const foo = Observable.create(function (observer) {
observer.next('Hello');
observer.next('world');
});

foo.subscribe(function (x) {
console.log(x);
});

// 控制台输出
'Hello'
'world'

// 也可以异步推送一些值
const foo = Observable.create(function (observer) {
observer.next('Hello');
setTimeout(() => {
observer.next('rxjs');
},0)
observer.next('world');
});

foo.subscribe(function (x) {
console.log(x);
});

// 控制台输出
'Hello'
'world'
'rxjs'

语法

1. Observable(可观察对象)

将一个数据流看作一个可观察对象,表示这个数据流变化传播过程中发生的一些列事件的集合。

单个值多个值
拉取(pull)FunctionIterator
推送(push)PromiseObservable

拉取推送是两种不同的协议,用来描述数据生产者 (Producer) 与数据消费者 (Consumer) 如何通信。

  1. 拉取体系

js 中每个函数 function 都属于拉取体系,函数来生产数据,消费者通过调用该函数的代码来从函数中获取单个返回值来对该函数进行消费,而迭代器 Iterator 则是消费者调用 iterator.next()来获取多个返回值进行消费。

拉取的过程中,生产者是一个被动的过程,在消费者请求调用自己时才产生数据,消费者是一个主动的过程,消费者自己来决定何时调用生产者来获取收据。

  1. 推送体系

在如今的 js 中,Promise 是最常见的推送体系,Promise 作为生产者,将解析过的 resolved 值传给消费者注册过的一个回调函数。

推送的过程中,生产者是一个主动的过程,在生产者获取 resolved 值的时候,生产者可以决定何时把值推送给消费者,而消费者并不知道什么时候可以从生产者这里获取到值。在 RxJS 中,observable 也属于推送体系,并且可以推送一个或多个值。

1.1 创建 Observable

Observable 可以使用Observable.create来创建,但通常我们使用创建操作符来创建 Observable。

1.2 订阅 Observable

订阅 Observable 像是调用函数,并提供接收数据的回调函数。

1
2
3
observable.subscribe((value) => {
// do something
});

不同观察者通过 subscribe 调用同一 observable 数据不共享。

每一次调用,等于重新执行一遍函数。

1.3 执行 Observable

Observable 执行可以传递三种类型的值:

  1. Next:推送一个值,可以是任意类型;
  2. Error:推送一个错误或者异常;
  3. Complete:推送一个「已完成」的消息,表明不会再发送任何值;

next() 方法中的值代表要推送给观察者的实际数据,可以执行多次;

error()和 complete()会在 Observable 执行期间至多执行一次,并且只会执行其中一个

1
2
3
4
5
6
7
8
9
10
Observable.create(observer => {
try {
observer.next(1);
observer.next(2);
observer.complete();
observer.next(3); // 前面已经通知观察者已经完成,所以这个值不会发送
} catch (e) {
observer.error(e); // 捕获到异常发送一个错误
}
}

1.4 销毁 Observable 执行

Observable 的执行可能会是无限的,通常观察者希望在一个有限的时间里终止 Observable 执行,以避免浪费计算资源和内存消耗。

类似于清除定时器,var timer = setInterval(() => {},1000); clearInterval(timer);

1
2
3
4
5
// 调用subscribe时,观察者会被附加到新创建的Observable执行中,
// 会返回一个对象,即Subscription(订阅)
var subscription = observable.subscribe();
// Subscription表示正在进行中的执行,调用unsubscribe()来取消observable执行;
subscription.unsubscribe();

2. Observer (观察者)

就是 subscribe 中的回调函数。

Observer(观察者)是一组回调函数的集合,每一个回调函数对应 Observable 发送通知的类型:nexterrorcomplete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const observer = {
next: () => {}, // 观察者接收到next()消息执行的回调函数
error: () => {}, // 观察者接收到error()消息执行的回调函数
complete: () => {}, // 接收到complete()消息执行的回调函数
};

// observer中的观察者可能是部分的,没有提供某个回调,observable还是可以执行的。
// 方法1:将observer观察者传入subscribe
observable.subscribe(observer);

// 方法2:subscribe按顺序(next,error,complete)传入三个回调函数,公司现在用得就是这种 √
observable.subscribe(
(value) => {},
(error) => {},
() => {}
);

3. Subscription (订阅)

Subscription 是一个可清理资源的对象,代表 Observable 的执行。

基本用处就是使用 unsubscribe 来释放资源或取消 Observable 的执行。

4. Subject (主体)

引入一个新的概念,Cold Observable / Hot Observable

Observable 对象就是一个数据流,在一个时间范围内推送一系列数据。

在只存在一个 observer 的情况下很简单,但是对于存在多个 observer 的场景,会变得复杂。

假设一个场景:

两个 observer 观察者 A 和 B 订阅同一个 Observable 对象,但他们不是同时订阅,第一个观察者 A 订阅 N 秒后,第二个观察者 B 才订阅这个 Observable 对象。并且在这 N 秒期间,Observable 已经推送了一些数据,那么第二个观察者 B 应不应该收到已经被推送给第一个观察者 A 的那些数据呢?

  • 场景 1 : 已经推送给观察者 A 的值就不给 B 了,B 只从订阅那一时间点接收 Observable 推送的数据就行了。
  • 场景 2: 已经推送给观察者 A 的值还是要给 B,B 订阅时从头开始获取 Observable 推送的数据。

RxJS 考虑到这两种不同的场景,让 Observable 支持这两种不同的需求,Selection 1 这样的 Observable 就是 Hot Observable,而 Selection 2 这样的 Observable 就是 Cold Observable。

RxJS Subject 是一种特殊类型的 Observable,允许将值多播给多个观察者(每个已订阅的观察者从订阅时间点开始接收当前 Observable 推送的值,非独立),而普通的 Observable 是单播的(每个已订阅的观察者是独立执行 Observable 的)。

对于多个订阅 Subject 的观察者,subscribe 不会重新从头发送值,他只是将观察者注册到观察者列表中,后续有新值发送的时候,将值多播给观察者列表中的所有观察者。


subject 每 next 一次,subject 的消费者都会 更新!


**RxJS的四种不同类型Subject**

ObservableSubjectBehaviorSubjectAsyncSubjectReplaySubject
每次从源头开始将值推送给观察者将值多播给已订阅的该 Subject 的观察者列表把最后一个值(当前值)发送给观察者(需要一个初始值)执行的最后一个值发给观察者可以把之前错过的值发给观察者

BehaviorSubject

BS 有一个“当前值”的概念,它保存了发送给观察者的最后一个值(当前值),当有新的观察者订阅时,会立即接收到“当前值”;

而如果用 Subject,在观察者订阅时,之前已发送的值不会再发给观察者包括最近的一个值,后续再有值发送的时候,新注册的观察者才会接收到新的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
var subject = new BehaviorSubject(0); // 0是初始值

subject.subscribe({
next: (v) => console.log("observerA: " + v),
});

subject.next(1);
subject.next(2);

subject.subscribe({
next: (v) => console.log("observerB: " + v),
});

subject.next(3);
// 输出:
observerA: 0; //line3 :A订阅时立即收到当前值(初始值)0
observerA: 1; //line7 : BS推送新的值1,订阅者A接收到值1
observerA: 2; //line8 : BS推送新的值2,订阅者A接收到值2
observerB: 2; //line 10 : B订阅时立即收到变化后的当前值2
observerA: 3; //line 14: BS推送新的值3,订阅者A和B一起收到值3
observerB: 3;

5. Operators (操作符)

操作符是允许复杂的异步代码以声明式的方式进行轻松组合的基础代码单元。

操作符本质就是一个纯函数,当操作符被调用时,不会改变已经存在的 Observable 实例,会基于当前 Observable 创建一个新的 Observable。

一个 Observable 对象代表的是一个数据流,实际场景中,产生 Observable 对象并不是每次都通过直接调用 Observable 构造函数来创造数据流对象。于现实中复杂的问题,并不会创造一个数据流之后就直接通过 subscribe 接上一个 Observer,往往需要对这个数据流做一系列处理,然后才交给 Observer。就像一个管道,数据从管道的一段流入,途径管道各个环节,当数据到达 Observer 的时候,已经被管道操作过,有的数据已经被中途过滤抛弃掉了,有的数据已经被改变了原来的形态,而且最后的数据可能来自多个数据源,最后 Observer 只需要处理能够走到终点的数据,而这个数据管道就是**pipe**


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!