안동민 개발노트 아이콘

안동민 개발노트

11장 : 비동기 프로그래밍

RxJS와 반응형 프로그래밍

이전 절에서는 Promise와 async/await으로 비동기 작업을 처리하는 방법을 배웠습니다. 이 방식은 단일 작업의 성공/실패를 다루는 데 매우 효과적입니다.

하지만 연속 이벤트 스트림(사용자 입력, 네트워크 이벤트, 타이머 등)을 다루는 복잡한 시나리오에서는 Promise만으로 부족할 수 있습니다.

이 문제를 해결하는 강력한 접근이 반응형 프로그래밍(Reactive Programming)과 라이브러리 RxJS (Reactive Extensions for JavaScript)입니다.

RxJS는 이벤트와 비동기 데이터 흐름을 배열을 다루듯 함수형 방식으로 처리할 수 있게 해줍니다.


반응형 프로그래밍이란?

정의: 반응형 프로그래밍은 데이터 스트림과 변경 사항의 전파에 중점을 둔 비동기 프로그래밍 패러다임입니다.

설명: 기존의 명령형(Imperative) 프로그래밍은 내가 언제 무엇을 할지를 명시적으로 지시하는 방식입니다. 반면 반응형 프로그래밍은 무언가가 발생하면 이렇게 반응하라는 방식으로, 이벤트가 발생했을 때 반응하는 것에 초점을 맞춥니다.

데이터 스트림은 시간이 지남에 따라 발생하는 일련의 이벤트나 값들을 의미합니다. 예를 들어:

  • 사용자 클릭 이벤트
  • 키보드 입력 이벤트
  • HTTP 요청 응답
  • 데이터베이스 변경 알림
  • 타이머 틱

반응형 프로그래밍은 이러한 스트림을 옵저버블(Observable)이라는 개념으로 추상화하고, 다양한 연산자(Operators)를 사용하여 스트림을 변환, 조합, 필터링하는 강력한 도구를 제공합니다.


RxJS의 핵심 개념

RxJS는 반응형 프로그래밍을 자바스크립트/타입스크립트에서 구현하기 위한 라이브러리입니다. 다음 세 가지 핵심 구성 요소를 이해하는 것이 중요합니다.

Observable

정의: Observable은 시간이 지남에 따라 값을 방출(emit)할 수 있는 데이터 스트림을 나타냅니다. Promise가 단일 값을 비동기적으로 제공하는 반면, Observable은 0개, 1개 또는 여러 개의 값을 비동기적으로 제공할 수 있습니다.

설명: Observable은 관찰 가능한 데이터 소스입니다. 즉, 누군가 구독(subscribe)하여 그 값을 관찰할 수 있습니다. Observable은 세 가지 유형의 알림을 방출할 수 있습니다.

  • next: 스트림에서 새로운 값이 방출될 때 호출됩니다.
  • error: 스트림에서 오류가 발생하여 중단될 때 호출됩니다.
  • complete: 스트림이 모든 값을 방출하고 성공적으로 완료될 때 호출됩니다.

Observable은 게으르다(Lazy)는 특징을 가집니다. 즉, 아무도 구독하지 않으면 아무런 작업을 수행하지 않습니다. 구독이 시작되어야 비로소 값이 방출되기 시작합니다.

예시
import { Observable } from 'rxjs';

// Observable 생성
const myObservable = new Observable<number>(subscriber => {
  // next 값을 방출
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);

  // 1초 후 추가 값 방출
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete(); // 완료 알림
  }, 1000);

  // 구독 해지 시 실행될 클린업 함수 반환
  return () => {
    console.log('Observable: Unsubscribed!');
  };
});

// Observable 구독
console.log('Before subscribe');
const subscription = myObservable.subscribe({
  next: (value) => console.log(`Observable: Got value ${value}`),
  error: (err) => console.error(`Observable: Got error ${err}`),
  complete: () => console.log('Observable: Completed!'),
});
console.log('After subscribe');

// 2초 후 구독 해지 (선택 사항, complete가 호출되면 자동 해지됨)
setTimeout(() => {
  subscription.unsubscribe();
}, 2000);

Observer

정의: Observer는 Observable이 방출하는 값에 반응하는 객체입니다. next, error, complete 메서드를 포함하는 객체입니다.

