Skip to content

我的 RxJS 學習筆記

Posted on:2024年9月25日 at 下午09:00
我的 RxJS 學習筆記

RxJS 適合用來處理複雜的異步操作和事件流,主要是因為以下的編程特色:

  1. 響應式編程(Reactive Programming):響應式編程透過定義資料流和自動處理的依賴關係,讓系統能夠即時回應資料或狀態的變化。
  2. 函數式編程(Functional Programming):函數式編程以不可變資料和純函數為核心,強調函數作為一等公民並避免副作用。

本文使用 TypeScript 作為示例,如果你不想使用它,可以直接忽略所有的型別宣告。

基本概念

RxJS 的學習,基本上便是要了解基本概念。

可觀察者 Observable 與觀察者 Observer

Observable 在接受訂閱後,會透過訂閱者(subscriber)發出一系列的訊息,由 Observer 決定如何處理。Observable 發出的訊息有三種:

  1. 值:如果訂閱者呼叫 next 方法,則會將值傳遞給訂閱者。
  2. 錯誤:如果訂閱者呼叫 error 方法,則會將錯誤傳遞給訂閱者。
  3. 完成:如果訂閱者呼叫 complete 方法,則會將完成傳遞給訂閱者。

可以看到,Observable 類似一個迭代器,但封裝了更豐富的方法。

因此,Observer 是一個具有三個方法的物件。基本的使用方式如下:

import { Observable, Observer } from "rxjs";

const observable = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});

const observer: Observer<number> = {
  next(x) {
    console.log("got value " + x);
  },
  error(err) {
    console.error("something wrong occurred: " + err);
  },
  complete() {
    console.log("done");
  },
};

observable.subscribe(observer);

如果只想處理回傳值,Observable 可以透過一個回呼函數訂閱。此時,訂閱者所發出的錯誤和完成的訊息會直接被忽略:

observable.subscribe(x => {
  console.log("got value " + x);
});

如果你想的話,你也可以部分地定義 Observer:

const observer: PartialObserver<number> = {
  complete() {
    console.log("done");
  },
};

總結來說:Observable 經過訂閱後,訂閱者會按照 Observable 所被指派的方式呼叫訂閱者的方法,當訂閱者呼叫了方法,Observer 會看到發出出來的訊息,並加以處理。

這是 RxJS 的所有運作邏輯的基礎。

訂閱 Subscription

訂閱是一個用來取消已訂閱 Observer 的物件,它基本上只有 unsubscribe 這個標準方法。

const subscription = observable.subscribe(observer);
subscription.unsubscribe();

它也提供了一個可以將多個已訂閱 Observer 的訂閱取消的便利方法 add

const mainSubscription = observable1.subscribe(observer1);
const childSubscription = observable2.subscribe(observer2);
const grandChild1 = observable3.subscribe(observer3);
const grandChild2 = observable4.subscribe(observer4);

childSubscription.add(grandChild1);
childSubscription.add(grandChild2);

mainSubscription.add(childSubscription);

// 這將取消所有的 mainSubscription 和 childSubscription 和 grandChild1 和 grandChild2 的訂閱。
mainSubscription.unsubscribe();

運算子 Operator

Operator 是函式,有兩種類型:

RxJS 擁有豐富的 Operator,可以簡化我們對 Observable 的控制,並且能在多個 Observable 之間建立連結。

以下是一些範例:

建立 Operator

import { from, fromEvent } from "rxjs";

// 從 DOM 事件創建 Observable
const clickObservable = fromEvent(document, "click");
clickObservable.subscribe(event => console.log("Click event:", event));

// 從 Promise 創建 Observable
const promiseObservable = from(fetch("https://api.github.com/users/github"));
promiseObservable.subscribe(
  response => console.log("Response:", response),
  error => console.error("Error:", error)
);

轉換 Operator

import { fromEvent } from "rxjs";
import { map, pluck } from "rxjs/operators";

// 轉換滑鼠點擊事件為坐標
const clicks = fromEvent(document, "click");
const positions = clicks.pipe(
  pluck("clientX", "clientY"),
  map(([x, y]) => ({ x, y }))
);
positions.subscribe(pos => console.log("Clicked at:", pos));

