icon안동민 개발노트

RxJS와 반응형 프로그래밍


 반응형 프로그래밍은 데이터 스트림과 변화의 전파에 중점을 둔 프로그래밍 패러다임입니다.

 RxJS(Reactive Extensions for JavaScript)는 이러한 반응형 프로그래밍을 JavaScript 환경에서 구현한 라이브러리로, 특히 비동기 프로그래밍에서 강력한 도구입니다.

반응형 프로그래밍의 개념과 이점

 반응형 프로그래밍은 다음과 같은 이점을 제공합니다.

  1. 복잡한 비동기 작업의 간결한 표현
  2. 선언적 프로그래밍 스타일
  3. 데이터 흐름의 조작과 조합이 용이
  4. 에러 처리의 일관성

RxJS의 핵심 개념

  1. Observable : 시간에 따라 발생하는 이벤트나 값의 스트림을 나타냅니다.
  2. Observer : Observable을 구독하고 값을 소비하는 객체입니다.
  3. Subscription : Observable 구독의 결과로, 구독을 취소할 수 있게 해줍니다.

 타입스크립트에서의 기본 사용 예

import { Observable } from 'rxjs';
 
const observable = new Observable<number>(subscriber => {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    setTimeout(() => {
        subscriber.next(4);
        subscriber.complete();
    }, 1000);
});
 
const observer = {
    next: (value: number) => console.log('Received:', value),
    error: (err: any) => console.error('Error:', err),
    complete: () => console.log('Completed')
};
 
const subscription = observable.subscribe(observer);
 
// 나중에 구독 취소
// subscription.unsubscribe();

RxJS의 주요 연산자

  1. 생성 연산자
import { of, from, fromEvent } from 'rxjs';
 
// of: 주어진 인자들을 순차적으로 방출
const ofObservable = of(1, 2, 3);
 
// from: 배열, 프로미스, 이터러블 등을 Observable로 변환
const fromObservable = from([1, 2, 3]);
 
// fromEvent: DOM 이벤트를 Observable로 변환
const clickObservable = fromEvent(document, 'click');
  1. 변환 연산자
import { of } from 'rxjs';
import { map, filter, mergeMap } from 'rxjs/operators';
 
const source = of(1, 2, 3, 4, 5);
 
source.pipe(
    map(x => x * 2),
    filter(x => x > 5),
    mergeMap(x => of(x, x * 2))
).subscribe(console.log);
// 출력: 6, 12, 8, 16, 10, 20

타입스크립트에서의 타입 추론과 안정성

 RxJS는 타입스크립트와 잘 통합되어 있어, 대부분의 경우 자동으로 타입이 추론됩니다.

 복잡한 연산자 체인에서도 타입 안정성을 유지할 수 있습니다.

import { Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';
 
interface User {
    id: number;
    name: string;
}
 
const users: Observable<User[]> = of([
    { id: 1, name: 'Alice' },
    { id: 2, name: 'Bob' }
]);
 
const names: Observable<string[]> = users.pipe(
    map(users => users.map(user => user.name))
);
 
names.subscribe(console.log);

에러 처리와 구독 해제

 에러 처리

import { Observable, throwError } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';
 
const fallibleOperation = new Observable<number>(subscriber => {
    if (Math.random() > 0.5) {
        subscriber.error(new Error('Something went wrong'));
    } else {
        subscriber.next(42);
        subscriber.complete();
    }
});
 
fallibleOperation.pipe(
    retry(3),
    catchError(error => {
        console.error('Error caught:', error);
        return of(-1);  // 에러 발생 시 기본값 반환
    })
).subscribe({
    next: value => console.log('Received:', value),
    complete: () => console.log('Completed')
});

 구독 해제와 메모리 누수 방지

import { interval } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { Subject } from 'rxjs';
 
const unsubscribe$ = new Subject<void>();
 
interval(1000).pipe(
    takeUntil(unsubscribe$)
).subscribe(console.log);
 
// 5초 후 구독 해제
setTimeout(() => {
    unsubscribe$.next();
    unsubscribe$.complete();
}, 5000);

복잡한 비동기 흐름 구현

 동시성 제어, 재시도 로직, 캐싱 예시

import { Observable, of, throwError } from 'rxjs';
import { mergeMap, retry, shareReplay, take, delay } from 'rxjs/operators';
 
const fetchData = (id: number): Observable<string> => {
    return Math.random() > 0.5 
        ? of(`Data for ${id}`).pipe(delay(1000))
        : throwError(new Error('Fetch failed'));
};
 
const cachedFetchData = (id: number): Observable<string> => {
    return fetchData(id).pipe(
        retry(3),
        shareReplay(1)
    );
};
 
of(1, 2, 3, 4, 5).pipe(
    mergeMap(id => cachedFetchData(id), 2),  // 최대 2개의 동시 요청
    take(3)  // 처음 3개의 성공적인 결과만 가져옴
).subscribe(console.log);

상태 관리와 이벤트 기반 아키텍처

 RxJS를 사용한 간단한 상태 관리 패턴

import { BehaviorSubject } from 'rxjs';
 
interface AppState {
    user: string | null;
    isLoading: boolean;
}
 
const initialState: AppState = {
    user: null,
    isLoading: false
};
 
const state$ = new BehaviorSubject<AppState>(initialState);
 
// 상태 업데이트 함수
function updateState(newState: Partial<AppState>) {
    state$.next({ ...state$.value, ...newState });
}
 
// 상태 구독
state$.subscribe(state => console.log('Current state:', state));
 
// 상태 업데이트
updateState({ isLoading: true });
updateState({ user: 'Alice' });
updateState({ isLoading: false });

RxJS 도입 전략과 주의사항

  1. 점진적 도입 : 기존 코드베이스에 점진적으로 RxJS를 도입합니다.
  2. 학습 곡선 고려 : 팀원들의 RxJS 이해도를 고려하여 도입 속도를 조절합니다.
  3. 과도한 사용 주의 : 모든 비동기 작업에 RxJS를 사용하는 것은 오히려 복잡성을 증가시킬 수 있습니다.
  4. 성능 고려 : 불필요한 구독이나 복잡한 연산자 체인은 성능에 영향을 줄 수 있습니다.
  5. 타입 안정성 유지 : 제네릭을 적극 활용하여 타입 안정성을 확보합니다.
  6. 테스트 용이성 : RxJS 코드의 테스트 전략을 미리 수립합니다.
  7. 구독 관리 : 모든 구독에 대해 적절한 해제 메커니즘을 구현합니다.
  8. 문서화 : 복잡한 Observable 흐름에 대해서는 주석이나 다이어그램으로 문서화합니다.
  9. 최신 버전 유지 : RxJS의 최신 버전을 따라가며, 주요 변경사항을 숙지합니다.
  10. 커뮤니티 리소스 활용 : RxJS 커뮤니티의 풍부한 예제와 패턴을 학습하고 적용합니다.