Bài này cung cấp nhiều công thức (recipes) thực chiến về cách sử dụng kết hợp Angular và RxJS để xử lý dữ liệu theo mô hình reactive. Các kỹ thuật bao gồm gọi HTTP tuần tự/ song song, xử lý nhiều stream cùng lúc, tránh memory leak, dùng async pipe, transform dữ liệu với map
, tối ưu autocomplete với debounceTime
và switchMap
, viết operator RxJS tùy chỉnh, và retry HTTP lỗi với exponential backoff
.
🔗 1. Gọi HTTP tuần tự và song song với RxJS
- Bài toán: Lấy thông tin người + danh sách phim từ Star Wars API và hiển thị theo thứ tự.
- Vấn đề: Loader tắt khi chỉ mới lấy được 1 phim, và thứ tự phim mỗi lần reload bị đảo.
Giải pháp:
- Dùng
mergeMap
để gọi lần lượt: lấy người → lấy danh sách phim. - Dùng
forkJoin
để gọi song song các phim và đợi tất cả xong mới update UI. - Kết quả: Loader chỉ tắt khi xong hết, và thứ tự phim giữ nguyên.
Code chính:
fetchPerson('1') .pipe( mergeMap(person => { this.person = {...person, filmObjects: []}; return forkJoin(person.films.map(url => this.fetchFilm(url))) }), catchError(err => of([])) ) .subscribe(films => { this.person.filmObjects = films; this.loading = false; });
✅ Ghi nhớ: forkJoin
chỉ emit khi tất cả observable hoàn thành, giúp đảm bảo thứ tự và đồng bộ dữ liệu.
🔄 2. Lắng nghe nhiều stream với combineLatest

- Tình huống: Form chỉnh style hộp (kích thước, màu…) bằng reactive form, hiện tại phải nhấn nút mới áp dụng.
- Mục tiêu: Tự động thay đổi box khi giá trị input thay đổi.
Cách làm:
- Tạo
boxStyles$
là observable dùngcombineLatest
từ các control. - Dùng
startWith
để mỗi control có giá trị khởi đầu. - Dùng
map
để chuyển đổi[size, color,...]
thành object style. - Áp dụng
async pipe
trong HTML.
import { combineLatest, map, Observable, startWith } from 'rxjs'; export class HomeComponent implements OnInit { ... boxStyles$!: Observable<BoxStyles>; ... listenToInputChanges() { const controls: AbstractControl[] = [...]; this.boxStyles$ = combineLatest( controls.map((control) => control.valueChanges.pipe(startWith(control.value)) ); ).pipe( map(([size, borderRadius, textColor, backgroundColor]) => { return { width: `${size}px`, height: `${size}px`, backgroundColor: backgroundColor, color: textColor, borderRadius: `${borderRadius}px`, }; }) ); } }
... <div class="row" *ngIf="boxStyles$ | async as boxStyles"> <div class="box" [ngStyle]="boxStyles"> <div class="box__text"> Hello World! </div> </div> </div> ...
Ghi nhớ:
combineLatest
gộp nhiều stream lại, chỉ emit khi tất cả có giá trị.startWith
cần thiết vìvalueChanges
không emit lần đầu.
❌ 3. Tránh memory leak khi dùng Observable

- Bối cảnh: Stream tiếp tục emit ngay cả khi rời component.
- Giải pháp: Dùng lifecycle hook
ngOnDestroy
để gọiunsubscribe
.
Nâng cao:
- Có nhiều stream → không thể lưu từng
subscription
. - Dùng
takeWhile(() => isActive)
trong pipe để tự động unsubscribe khi biến false.
Mẫu:
import { Component, OnDestroy } from '@angular/core'; ... export class HomeComponent implements OnDestroy { isStreamActive = true; ... startStream() { isStreamActive = true; ... streamSource .pipe(takeWhile(() => this.isStreamActive)) .subscribe(input => {...}); secondStreamSource .pipe(takeWhile(() => this.isStreamActive)) .subscribe(input => {...}); fastestStreamSource .pipe(takeWhile(() => this.isStreamActive)) .subscribe(input => {...}); } stopStream() { this.isStreamActive = false; } ngOnDestroy() { this.stopStream(); } }
<div class="home"> <div class="buttons-container"> <button [disabled]="isStreamActive" class="btn btn- primary" (click)="startStream()">Start Stream</button> <button [disabled]="!isStreamActive" class="btn btn-dark" (click)="stopStream()">Stop Stream</button> </div> ... </div>
✨ 4. Dùng async pipe
để tự unsubscribe