過濾 Operator

import { fromEvent } from "rxjs";
import { filter, debounceTime } from "rxjs/operators";

// 過濾並延遲處理輸入事件
const input = document.getElementById("search-input");
const typeahead = fromEvent(input, "input").pipe(
  pluck("target", "value"),
  filter(text => text.length > 2),
  debounceTime(300)
);
typeahead.subscribe(value => console.log("Search for:", value));

組合 Operator

import { combineLatest, fromEvent } from "rxjs";
import { map } from "rxjs/operators";

// 組合多個輸入框的值
const weight = fromEvent(document.getElementById("weight"), "input").pipe(
  pluck("target", "value")
);
const height = fromEvent(document.getElementById("height"), "input").pipe(
  pluck("target", "value")
);

const bmi = combineLatest([weight, height]).pipe(
  map(([w, h]) => {
    const weightKg = parseFloat(w);
    const heightM = parseFloat(h) / 100;
    return (weightKg / (heightM * heightM)).toFixed(2);
  })
);

bmi.subscribe(value => console.log("BMI:", value));

錯誤處理 Operator

import { from } from "rxjs";
import { map, catchError, retry } from "rxjs/operators";

// 模擬 API 調用並處理錯誤
const apiCall = (userId: string) => {
  return from(fetch(`https://api.example.com/users/${userId}`)).pipe(
    map(response => {
      if (!response.ok) throw new Error("HTTP error " + response.status);
      return response.json();
    }),
    retry(3),
    catchError(error => {
      console.error("API call failed:", error);
      return from([]); // 返回空數組作為後備
    })
  );
};

apiCall("123").subscribe(
  data => console.log("User data:", data),
  error => console.error("Error:", error),
  () => console.log("API call completed")
);

實用工具 Operator

import { fromEvent } from "rxjs";
import {
  debounceTime,
  distinctUntilChanged,
  throttleTime,
} from "rxjs/operators";

// 使用 debounce 處理搜索輸入
const searchInput = document.getElementById("search");
const searchTerms = fromEvent(searchInput, "input").pipe(
  debounceTime(300),
  pluck("target", "value"),
  distinctUntilChanged()
);
searchTerms.subscribe(term => console.log("Search term:", term));

// 使用 throttle 處理滾動事件
const scrollEvents = fromEvent(window, "scroll").pipe(throttleTime(200));
scrollEvents.subscribe(() => console.log("Scroll event"));

此外,你可以引入 pipe 函式,來將 Operator 進行結合來定義 Operator 以重複使用:

import {
  pipe,
  fromEvent,
  debounceTime,
  map,
  filter,
  distinctUntilChanged,
} from "rxjs";

const handleSearch = pipe(
  map((event: Event) => (event.target as HTMLInputElement).value),
  map(term => term.trim()),
  filter(term => term.length > 2),
  debounceTime(300),
  distinctUntilChanged()
);

const searchInput = document.getElementById("search-input") as HTMLInputElement;
const search = fromEvent(searchInput, "input").pipe(handleSearch);

search.subscribe(searchTerm => {
  console.log(`Searching for: ${searchTerm}`);
});

我將所有 Operator(不包含將被棄用的)的簡介和所需的參數的簡易描述放在附錄中,實際的使用方式請參考 官方文件

高階 Observable

若 Observable 被連入通道,該 Observable 將會在事件流中回傳 Observable,這通常不是我們需要的行為。這些未被訂閱的 Observable 無法被觀察,這時我們需要將 Observable 進行展平(flatten)處理。

當 Observable 連入通道,這個通道所屬的 Observable 被稱作高階 Observable(higher-order observer),相當於 Observable 的 Observable。

以下是一個簡單的示例,裡面的 mergeAllswitchMapconcatMap 是用來進行展平處理的 Operator:

import { from, interval } from "rxjs";
import { map, take, mergeAll, switchMap, concatMap } from "rxjs/operators";

const higherOrder = from([1, 2, 3]).pipe(
  map(x =>
    interval(1000).pipe(
      take(2),
      map(y => x + ":" + y)
    )
  )
);

