Angular & RxJS: Sự kết hợp lý tưởng

Bài này cung cấp nhiều công thức (recipes) thực chiến về cách sử dụng kết hợp AngularRxJS để xử lý dữ liệu theo mô hình reactive. Các kỹ thuật bao gồm gọi HTTP tuần tự/ song song, xử lý nhiều stream cùng lúc, tránh memory leak, dùng async pipe, transform dữ liệu với map, tối ưu autocomplete với debounceTimeswitchMap, viết operator RxJS tùy chỉnh, và retry HTTP lỗi với exponential backoff.


🔗 1. Gọi HTTP tuần tự và song song với RxJS

  • Bài toán: Lấy thông tin người + danh sách phim từ Star Wars API và hiển thị theo thứ tự.
  • Vấn đề: Loader tắt khi chỉ mới lấy được 1 phim, và thứ tự phim mỗi lần reload bị đảo.

Giải pháp:

  • Dùng mergeMap để gọi lần lượt: lấy người → lấy danh sách phim.
  • Dùng forkJoin để gọi song song các phim và đợi tất cả xong mới update UI.
  • Kết quả: Loader chỉ tắt khi xong hết, và thứ tự phim giữ nguyên.

Code chính:

fetchPerson('1')
  .pipe(
    mergeMap(person => {
      this.person = {...person, filmObjects: []};
      return forkJoin(person.films.map(url => this.fetchFilm(url)))
    }),
    catchError(err => of([]))
  )
  .subscribe(films => {
    this.person.filmObjects = films;
    this.loading = false;
  });

✅ Ghi nhớ: forkJoin chỉ emit khi tất cả observable hoàn thành, giúp đảm bảo thứ tự và đồng bộ dữ liệu.


🔄 2. Lắng nghe nhiều stream với combineLatest

  • Tình huống: Form chỉnh style hộp (kích thước, màu…) bằng reactive form, hiện tại phải nhấn nút mới áp dụng.
  • Mục tiêu: Tự động thay đổi box khi giá trị input thay đổi.

Cách làm:

  • Tạo boxStyles$ là observable dùng combineLatest từ các control.
  • Dùng startWith để mỗi control có giá trị khởi đầu.
  • Dùng map để chuyển đổi [size, color,...] thành object style.
  • Áp dụng async pipe trong HTML.
import { combineLatest, map, Observable, startWith } from 'rxjs';
export class HomeComponent implements OnInit {
  ...
  boxStyles$!: Observable<BoxStyles>;
  ...
  listenToInputChanges() {
    const controls: AbstractControl[] = [...];
    this.boxStyles$ = combineLatest(
      controls.map((control) =>
        control.valueChanges.pipe(startWith(control.value))
      );
    ).pipe(
      map(([size, borderRadius, textColor, backgroundColor]) => {
        return { width: `${size}px`, height: `${size}px`,
          backgroundColor: backgroundColor,
          color: textColor,
          borderRadius: `${borderRadius}px`,
        };
      })
    );
  }
}
  ...
  <div class="row" *ngIf="boxStyles$ | async as boxStyles">
    <div class="box" [ngStyle]="boxStyles">
      <div class="box__text">
        Hello World!
      </div>
    </div>
  </div>
  ...

Ghi nhớ:

  • combineLatest gộp nhiều stream lại, chỉ emit khi tất cả có giá trị.
  • startWith cần thiết vì valueChanges không emit lần đầu.

❌ 3. Tránh memory leak khi dùng Observable

  • Bối cảnh: Stream tiếp tục emit ngay cả khi rời component.
  • Giải pháp: Dùng lifecycle hook ngOnDestroy để gọi unsubscribe.

Nâng cao:

  • Có nhiều stream → không thể lưu từng subscription.
  • Dùng takeWhile(() => isActive) trong pipe để tự động unsubscribe khi biến false.

Mẫu:

import { Component, OnDestroy } from '@angular/core';
  ...
export class HomeComponent implements OnDestroy {
  isStreamActive = true;
  ...
  startStream() {
    isStreamActive = true;
    ...
    streamSource
      .pipe(takeWhile(() => this.isStreamActive))
      .subscribe(input => {...});
    secondStreamSource
      .pipe(takeWhile(() => this.isStreamActive))
      .subscribe(input => {...});
    fastestStreamSource
      .pipe(takeWhile(() => this.isStreamActive))
      .subscribe(input => {...});
  }

  stopStream() {
    this.isStreamActive = false;
  }

  ngOnDestroy() {
    this.stopStream();
  }
}
  <div class="home">
      <div class="buttons-container">
        <button [disabled]="isStreamActive" class="btn btn-
          primary" (click)="startStream()">Start
          Stream</button>
        <button [disabled]="!isStreamActive" class="btn
          btn-dark" (click)="stopStream()">Stop
          Stream</button>
      </div>
      ...
  </div>