- Ưu điểm: Không cần gọi
unsubscribe
thủ công. - Dùng
async pipe
trong HTML: tự subscribe + unsubscribe khi component destroy. - Dùng
scan
để cộng dồn các giá trị emit từ nhiều stream.
... import { interval, Observable, takeWhile } from 'rxjs'; ... export class HomeComponent implements OnDestroy { ... isStreamActive!: boolean; streamsOutput$!: Observable<number>; constructor() { } ... startStream() { ... const fastestStreamSource = interval(500); this.streamsOutput$ = merge( streamSource, secondStreamSource, fastestStreamSource ).pipe( takeWhile(() => this.isStreamActive), scan((acc, next) => { return [...acc, next]; }, [] as number[]) ); } ... }
<div class="output-stream"> <div class="input-stream__item" *ngFor="let item of streamsOutput$ | async"> {{item}} </div> </div>
Lưu ý:
async pipe
không hỗ trợ hủy stream giữa chừng (chỉ khi component bị hủy).- Khi cần hủy giữa chừng: nên dùng
takeWhile
,takeUntil
, hoặc tự unsubscribe.
🔍 5. Tối ưu autocomplete với switchMap
+ debounceTime

- Vấn đề: Gõ nhanh → nhiều HTTP request chồng chéo.
- Giải pháp:
debounceTime(500)
để chờ user dừng gõ 0.5s.switchMap
để cancel call trước, giữ lại call cuối.
... import { mergeMap, startWith, takeWhile, switchMap } from 'rxjs/operators'; ... ngOnInit() { ... this.searchForm.controls['username'].valueChanges .pipe( startWith(''), debounceTime(500), takeWhile(() => this.componentAlive), switchMap((query) => this.userService.searchUsers(query)) ) .subscribe((users) => {...}); }
Before vs after thêm: debounceTime(500)


Ghi nhớ: switchMap
chỉ giữ lại lần gọi cuối cùng. debounceTime
giúp giảm số lần gọi.
🔮 6. Tạo operator RxJS tùy chỉnh
- Viết operator
logWithLabel(label: string)
dùngtap()
để log dữ liệu với nhãn. - Sử dụng giống như các operator khác:
import { Observable } from 'rxjs/internal/Observable'; import { tap } from 'rxjs/operators'; const logWithLabel = <T>( label: string ): ((source$: Observable<T>) => Observable<T>) => { return (source$) => source$.pipe(tap((value) => console.log(label, value))); }; export default logWithLabel;
Khi dùng:
... import logWithLabel from '../log-with-label'; @Component({...}) export class HomeComponent { ... startStream() { ... this.streamsOutput$ = merge(...).pipe( takeWhile(...), scan(...), logWithLabel('stream-output') ); } ... }
Ghi nhớ: Custom operator là một hàm nhận observable và trả lại observable (dùng tap
, map
, v.v. bên trong).
⏳ 7. Retry HTTP call với exponential backoff

- Kịch bản: Gọi API có thể lỗi, cần retry với khoảng thời gian tăng dần.
Giải pháp:
- Viết custom operator
retryBackoff(maxTry, delay)
sử dụngretry({ delay: fn })
. - Dùng
timer()
+count^2 * delay
để tăng dần thời gian giữa mỗi lần retry.
import { of, pipe, throwError } from 'rxjs'; import { retry } from 'rxjs/operators'; export function retryBackoff(maxTries: number, delay: number) { return pipe( retry({ delay: (error, retryCount) => { return retryCount > maxTries ? throwError(() => error) : of(retryCount); }, }) ); }
Cách gọi bên trong app:
... import { retryBackoff } from './retry-backoff'; ... export class AppComponent implements OnInit { ... ngOnInit(): void { this.isMakingHttpCall = true; this.http .get('http://localhost:3333/api/bad-request') .pipe( retryBackoff(3, 300), catchError(...) ) .subscribe(...); } }
Ứng dụng: Tốt cho hệ thống mạng không ổn định, hoặc muốn retry có chiến lược.
📊 Tổng kết
“Angular và RxJS kết hợp lại tạo ra sức mạnh rất lớn trong việc xử lý dữ liệu bất đồng bộ một cách mượt mà và hiệu quả.”
✅ Kỹ thuật chính:
forkJoin
,mergeMap
,switchMap
,debounceTime
,combineLatest
,takeWhile
,map
,tap
,scan
async pipe
→ gọn code, auto unsubscribe.- Viết operator tùy chỉnh để tái sử dụng logic xử lý stream.
Để lại một bình luận
Bạn phải đăng nhập để gửi bình luận.