icon
11장 : 비동기 프로그래밍

RxJS와 반응형 프로그래밍


이전 절에서 Promise와 async/await을 통해 비동기 작업을 처리하는 방법을 배웠습니다. 이들은 단일 비동기 작업의 결과(성공 또는 실패)를 다루는 데 매우 효과적입니다. 하지만 시간이 지남에 따라 여러 개의 값을 생성하거나, 연속적인 이벤트 스트림(예: 사용자 입력, 네트워크 요청, 타이머 이벤트)을 다루어야 하는 복잡한 비동기 시나리오에서는 Promise만으로는 부족할 수 있습니다.

이러한 문제에 대한 강력한 해결책으로 반응형 프로그래밍(Reactive Programming) 패러다임과 이를 구현한 라이브러리인 RxJS (Reactive Extensions for JavaScript) 가 등장했습니다. RxJS는 이벤트, 비동기 데이터, 또는 여러 값을 다루는 과정을 마치 배열처럼 함수형 프로그래밍 방식으로 처리할 수 있게 해줍니다.


반응형 프로그래밍이란?

정의: 반응형 프로그래밍은 데이터 스트림과 변경 사항의 전파에 중점을 둔 비동기 프로그래밍 패러다임입니다.

설명: 기존의 명령형(Imperative) 프로그래밍은 "내가 언제 무엇을 할지"를 명시적으로 지시하는 방식입니다. 반면 반응형 프로그래밍은 "무언가가 발생하면 이렇게 반응하라"는 방식으로, 이벤트가 발생했을 때 '반응' 하는 것에 초점을 맞춥니다.

데이터 스트림은 시간이 지남에 따라 발생하는 일련의 이벤트나 값들을 의미합니다. 예를 들어:

  • 사용자 클릭 이벤트
  • 키보드 입력 이벤트
  • HTTP 요청 응답
  • 데이터베이스 변경 알림
  • 타이머 틱

반응형 프로그래밍은 이러한 스트림을 옵저버블(Observable) 이라는 개념으로 추상화하고, 다양한 연산자(Operators) 를 사용하여 스트림을 변환, 조합, 필터링하는 강력한 도구를 제공합니다.


RxJS의 핵심 개념

RxJS는 반응형 프로그래밍을 자바스크립트/타입스크립트에서 구현하기 위한 라이브러리입니다. 다음 세 가지 핵심 구성 요소를 이해하는 것이 중요합니다.

Observable

정의: Observable은 시간이 지남에 따라 값을 방출(emit)할 수 있는 데이터 스트림을 나타냅니다. Promise가 단일 값을 비동기적으로 제공하는 반면, Observable은 0개, 1개 또는 여러 개의 값을 비동기적으로 제공할 수 있습니다.

설명: Observable은 "관찰 가능한" 데이터 소스입니다. 즉, 누군가 구독(subscribe)하여 그 값을 관찰할 수 있습니다. Observable은 세 가지 유형의 알림을 방출할 수 있습니다.

  • next: 스트림에서 새로운 값이 방출될 때 호출됩니다.
  • error: 스트림에서 오류가 발생하여 중단될 때 호출됩니다.
  • complete: 스트림이 모든 값을 방출하고 성공적으로 완료될 때 호출됩니다.

Observable은 게으르다(Lazy) 는 특징을 가집니다. 즉, 아무도 구독하지 않으면 아무런 작업을 수행하지 않습니다. 구독이 시작되어야 비로소 값이 방출되기 시작합니다.

예시

import { Observable } from 'rxjs';

// Observable 생성
const myObservable = new Observable<number>(subscriber => {
  // next 값을 방출
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);

  // 1초 후 추가 값 방출
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete(); // 완료 알림
  }, 1000);

  // 구독 해지 시 실행될 클린업 함수 반환
  return () => {
    console.log('Observable: Unsubscribed!');
  };
});