console.log("Using mergeAll:");
higherOrder.pipe(mergeAll()).subscribe(console.log);
// 輸出: 1:0, 2:0, 3:0, 1:1, 2:1, 3:1 (不保證順序)

console.log("Using switchAll:");
higherOrder.pipe(switchMap(obs => obs)).subscribe(console.log);
// 輸出: 3:0, 3:1

console.log("Using concatAll:");
higherOrder.pipe(concatMap(obs => obs)).subscribe(console.log);
// 輸出: 1:0, 1:1, 2:0, 2:1, 3:0, 3:1 (保證順序)

主體 Subject 與多播

Subject 是 Observable 的擴充型別,可以使用 Observable 的所有方法。對於 Observer 來說,Subject 完全就像是一個 Observable。因此它可以接受訂閱。但當它進行訂閱,它不會立刻發出任何值,而只是註冊 Observer。

Subject 同時是 Observer 的擴充型別。這有兩個意義:

  1. Subject 有 next(v) error(e)complete() 等方法可以進行呼叫。一旦呼叫,會多播到所有已經註冊的 Observer。
  2. 當在 Subject 上註冊了一個清單的 Observer,Observable 能透過訂閱 Subject 來訂閱它們,因此實現了多播。

以下是一個基本的範例,顯示了如何使用 Subject 來進行多播的情境:

import { Subject, from } from "rxjs";

const subject = new Subject<number>();

// Subject 作為 Observable,在上面註冊了兩個 PartialObserver
subject.subscribe({
  next: v => console.log(`observerA: ${v}`),
});
subject.subscribe({
  next: v => console.log(`observerB: ${v}`),
});

const observable = from([1, 2, 3]);

// Subject 作為 Observer,接收來自 Observable 的值
observable.subscribe(subject);

除了 Subject 以外,Subject 還有這些變體:

Subject 類型初始值重播行為對後來訂閱者的發送使用場景
Subject只有新值簡單的多播情況
BehaviorSubject最後一個值最後一個值 + 新值表示「當前值」
ReplaySubject指定數量的值緩衝的值 + 新值緩存和重播事件
AsyncSubject最後一個值(在完成後)只有最後一個值(在完成後)表示異步操作

Subject:簡單的多播情況

import { Subject } from "rxjs";
const subject = new Subject<number>();
subject.subscribe(v => console.log(`ObserverA: ${v}`));
subject.next(1);
subject.next(2);
subject.subscribe(v => console.log(`ObserverB: ${v}`));
subject.next(3);

// 輸出:
// ObserverA: 1
// ObserverA: 2
// ObserverA: 3
// ObserverB: 3

BehaviorSubject:表示「當前值」

import { BehaviorSubject } from "rxjs";
const temperatureSubject = new BehaviorSubject<number>(25);
temperatureSubject.subscribe(v => console.log(`TemperatureDisplayA: ${v}`));
temperatureSubject.next(26);
temperatureSubject.subscribe(v => console.log(`TemperatureDisplayB: ${v}`));
temperatureSubject.next(27);

// 輸出:
// TemperatureDisplayA: 25 (訂閱時,先發出當前值)
// TemperatureDisplayA: 26 (新值)
// TemperatureDisplayB: 26 (訂閱時,先發出當前值)
// TemperatureDisplayA: 27 (新值)
// TemperatureDisplayB: 27 (新值)

ReplaySubject:緩存和重播事件

import { ReplaySubject } from "rxjs";
const searchHistorySubject = new ReplaySubject<string>(3);
searchHistorySubject.next("React");
searchHistorySubject.next("Angular");
searchHistorySubject.next("Vue");
searchHistorySubject.next("Svelte");
console.log("Recent search history:");
const subscription = searchHistorySubject.subscribe(v => console.log(v));

// 輸出:
// Recent search history:
// Angular
// Vue
// Svelte(訂閱後,重播過去的 3 個值)

AsyncSubject:表示異步操作

import { AsyncSubject } from "rxjs";
const asyncSubject = new AsyncSubject<number>();
asyncSubject.subscribe(v => console.log(`FinalResultA: ${v}`));
asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.next(3);
asyncSubject.subscribe(v => console.log(`FinalResultB: ${v}`));
asyncSubject.next(4);
asyncSubject.complete();

