Skip to content

简介

  • 响应式编程是一种面向数据流变更传播异步编程范式
  • RxJS(Reactive Extensions for JavaScript)是一个将响应式编程概念引入 JavaScript 的库。
  • RxJS 的核心思想是Observable,它是一个强大的数据类型,表示随着时间推移的一系列值或事件
  • Observable 的核心关注点:- 创建- 订阅- 执行- 清理

基本概念

  1. Observable:表示一个可调用的未来值或事件的集合。
  2. Observer:是一组回调,知道如何监听 Observable 传递的值。
  3. Subscription:表示 Observable 的执行,主要用于取消执行。
  4. Operators:是纯函数,允许使用类似于数组方法(如 map、filter、concat、reduce 等)以函数式编程的方式处理集合。
  5. Subject:等效于 EventEmitter,是将值或事件广播给多个 Observers 的唯一方式。
  6. Schedulers:是集中的调度程序,用于控制并发,允许我们协调计算的发生时间,例如 setTimeout 或 requestAnimationFrame 等。

Observable

js

loadData() {
  this.ob = new Observable(data => {
    let i = 0
    let timer = setInterval(() => {
      console.log(new Date())
      i++
      if (i > 5) {
        clearInterval(timer)
        data.complete()
      }
      data.next(new Date())
    }, 1000)
    console.log('开始执行', data)
  })

  let observers = {
    next: val => {
      this.obMsg = val
    },
    complete: () => {
      console.log('消息没有了')
    },
  }
  this.sub = this.ob.subscribe(observers)
}
unsub() {
  this.sub.unsubscribe()
}

Subject

用于创建 空的 可观察对象,在订阅后不会立即执行,next 方法可以在可观察对象外部调用

js
import { Subject } from 'rxjs'
// 空观察对象
const demoSubject = new Subject()
// 不会执行
demoSubject.subscribe({
  next: function (value) {
    console.log(value)
  },
})
demoSubject.subscribe({
  next: function (value) {
    console.log(value)
  },
})
// 3s 会执行
setTimeout(function () {
  demoSubject.next('hahaha')
}, 3000)

BehaviorSubject

拥有 Subject 全部功能,但是在创建 Obervable 对象时可以传入默认值,订阅后立即执行,可以直接拿到默认值。

js
const demoBehavior = new BehaviorSubject('默认值')
// 第一次能收到默认值
demoBehavior.subscribe({
  next: function (value) {
    console.log(value)
  },
})
demoBehavior.next('Hello')

ReplaySubject

功能类似 Subject,但有新订阅者时两者处理方式不同,Subject 不会广播历史结果,而 ReplaySubject 会广播所有历史结果。

js
replaySub() {
  const rSubject = new ReplaySubject()
  rSubject.subscribe(value => {
    console.log(value)
  })
  rSubject.next('Hello 1')
  rSubject.next('Hello 2')
  setTimeout(function () {
    rSubject.subscribe({
      next: function (value) {
      console.log(value) //会收到钱两次的结果
      },
    })
  }, 3000)
}

与 Promise 的不同之处:

ObServablePromise
惰性执行,只有被 subscribe 才会执行Promise 创建就会执行
可观察对象能提供多个值Promise 只提供一个
可以进行多次订阅,每次订阅都会重新开始执行一遍只执行一次,但可以链式调用
内部状态能够变更决定后不能变更
可以取消订阅不能取消

rxjs 基本示例:

  1. 注册事件监听器(普通 JavaScript):

    javascript
    document.addEventListener('click', () => console.log('Clicked!'))

    使用 RxJS 创建 Observable

    javascript
    import { fromEvent } from 'rxjs'
    
    fromEvent(document, 'click').subscribe(() => console.log('Clicked!'))
  2. 纯函数处理状态

    javascript
    // 普通 JavaScript
    let count = 0
    document.addEventListener('click', () => console.log(`Clicked ${++count} times`))
    
    // 使用 RxJS 隔离状态
    import { fromEvent, scan } from 'rxjs'
    
    fromEvent(document, 'click')
      .pipe(scan(count => count + 1, 0))
      .subscribe(count => console.log(`Clicked ${count} times`))
  3. 控制事件流

    javascript
    // 普通 JavaScript
    let count = 0
    let rate = 1000
    let lastClick = Date.now() - rate
    
    document.addEventListener('click', () => {
      if (Date.now() - lastClick >= rate) {
        console.log(`Clicked ${++count} times`)
        lastClick = Date.now()
      }
    })
    
    // 使用 RxJS
    import { fromEvent, throttleTime, scan } from 'rxjs'
    
    fromEvent(document, 'click')
      .pipe(
        throttleTime(1000),
        scan(count => count + 1, 0)
      )
      .subscribe(count => console.log(`Clicked ${count} times`))