// Observable 구독
console.log('Before subscribe');
const subscription = myObservable.subscribe({
  next: (value) => console.log(`Observable: Got value ${value}`),
  error: (err) => console.error(`Observable: Got error ${err}`),
  complete: () => console.log('Observable: Completed!'),
});
console.log('After subscribe');

// 2초 후 구독 해지 (선택 사항, complete가 호출되면 자동 해지됨)
setTimeout(() => {
  subscription.unsubscribe();
}, 2000);

Observer

정의: Observer는 Observable이 방출하는 값에 '반응' 하는 객체입니다. next, error, complete 메서드를 포함하는 객체입니다.

설명: subscribe() 메서드를 호출할 때 Observer 객체를 인자로 전달합니다. Observer는 Observable의 라이프사이클 이벤트를 처리하는 콜백 함수들의 묶음입니다.

예시 (위와 동일한 subscribe 부분)

const subscription = myObservable.subscribe({
  next: (value) => console.log(`Observer: Got value ${value}`),
  error: (err) => console.error(`Observer: Got error ${err}`),
  complete: () => console.log('Observer: Completed!'),
});

Operators

정의: Operator는 Observable을 입력으로 받아 새로운 Observable을 출력으로 반환하는 함수입니다. 스트림을 변환, 필터링, 결합하는 데 사용됩니다.

설명: Operators는 RxJS의 가장 강력한 부분 중 하나입니다. 이들은 원본 Observable을 변경하지 않고, 새로운 Observable을 생성하여 스트림 처리의 함수형 및 불변성을 유지합니다. Operators는 크게 두 가지 범주로 나눌 수 있습니다.

  • 생성 연산자 (Creation Operators): 새로운 Observable을 생성합니다. (of, from, interval, timer, fromEvent 등)
  • 파이프 가능한 연산자 (Pipeable Operators): Observable 스트림을 변환하고 조합합니다. pipe() 메서드 내에서 체인 형태로 사용됩니다. (map, filter, debounceTime, switchMap, mergeMap, takeUntil 등)

예시

import { of, fromEvent, interval } from 'rxjs';
import { map, filter, debounceTime, take } from 'rxjs/operators'; // pipeable operators는 'rxjs/operators'에서 임포트

// 1. 생성 연산자: of
of(10, 20, 30)
  .subscribe(value => console.log(`of operator: ${value}`)); // 10, 20, 30

// 2. 생성 연산자: fromEvent (DOM 이벤트 스트림)
const clicks = fromEvent(document, 'click');
clicks
  .pipe(
    map((event: MouseEvent) => `Click at ${event.clientX}, ${event.clientY}`), // 이벤트를 문자열로 변환
    debounceTime(300) // 300ms 안에 다른 클릭이 없으면 값을 방출 (과도한 이벤트 방지)
  )
  .subscribe(message => console.log(`fromEvent + pipe: ${message}`));

// 3. 생성 연산자: interval + 파이프 가능한 연산자: take, filter
interval(1000) // 1초마다 0부터 증가하는 숫자 방출
  .pipe(
    take(5),    // 처음 5개의 값만 취함
    filter(num => num % 2 === 0), // 짝수만 필터링
    map(num => `Even number: ${num}`) // 문자열로 변환
  )
  .subscribe({
    next: (val) => console.log(`Interval Stream: ${val}`),
    complete: () => console.log('Interval Stream: Completed!'),
  });

pipe() 메서드 내에서 연산자들을 마치 조립 라인처럼 연결하여 데이터 스트림을 가공할 수 있습니다.


RxJS와 타입스크립트