✨ 4. Dùng async pipe để tự unsubscribe

  • Ưu điểm: Không cần gọi unsubscribe thủ công.
  • Dùng async pipe trong HTML: tự subscribe + unsubscribe khi component destroy.
  • Dùng scan để cộng dồn các giá trị emit từ nhiều stream.
...
import { interval, Observable, takeWhile } from 'rxjs';
...
export class HomeComponent implements OnDestroy {
  ...
  isStreamActive!: boolean;
  streamsOutput$!: Observable<number>;
  constructor() { }
  ...
  startStream() {
    ...
    const fastestStreamSource = interval(500);
    this.streamsOutput$ = merge(
      streamSource,
      secondStreamSource,
      fastestStreamSource
    ).pipe(
      takeWhile(() => this.isStreamActive),
      scan((acc, next) => {
        return [...acc, next];
      }, [] as number[])
    );
  }
  ...
}
    <div class="output-stream">
      <div class="input-stream__item" *ngFor="let item of streamsOutput$ | async">
        {{item}}
      </div>
    </div>

Lưu ý:

  • async pipe không hỗ trợ hủy stream giữa chừng (chỉ khi component bị hủy).
  • Khi cần hủy giữa chừng: nên dùng takeWhile, takeUntil, hoặc tự unsubscribe.

🔍 5. Tối ưu autocomplete với switchMap + debounceTime

  • Vấn đề: Gõ nhanh → nhiều HTTP request chồng chéo.
  • Giải pháp:
    • debounceTime(500) để chờ user dừng gõ 0.5s.
    • switchMap để cancel call trước, giữ lại call cuối.
...
import { mergeMap, startWith, takeWhile, switchMap } from 'rxjs/operators';
...
ngOnInit() {
    ...
    this.searchForm.controls['username'].valueChanges
      .pipe(
        startWith(''),
        debounceTime(500),
        takeWhile(() => this.componentAlive),
        switchMap((query) =>
          this.userService.searchUsers(query))
      )
      .subscribe((users) => {...});
  }

Before vs after thêm: debounceTime(500)

Ghi nhớ: switchMap chỉ giữ lại lần gọi cuối cùng. debounceTime giúp giảm số lần gọi.


🔮 6. Tạo operator RxJS tùy chỉnh

  • Viết operator logWithLabel(label: string) dùng tap() để log dữ liệu với nhãn.
  • Sử dụng giống như các operator khác:
import { Observable } from 'rxjs/internal/Observable';
import { tap } from 'rxjs/operators';
const logWithLabel = <T>(
  label: string
): ((source$: Observable<T>) => Observable<T>) => {
  return (source$) => source$.pipe(tap((value) =>
    console.log(label, value)));
};
export default logWithLabel;

Khi dùng:

...
import logWithLabel from '../log-with-label';
@Component({...})
export class HomeComponent {
  ...
  startStream() {
    ...
    this.streamsOutput$ = merge(...).pipe(
      takeWhile(...),
      scan(...),
      logWithLabel('stream-output')
    );
  }
  ...
}

Ghi nhớ: Custom operator là một hàm nhận observable và trả lại observable (dùng tap, map, v.v. bên trong).


⏳ 7. Retry HTTP call với exponential backoff

  • Kịch bản: Gọi API có thể lỗi, cần retry với khoảng thời gian tăng dần.

Giải pháp:

  • Viết custom operator retryBackoff(maxTry, delay) sử dụng retry({ delay: fn }).
  • Dùng timer() + count^2 * delay để tăng dần thời gian giữa mỗi lần retry.
import { of, pipe, throwError } from 'rxjs';
import { retry } from 'rxjs/operators';
export function retryBackoff(maxTries: number, delay: number) {
  return pipe(
    retry({
      delay: (error, retryCount) => {
        return retryCount > maxTries ? throwError(() =>
          error) : of(retryCount);
      },
    })
  );
}

Cách gọi bên trong app:

...
import { retryBackoff } from './retry-backoff';
...
export class AppComponent implements OnInit {
  ...
  ngOnInit(): void {
    this.isMakingHttpCall = true;
    this.http
      .get('http://localhost:3333/api/bad-request')
      .pipe(
        retryBackoff(3, 300),
        catchError(...)
      )
      .subscribe(...);
  }
}

Ứng dụng: Tốt cho hệ thống mạng không ổn định, hoặc muốn retry có chiến lược.


📊 Tổng kết

“Angular và RxJS kết hợp lại tạo ra sức mạnh rất lớn trong việc xử lý dữ liệu bất đồng bộ một cách mượt mà và hiệu quả.”

✅ Kỹ thuật chính:

  • forkJoin, mergeMap, switchMap, debounceTime, combineLatest, takeWhile, map, tap, scan
  • async pipe → gọn code, auto unsubscribe.
  • Viết operator tùy chỉnh để tái sử dụng logic xử lý stream.

🎓 Tham khảo thêm:

Để lại một bình luận