// 輸出:
// FinalResultA: 4
// FinalResultB: 4

排程器 Scheduler

為了更精細控制 Observable 的發出時間,可以在創建 Observable 時使用排程(建立 Operator 基本上都接受排程器作為參數)。否則,你也可以用 observeOn Operator 來指定自定義的 Observable 的排程。

要理解排程器,你必須對 js 的事件循環(event loop)有基本的理解。

排程器共有五種,以下是它們的整理:

排程器排程說明用途
null通知以同步和遞迴方式傳遞。用於恆定時間操作或尾遞迴操作。
queueScheduler在當前事件框架(蹦床排程器 Trampoline Scheduler)中的佇列上排程。用於迭代操作。
asapScheduler在微任務佇列上排程。用於非同步轉換。
asyncScheduler使用 setInterval 的排程器。用於基於時間的操作。
animationFrameScheduler排程在下一次瀏覽器內容重繪之前。用於建立流暢的瀏覽器動畫。

附錄:Operator 參考

建立 Operator

  1. ajax: 發送 HTTP 請求並返回響應的 Observable。參數:URL 字符串或請求配置對象。
  2. bindCallback: 將回呼 API 轉換為返回 Observable 的函數。參數:要轉換的回呼函數。
  3. bindNodeCallback: 將 Node.js 風格的回呼 API 轉換為返回 Observable 的函數。參數:要轉換的 Node.js 風格的回呼函數。
  4. defer: 創建一個 Observable,只有在被訂閱時才執行指定的 Observable 工廠函數。參數:返回 Observable 或 Promise 的工廠函數。
  5. from: 將陣列、類陣列對象、Promise 或可迭代對象轉換為 Observable。參數:要轉換的輸入值。
  6. fromEvent: 將事件轉換為 Observable。參數:事件目標和事件名稱。
  7. fromEventPattern: 使用自定義的添加 / 刪除處理程序函數創建 Observable。參數:添加處理程序函數和可選的刪除處理程序函數。
  8. generate: 生成由循環條件定義的值序列的 Observable。參數:初始狀態、條件函數、迭代函數和可選的結果選擇器函數。
  9. interval: 創建一個定期發出遞增數字的 Observable。參數:時間間隔(毫秒)。
  10. range: 創建一個發出指定範圍內的數字序列的 Observable。參數:起始值和計數(可選的)。
  11. throwError: 創建一個立即向 Observer 發出錯誤的 Observable。參數:錯誤對象或返回錯誤的工廠函數。
  12. timer: 在指定的延遲後發出數字,然後可選地以固定間隔發出更多數字。參數:初始延遲和可選的間隔時間。
  13. iif: 根據條件選擇兩個 Observable 之一。參數:條件函數、真值 Observable 和假值 Observable。

聯結建立 Operator

  1. combineLatest: 透過輸入的多個 Observable 建立 Observable,它會在當任何輸入 Observable 發出值時,發出每個輸入 Observable 的最新值的陣列。
  2. concat: 按順序訂閱 Observable,並在前一個完成後才繼續下一個。
  3. forkJoin: 當所有 Observable(以陣列或字典的形式)完成時,發出以每個 Observable 的最後一個發出值形成的陣列或字典。
  4. merge: 單純地將多個 Observable 扁平化為單個 Observable。
  5. partition: 將一個 Observable 分成包含兩個 Observable 的二元數組,[0] 是滿足給定的判斷條件的,[1] 是不滿足的。
  6. race: 只發出首先發出值的 Observable 的值。
  7. zip: 每次當所有 Observable 發出值時,將多個 Observable 的值組合成陣列並發出。

