简介
响应式编程
是一种面向数据流
和变更传播
的异步编程
范式- RxJS(Reactive Extensions for JavaScript)是一个将
响应式编程概念
引入 JavaScript 的库。 - RxJS 的核心思想是Observable,它是一个强大的数据类型,表示随着时间推移的一系列值或事件
- Observable 的核心关注点:- 创建- 订阅- 执行- 清理
基本概念
- Observable:表示一个可调用的未来值或事件的集合。
- Observer:是一组回调,知道如何监听 Observable 传递的值。
- Subscription:表示 Observable 的执行,主要用于取消执行。
- Operators:是纯函数,允许使用类似于数组方法(如 map、filter、concat、reduce 等)以函数式编程的方式处理集合。
- Subject:等效于 EventEmitter,是将值或事件广播给多个 Observers 的唯一方式。
- Schedulers:是集中的调度程序,用于控制并发,允许我们协调计算的发生时间,例如 setTimeout 或 requestAnimationFrame 等。
Observable
js
loadData() {
this.ob = new Observable(data => {
let i = 0
let timer = setInterval(() => {
console.log(new Date())
i++
if (i > 5) {
clearInterval(timer)
data.complete()
}
data.next(new Date())
}, 1000)
console.log('开始执行', data)
})
let observers = {
next: val => {
this.obMsg = val
},
complete: () => {
console.log('消息没有了')
},
}
this.sub = this.ob.subscribe(observers)
}
unsub() {
this.sub.unsubscribe()
}
Subject
用于创建 空的 可观察对象,在订阅后不会立即执行,next 方法可以在可观察对象外部调用
js
import { Subject } from 'rxjs'
// 空观察对象
const demoSubject = new Subject()
// 不会执行
demoSubject.subscribe({
next: function (value) {
console.log(value)
},
})
demoSubject.subscribe({
next: function (value) {
console.log(value)
},
})
// 3s 会执行
setTimeout(function () {
demoSubject.next('hahaha')
}, 3000)
BehaviorSubject
拥有 Subject 全部功能,但是在创建 Obervable 对象时可以传入默认值,订阅后立即执行,可以直接拿到默认值。
js
const demoBehavior = new BehaviorSubject('默认值')
// 第一次能收到默认值
demoBehavior.subscribe({
next: function (value) {
console.log(value)
},
})
demoBehavior.next('Hello')
ReplaySubject
功能类似 Subject,但有新订阅者时两者处理方式不同,Subject 不会广播历史结果,而 ReplaySubject 会广播所有历史结果。
js
replaySub() {
const rSubject = new ReplaySubject()
rSubject.subscribe(value => {
console.log(value)
})
rSubject.next('Hello 1')
rSubject.next('Hello 2')
setTimeout(function () {
rSubject.subscribe({
next: function (value) {
console.log(value) //会收到钱两次的结果
},
})
}, 3000)
}
与 Promise 的不同之处:
ObServable | Promise |
---|---|
惰性执行,只有被 subscribe 才会执行 | Promise 创建就会执行 |
可观察对象能提供多个值 | Promise 只提供一个 |
可以进行多次订阅,每次订阅都会重新开始执行一遍 | 只执行一次,但可以链式调用 |
内部状态能够变更 | 决定后不能变更 |
可以取消订阅 | 不能取消 |
rxjs 基本示例:
注册事件监听器(普通 JavaScript):
javascriptdocument.addEventListener('click', () => console.log('Clicked!'))
使用 RxJS 创建 Observable:
javascriptimport { fromEvent } from 'rxjs' fromEvent(document, 'click').subscribe(() => console.log('Clicked!'))
纯函数处理状态:
javascript// 普通 JavaScript let count = 0 document.addEventListener('click', () => console.log(`Clicked ${++count} times`)) // 使用 RxJS 隔离状态 import { fromEvent, scan } from 'rxjs' fromEvent(document, 'click') .pipe(scan(count => count + 1, 0)) .subscribe(count => console.log(`Clicked ${count} times`))
控制事件流:
javascript// 普通 JavaScript let count = 0 let rate = 1000 let lastClick = Date.now() - rate document.addEventListener('click', () => { if (Date.now() - lastClick >= rate) { console.log(`Clicked ${++count} times`) lastClick = Date.now() } }) // 使用 RxJS import { fromEvent, throttleTime, scan } from 'rxjs' fromEvent(document, 'click') .pipe( throttleTime(1000), scan(count => count + 1, 0) ) .subscribe(count => console.log(`Clicked ${count} times`))