RxJS는 타입스크립트로 작성되었으며, 타입스크립트와 함께 사용할 때 그 진가가 발휘됩니다.

  • 강력한 타입 추론: Observable의 제네릭 타입(Observable<T>), Observer의 콜백 함수 인자, 그리고 Operator 체인 전반에 걸쳐 타입이 정확하게 추론됩니다. 이는 개발자가 런타임 오류 대신 컴파일 타임에 타입 관련 문제를 발견하고 해결할 수 있게 합니다.
  • IDE 지원: 타입 정보 덕분에 IDE는 자동 완성, 리팩터링, 오류 검출 등 강력한 개발자 경험을 제공합니다.
  • 명확한 인터페이스: Observable, Observer, Subscription 등의 인터페이스가 명확하게 정의되어 있어 코드의 의도를 쉽게 파악할 수 있습니다.

예시

import { of, Observable } from 'rxjs';
import { map, filter } from 'rxjs/operators';

interface Product {
  id: number;
  name: string;
  price: number;
  available: boolean;
}

const products$: Observable<Product[]> = of([
  { id: 1, name: 'Laptop', price: 1200, available: true },
  { id: 2, name: 'Mouse', price: 25, available: false },
  { id: 3, name: 'Keyboard', price: 75, available: true },
]);

products$.pipe(
  // map을 통해 각 제품 배열을 순회하며 가용성 확인
  map(products => products.filter(p => p.available)),
  // map을 통해 필터링된 제품들의 이름만 추출
  map(availableProducts => availableProducts.map(p => p.name.toUpperCase()))
).subscribe(
  (productNames: string[]) => { // productNames가 string[] 타입임을 정확히 추론
    console.log('Available Products (Uppercase):', productNames.join(', '));
  },
  (error) => console.error('Error:', error)
);

// 컴파일러는 pipe 체인의 각 단계에서 타입이 어떻게 변형되는지 추론할 수 있습니다.
// 예를 들어, 첫 번째 map 후에는 Observable<Product[]> -> Observable<Product[]>,
// 두 번째 map 후에는 Observable<string[]> 으로 변환됨을 압니다.

RxJS의 활용 사례

  • UI 이벤트 처리: 사용자 입력(클릭, 키 입력, 마우스 이동)을 스트림으로 처리하여 디바운싱, 스로틀링, 드래그 앤 드롭 구현 등에 활용됩니다.
  • HTTP 요청: Angular와 같은 프레임워크에서는 HTTP 클라이언트가 RxJS Observable을 반환하여 비동기 데이터 흐름을 처리합니다.
  • 실시간 데이터: 웹소켓을 통한 실시간 채팅, 주식 시세 등 스트리밍 데이터를 다룰 때 유용합니다.
  • 복잡한 비동기 흐름 제어: 여러 비동기 작업을 결합(combineLatest, forkJoin, merge), 경쟁(race), 순차 실행(concatMap, exhaustMap, switchMap)하는 데 사용됩니다.
  • 상태 관리: Redux-Observable이나 Ngrx/Effects와 같은 라이브러리에서 비동기 사이드 이펙트(side effect)를 관리하는 데 사용됩니다.

Promise vs Observable

특징PromiseObservable
방출 값단일 값0개, 1개 또는 여러 개의 값
성질즉시 실행(Eager), 한 번 완료게으른(Lazy), 구독해야 실행
취소직접적인 취소 불가능 (작업이 시작되면 끝까지)unsubscribe()를 통해 취소 가능
오류 처리.catch()로 한 번 처리스트림 내에서 여러 번 처리 가능 (재시도 등)
변환/조합제한적 (.then() 체이닝)풍부한 연산자 제공 (pipe() 체이닝)

RxJS와 반응형 프로그래밍은 복잡한 비동기 데이터 스트림과 이벤트 핸들링을 훨씬 더 선언적이고 효율적으로 처리할 수 있게 해주는 강력한 패러다임입니다. 처음에는 학습 곡선이 있을 수 있지만, 일단 익숙해지면 코드의 가독성과 유지보수성이 크게 향상되는 것을 경험할 수 있습니다. 타입스크립트는 이러한 RxJS의 모든 기능을 타입 안전하게 활용할 수 있도록 완벽하게 지원하며, 복잡한 비동기 시스템을 견고하게 구축하는 데 필수적인 도구가 됩니다.