轉換 Operator

  1. buffer: 緩衝來源 Observable 的值,直到關閉通知者(closingNotifier,作為參數的 Observable)發出值。
  2. bufferCount: 緩衝來源 Observable 的值,直到達到給定的緩衝區大小。
  3. bufferTime: 在指定的時間週期內緩衝來源 Observable 的值。
  4. bufferToggle: 在開啟 Observable 發出時開始緩衝,並在關閉 Observable 發出時結束緩衝。
  5. bufferWhen: 緩衝來源 Observable 的值,直到關閉 Observable 發出。
  6. concatMap: 將每個來源值投影到一個內部 Observable,該 Observable 會被合併到輸出 Observable 中,並按順序等待每個 Observable 完成後再處理下一個。
  7. concatMapTo: 類似於 concatMap,但總是將每個來源值映射到同一個內部 Observable。
  8. exhaust: 僅從最近投影的內部 Observable 中發出值,丟棄其他 Observable 直到當前 Observable 完成。
  9. exhaustMap: 將每個來來源值投影到一個內部 Observable,僅從最近投影的內部 Observable 中發出值。
  10. expand: 遞歸地將每個來源值投影到一個 Observable。
  11. groupBy: 根據指定的鍵將來源 Observable 的值分組到多個 Observable 中。
  12. map: 將給定的投影函數應用於來源 Observable 發出的每個值。
  13. mapTo: 將來源 Observable 發出的每個值映射為一個常量值。
  14. mergeMap: 將每個來源值投影到一個 Observable,該 Observable 會被合併到輸出 Observable 中。
  15. mergeMapTo: 類似於 mergeMap,但總是將每個來源值映射到同一個內部 Observable。
  16. mergeScan: 對來源 Observable 的每個值應用累加器函數,其中累加器函數本身返回一個 Observable。
  17. pairwise: 將來源 Observable 發出的值配對為陣列。
  18. scan: 對來源 Observable 的值應用累加器函數,並發出每個中間結果。類似 reduce,除了 reduce 並不發出中間結果。
  19. switchScan: 類似於 scan,但對累加器的每個調用都會切換到一個新的內部 Observable。
  20. switchMap: 將每個來源值投影到一個 Observable,該 Observable 會被合併到輸出 Observable 中,每次發出時都會取消訂閱前一個內部 Observable。
  21. switchMapTo: 類似於 switchMap,但總是將每個來源值映射到同一個內部 Observable。
  22. window: 將來源 Observable 的值分支到嵌套的 Observable 中。
  23. windowCount: 將來源 Observable 的值分支到嵌套的 Observable 中,每個嵌套的 Observable 發出固定數量的值。
  24. windowTime: 將來源 Observable 的值分支到嵌套的 Observable 中,每個嵌套的 Observable 在固定的時間週期內發出值。
  25. windowToggle: 當開啟 Observable 發出值時開始收集來源 Observable 的值到一個嵌套的 Observable 中,並在相應的關閉 Observable 發出時關閉當前的嵌套 Observable。
  26. windowWhen: 使用關閉 Observable 工廠來決定何時關閉、發出和重置窗口。

過濾 Operator

  1. audit: 在由另一個 Observable 決定的靜默期後,發出來源 Observable 的最新值。
  2. auditTime: 在指定的時間週期內忽略來源值,然後發出該週期內的最後一個值。
  3. debounce: 只有當另一個 Observable 發出值後的一段時間內沒有其他來源值時,才發出一個值。
  4. debounceTime: 只有當經過指定時間後來源 Observable 沒有發出任何其他值時,才發出一個值。
  5. distinct: 發出來源 Observable 發出的不同值。
  6. distinctUntilChanged: 僅當當前值與之前的值不同時才發出。
  7. distinctUntilKeyChanged: 僅當指定鍵的當前值與前一個值不同時才發出。
  8. elementAt: 發出來源 Observable 的第 N 個值。
  9. filter: 僅發出滿足指定判斷條件的值。
  10. first: 發出來源 Observable 的第一個值或滿足條件的第一個值。
  11. ignoreElements: 忽略來源 Observable 發出的所有項目,只傳遞完成或錯誤通知。
  12. last: 發出來源 Observable 的最後一個值。
  13. sample: 在另一個 Observable 發出時從來源 Observable 中取樣。
  14. sampleTime: 以固定的時間間隔對來源 Observable 進行採樣。
  15. single: 發出滿足判斷條件的單個項目。如果沒有或有多個項目滿足條件,則拋出錯誤。
  16. skip: 跳過來源 Observable 發出的前 N 個值。
  17. skipLast: 跳過來源 Observable 發出的最後 N 個值。
  18. skipUntil: 跳過來源 Observable 發出的值,直到另一個 Observable 發出值。
  19. skipWhile: 跳過來源 Observable 發出的值,直到指定的條件變為 false。
  20. take: 僅發出來源 Observable 的前 N 個值。
  21. takeLast: 僅發出來源 Observable 的最後 N 個值。
  22. takeUntil: 發出來源 Observable 的值,直到另一個 Observable 發出值。
  23. takeWhile: 發出來源 Observable 的值,直到指定的條件變為 false。
  24. throttle: 在由另一個 Observable 決定的靜默期間發出來源值。
  25. throttleTime: 以固定的時間間隔發出來源值。