설명: subscribe() 메서드를 호출할 때 Observer 객체를 인자로 전달합니다. Observer는 Observable의 라이프사이클 이벤트를 처리하는 콜백 함수들의 묶음입니다.

예시 (위와 동일한 subscribe 부분)
const subscription = myObservable.subscribe({
  next: (value) => console.log(`Observer: Got value ${value}`),
  error: (err) => console.error(`Observer: Got error ${err}`),
  complete: () => console.log('Observer: Completed!'),
});

Operators

정의: Operator는 Observable을 입력으로 받아 새로운 Observable을 출력으로 반환하는 함수입니다. 스트림을 변환, 필터링, 결합하는 데 사용됩니다.

설명: Operators는 RxJS의 가장 강력한 부분 중 하나입니다. 이들은 원본 Observable을 변경하지 않고, 새로운 Observable을 생성하여 스트림 처리의 함수형 및 불변성을 유지합니다. Operators는 크게 두 가지 범주로 나눌 수 있습니다.

  • 생성 연산자 (Creation Operators): 새로운 Observable을 생성합니다. (of, from, interval, timer, fromEvent 등)
  • 파이프 가능한 연산자 (Pipeable Operators): Observable 스트림을 변환하고 조합합니다. pipe() 메서드 내에서 체인 형태로 사용됩니다. (map, filter, debounceTime, switchMap, mergeMap, takeUntil 등)
예시
import { of, fromEvent, interval } from 'rxjs';
import { map, filter, debounceTime, take } from 'rxjs/operators'; // pipeable operators는 'rxjs/operators'에서 임포트

// 1. 생성 연산자: of
of(10, 20, 30)
  .subscribe(value => console.log(`of operator: ${value}`)); // 10, 20, 30

// 2. 생성 연산자: fromEvent (DOM 이벤트 스트림)
const clicks = fromEvent(document, 'click');
clicks
  .pipe(
    map((event: MouseEvent) => `Click at ${event.clientX}, ${event.clientY}`), // 이벤트를 문자열로 변환
    debounceTime(300) // 300ms 안에 다른 클릭이 없으면 값을 방출 (과도한 이벤트 방지)
  )
  .subscribe(message => console.log(`fromEvent + pipe: ${message}`));

// 3. 생성 연산자: interval + 파이프 가능한 연산자: take, filter
interval(1000) // 1초마다 0부터 증가하는 숫자 방출
  .pipe(
    take(5),    // 처음 5개의 값만 취함
    filter(num => num % 2 === 0), // 짝수만 필터링
    map(num => `Even number: ${num}`) // 문자열로 변환
  )
  .subscribe({
    next: (val) => console.log(`Interval Stream: ${val}`),
    complete: () => console.log('Interval Stream: Completed!'),
  });

pipe() 메서드 내에서 연산자들을 마치 조립 라인처럼 연결하여 데이터 스트림을 가공할 수 있습니다.


RxJS와 타입스크립트

RxJS는 타입스크립트로 작성되었으며, 타입스크립트와 함께 사용할 때 그 진가가 발휘됩니다.

  • 강력한 타입 추론: Observable의 제네릭 타입(Observable<T>), Observer의 콜백 함수 인자, 그리고 Operator 체인 전반에 걸쳐 타입이 정확하게 추론됩니다. 이는 개발자가 런타임 오류 대신 컴파일 타임에 타입 관련 문제를 발견하고 해결할 수 있게 합니다.
  • IDE 지원: 타입 정보 덕분에 IDE는 자동 완성, 리팩터링, 오류 검출 등 강력한 개발자 경험을 제공합니다.
  • 명확한 인터페이스: Observable, Observer, Subscription 등의 인터페이스가 명확하게 정의되어 있어 코드의 의도를 쉽게 파악할 수 있습니다.
예시
import { of, Observable } from 'rxjs';
import { map, filter } from 'rxjs/operators';

interface Product {
  id: number;
  name: string;
  price: number;
  available: boolean;
}

const products$: Observable<Product[]> = of([
  { id: 1, name: 'Laptop', price: 1200, available: true },
  { id: 2, name: 'Mouse', price: 25, available: false },
  { id: 3, name: 'Keyboard', price: 75, available: true },
]);

products$.pipe(
  // map을 통해 각 제품 배열을 순회하며 가용성 확인
  map(products => products.filter(p => p.available)),
  // map을 통해 필터링된 제품들의 이름만 추출
  map(availableProducts => availableProducts.map(p => p.name.toUpperCase()))
).subscribe(
  (productNames: string[]) => { // productNames가 string[] 타입임을 정확히 추론
    console.log('Available Products (Uppercase):', productNames.join(', '));
  },
  (error) => console.error('Error:', error)
);

// 컴파일러는 pipe 체인의 각 단계에서 타입이 어떻게 변형되는지 추론할 수 있습니다.
// 예를 들어, 첫 번째 map 후에는 Observable<Product[]> -> Observable<Product[]>,
// 두 번째 map 후에는 Observable<string[]> 으로 변환됨을 압니다.

RxJS의 활용 사례

  • UI 이벤트 처리: 사용자 입력(클릭, 키 입력, 마우스 이동)을 스트림으로 처리하여 디바운싱, 스로틀링, 드래그 앤 드롭 구현 등에 활용됩니다.
  • HTTP 요청: Angular와 같은 프레임워크에서는 HTTP 클라이언트가 RxJS Observable을 반환하여 비동기 데이터 흐름을 처리합니다.
  • 실시간 데이터: 웹소켓을 통한 실시간 채팅, 주식 시세 등 스트리밍 데이터를 다룰 때 유용합니다.
  • 복잡한 비동기 흐름 제어: 여러 비동기 작업을 결합(combineLatest, forkJoin, merge), 경쟁(race), 순차 실행(concatMap, exhaustMap, switchMap)하는 데 사용됩니다.
  • 상태 관리: Redux-Observable이나 Ngrx/Effects와 같은 라이브러리에서 비동기 사이드 이펙트(side effect)를 관리하는 데 사용됩니다.

Promise vs Observable

특징PromiseObservable
방출 값단일 값0개, 1개 또는 여러 개의 값
성질즉시 실행(Eager), 한 번 완료게으른(Lazy), 구독해야 실행
취소직접적인 취소 불가능 (작업이 시작되면 끝까지)unsubscribe()를 통해 취소 가능
오류 처리.catch()로 한 번 처리스트림 내에서 여러 번 처리 가능 (재시도 등)
변환/조합제한적 (.then() 체이닝)풍부한 연산자 제공 (pipe() 체이닝)

RxJS를 무조건 쓰는 것이 답은 아닙니다. 이벤트가 얼마나 자주 오고, 취소가 필요한지, 여러 스트림을 합쳐야 하는지에 따라 Promise, async iterator, Observable 중 더 간단한 도구를 고르는 것이 좋습니다.


RxJS와 반응형 프로그래밍은 복잡한 비동기 데이터 스트림과 이벤트 핸들링을 더 선언적이고 효율적으로 처리하게 해주는 패러다임입니다.

처음에는 학습 곡선이 있을 수 있지만, 익숙해지면 코드의 가독성과 유지보수성이 크게 향상되는 것을 체감할 수 있습니다.

타입스크립트는 RxJS 기능을 타입 안전하게 활용할 수 있도록 지원하며, 복잡한 비동기 시스템을 견고하게 구축하는 데 중요한 기반이 됩니다.

RxJS 코드는 아래처럼 소스, 연산자, 구독, 수명주기 신호로 나누면 흐름이 단순해집니다.

아래 다이어그램은 Observable 파이프라인에서 값 타입이 소스, 연산자, 구독, 해제 경계를 통과하는 흐름을 정리합니다.

아래 다이어그램은 RxJS와 반응형 프로그래밍에서 반응형 프로그래밍의 개념과 RxJS의 핵심 개념이 책임 경계와 실행 흐름을 어떻게 바꾸는지 정리합니다.

아래 다이어그램은 RxJS와 반응형 프로그래밍에서 책임 경계, 실행 흐름, 테스트 신호를 확인합니다.

아래 다이어그램은 RxJS와 반응형 프로그래밍을 마무리하며 입력 조건, 실행 경로, 실패 처리 질문을 정리합니다.

아래 다이어그램은 RxJS와 반응형 프로그래밍을 소스, 연산자, 구독, 해제 기준으로 나누어 실제 적용 전에 확인할 질문을 좁힙니다.