聯結 Operator

  1. combineLatestAll: 將高階 Observable 轉換為一階 Observable,該 Observable 發出由所有內部 Observable 的最新值組成的陣列。
  2. concatAll: 將高階 Observable 轉換為一階 Observable,該 Observable 按順序連接內部 Observable 的值。
  3. exhaustAll: 將高階 Observable 轉換為一階 Observable,該 Observable 僅從最近的內部 Observable 中發出值。
  4. mergeAll: 將高階 Observable 轉換為一階 Observable,該 Observable 同時發出所有內部 Observable 的值。
  5. switchAll: 將高階 Observable 轉換為一階 Observable,該 Observable 在新的內部 Observable 到達時切換到該 Observable。
  6. startWith: 在來源 Observable 開始發出其值之前,先發出給定的值。
  7. withLatestFrom: 將來源 Observable 的值與其他 Observable 的最新值合併。

多播 Operator

  1. share: 返回一個新的 Observable,該 Observable 多播(共享)原始 Observable。

錯誤處理 Operator

  1. catchError: 捕獲來源 Observable 中的錯誤,並返回一個新的 Observable 或拋出錯誤。
  2. retry: 如果來源 Observable 發生錯誤,重新訂閱它一定次數。
  3. retryWhen: 當來源 Observable 發生錯誤時,返回一個 Observable 來決定是否應該重試。

實用工具 Operator

  1. tap: 對來源 Observable 的值執行副作用,但不修改它們。
  2. delay: 將來源 Observable 發出的值延遲指定的時間。
  3. delayWhen: 延遲來源 Observable 發出的值,延遲時間由另一個 Observable 決定。
  4. dematerialize: 將一個發出通知對象的 Observable 轉換為這些通知所代表的值的 Observable。
  5. materialize: 將來源 Observable 轉換為其通知的 Observable。
  6. observeOn: 指定 Observable 在哪個排程器上發出通知。
  7. subscribeOn: 指定 Observable 在哪個排程器上進行訂閱。
  8. timeInterval: 將來源 Observable 轉換為發出兩次連續發射之間的經過時間的 Observable。
  9. timestamp: 將每個發出的值附加一個時間戳。
  10. timeout: 如果在指定的時間週期內沒有值發出,則拋出錯誤。
  11. timeoutWith: 如果在指定的時間週期內沒有值發出,則切換到備用 Observable。
  12. toArray: 收集來源 Observable 的所有值,並在源完成時將它們作為陣列發出。

條件和布林 Operator

  1. defaultIfEmpty: 如果來源 Observable 在完成前沒有發出任何值,則發出給定的默認值。
  2. every: 判斷來源 Observable 發出的所有值是否都滿足指定的條件。
  3. find: 發出來源 Observable 中第一個滿足條件的值。
  4. findIndex: 發出來源 Observable 中第一個滿足條件的值的索引。
  5. isEmpty: 如果來源 Observable 在完成前沒有發出任何值,則發出 true,否則發出 false。

數學和聚合 Operator

  1. count: 計算來源 Observable 發出的值的數量。
  2. max: 發出來源 Observable 發出的最大值。
  3. min: 發出來源 Observable 發出的最小值。
  4. reduce: 將來源 Observable 發出的值規約為單個值,該值在源完成時發出。