안동민 개발노트 아이콘

안동민 개발노트

12장 : 고급 주제와 최신 트렌드

CQRS 패턴 구현

지난 절에서는 NestJS로 실시간 웹 애플리케이션 핵심 기술인 WebSocket을 구현하는 방법을 알아봤습니다.

이제 12장 세 번째 절에서는 대규모 엔터프라이즈 애플리케이션의 확장성과 성능을 높이는 고급 아키텍처 패턴, CQRS (Command Query Responsibility Segregation)와 이를 NestJS에서 구현하는 방법을 살펴보겠습니다.

기존 애플리케이션은 대부분 CRUD(Create, Read, Update, Delete) 모델로 데이터를 처리합니다.

즉 데이터를 읽는(Query) 작업과 쓰는(Command) 작업이 동일한 데이터 모델과 로직을 공유합니다.

하지만 복잡한 시스템에서는 읽기와 쓰기의 빈도와 요구사항이 크게 달라 성능 병목이나 확장성 문제가 생길 수 있습니다.

CQRS는 이런 문제를 해결하기 위해 등장한 패턴입니다.


CQRS란?

CQRS는 애플리케이션의 작업(Operation)을 명령(Command)조회(Query)로 나누고, 이 두 가지 책임에 대해 서로 다른 모델을 사용하는 아키텍처 패턴입니다.

  • 명령 (Command): 데이터 상태를 변경하는 작업입니다 (생성, 업데이트, 삭제). 명령은 비즈니스 로직을 포함하며, 일반적으로 결과 값을 반환하지 않고 작업의 성공/실패 여부만 알립니다.
  • 조회 (Query): 데이터를 읽는 작업입니다. 조회는 데이터의 상태를 변경하지 않으며, 복잡한 비즈니스 로직 없이 데이터를 가져와 소비자에게 반환합니다.
CQRS의 핵심 아이디어

전통적인 CRUD 모델에서는 동일한 데이터 모델(예: ORM 엔티티)을 읽기(조회)와 쓰기(명령) 모두에 사용합니다. 이로 인해 다음과 같은 문제가 발생할 수 있습니다.

  • 성능 병목: 읽기 작업이 쓰기 작업보다 훨씬 많을 때, 쓰기 작업에 최적화된 데이터 모델이 읽기 성능을 저하시킬 수 있습니다.
  • 복잡성 증가: 읽기와 쓰기 로직이 얽히면서 코드 복잡도가 증가하고 유지보수가 어려워집니다.
  • 확장성 제약: 읽기 스케일링과 쓰기 스케일링 요구사항이 다를 때, 동일한 모델로는 효율적인 확장이 어렵습니다.

CQRS는 이를 분리하여 각 책임에 최적화된 아키텍처를 구축할 수 있도록 합니다.

CQRS의 장점
  • 확장성 (Scalability): 읽기 모델과 쓰기 모델을 독립적으로 확장할 수 있습니다. 예를 들어, 읽기 작업이 많으면 조회 모델의 인스턴스를 늘리고, 쓰기 작업이 많으면 명령 모델의 인스턴스를 늘릴 수 있습니다.
  • 성능 최적화
    • 쓰기 모델: 쓰기 작업에 최적화된 데이터베이스(예: NoSQL)나 데이터 구조를 사용할 수 있습니다.
    • 읽기 모델: 읽기 작업에 최적화된 데이터베이스(예: 검색 엔진, Read Replica)나 캐싱 전략을 사용할 수 있습니다. 복잡한 JOIN 없이 쿼리에 필요한 데이터만 미리 평탄화(denormalization)하여 저장할 수 있습니다.
  • 유연성: 각 모델에 독립적인 기술 스택을 적용할 수 있습니다.
  • 복잡성 분리: 읽기/쓰기 로직을 분리하여 코드의 응집도를 높이고 유지보수를 용이하게 합니다.
  • 이벤트 소싱(Event Sourcing)과의 시너지: CQRS는 종종 이벤트 소싱과 함께 사용됩니다. 모든 상태 변경을 이벤트로 기록하고, 이 이벤트를 통해 읽기 모델을 구축하는 방식입니다.
CQRS의 단점
  • 복잡성 증가: 읽기/쓰기 모델을 분리하므로 시스템 아키텍처가 더 복잡해지고, 초기 설정 및 관리에 더 많은 노력이 필요합니다.
  • 데이터 동기화: 쓰기 모델에서 변경된 데이터가 읽기 모델에 반영되는 과정에서 데이터 일관성(Eventual Consistency) 문제가 발생할 수 있습니다. 즉, 쓰기 후 읽기까지 약간의 지연이 발생할 수 있습니다.
  • 학습 곡선: 새로운 아키텍처 패턴에 대한 이해와 구현 경험이 필요합니다.

NestJS에서 CQRS 구현

NestJS는 자체적으로 CQRS를 직접 지원하는 모듈은 없지만, 모듈 시스템과 커스텀 프로바이더, 메시지 버스 패턴을 활용하여 CQRS를 구현할 수 있습니다. 주로 @nestjs/cqrs 패키지를 사용하여 CQRS의 핵심 구성 요소를 통합합니다.

@nestjs/cqrs 패키지 설치

npm install @nestjs/cqrs @nestjs/event-emitter # event-emitter는 이벤트 처리에 유용

CQRS의 주요 구성 요소

@nestjs/cqrs는 다음과 같은 핵심 빌딩 블록을 제공합니다.

  • Command (명령): 시스템의 상태를 변경하는 작업을 정의하는 클래스입니다.
    • CommandBus: 명령을 발행(dispatch)하고 해당 명령 핸들러로 라우팅합니다.
    • CommandHandler: 특정 명령을 처리하는 비즈니스 로직을 포함하는 클래스입니다.
  • Query (조회): 시스템의 상태를 조회하는 작업을 정의하는 클래스입니다.
    • QueryBus: 조회를 발행하고 해당 조회 핸들러로 라우팅합니다.
    • QueryHandler: 특정 조회를 처리하고 데이터를 반환하는 클래스입니다.
  • Event (이벤트): 시스템에서 중요한 상태 변경이 발생했음을 나타내는 메시지입니다.
    • EventBus: 이벤트를 발행하고 모든 관련 이벤트 핸들러로 브로드캐스트합니다.
    • EventHandler: 특정 이벤트를 구독하고 처리하는 클래스입니다. (예: 읽기 모델 업데이트)
  • Saga: 여러 명령과 이벤트를 조율하여 복잡한 비즈니스 프로세스를 관리하는 클래스입니다.

NestJS에서 CQRS 구현 예시

사용자 생성(명령) 및 사용자 조회(조회) 기능을 CQRS 패턴으로 구현하는 예시입니다.

Commands (명령)
src/user/commands/create-user.command.ts
export class CreateUserCommand {
  constructor(
    public readonly name: string,
    public readonly email: string,
  ) {}
}
src/user/commands/handlers/create-user.handler.ts
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { CreateUserCommand } from '../impl/create-user.command';
import { Logger } from '@nestjs/common';
import { UserRepository } from '../../user.repository'; // 사용자 데이터 저장소 (쓰기 모델)
import { UserCreatedEvent } from '../../events/impl/user-created.event';
import { EventBus } from '@nestjs/cqrs'; // EventBus 주입

@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
  private readonly logger = new Logger(CreateUserHandler.name);

  constructor(
    private readonly userRepository: UserRepository,
    private readonly eventBus: EventBus, // EventBus 주입
  ) {}

  async execute(command: CreateUserCommand) {
    this.logger.log(`Handling CreateUserCommand: ${command.email}`);
    // 실제 데이터베이스에 사용자 생성 로직
    const newUser = await this.userRepository.createUser(command.name, command.email);

    // 사용자 생성 후 이벤트 발행
    this.eventBus.publish(new UserCreatedEvent(newUser.id, newUser.email, newUser.name));

    return newUser; // 또는 생성 성공 여부만 반환
  }
}
Queries (조회)
src/user/queries/get-user-by-id.query.ts
export class GetUserByIdQuery {
  constructor(public readonly userId: string) {}
}
src/user/queries/handlers/get-user-by-id.handler.ts
import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
import { GetUserByIdQuery } from '../impl/get-user-by-id.query';
import { Logger } from '@nestjs/common';
import { UserReadModelRepository } from '../../user-read-model.repository'; // 조회 모델 데이터 저장소

@QueryHandler(GetUserByIdQuery)
export class GetUserByIdHandler implements IQueryHandler<GetUserByIdQuery> {
  private readonly logger = new Logger(GetUserByIdHandler.name);

  constructor(private readonly userReadModelRepository: UserReadModelRepository) {} // 읽기 모델 저장소 주입

  async execute(query: GetUserByIdQuery) {
    this.logger.log(`Handling GetUserByIdQuery: ${query.userId}`);
    // 읽기 전용 데이터베이스에서 사용자 정보 조회 로직
    const user = await this.userReadModelRepository.findById(query.userId);
    return user;
  }
}
Events (이벤트)
src/user/events/impl/user-created.event.ts
export class UserCreatedEvent {
  constructor(
    public readonly userId: string,
    public readonly email: string,
    public readonly name: string,
  ) {}
}
src/user/events/handlers/user-created.handler.ts
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { UserCreatedEvent } from '../impl/user-created.event';
import { Logger } from '@nestjs/common';
import { UserReadModelRepository } from '../../user-read-model.repository'; // 조회 모델 데이터 저장소

@EventsHandler(UserCreatedEvent)
export class UserCreatedHandler implements IEventHandler<UserCreatedEvent> {
  private readonly logger = new Logger(UserCreatedHandler.name);

  constructor(private readonly userReadModelRepository: UserReadModelRepository) {} // 읽기 모델 저장소 주입

  async handle(event: UserCreatedEvent) {
    this.logger.log(`Handling UserCreatedEvent: User ID ${event.userId}`);
    // 사용자가 생성되면, 읽기 모델을 업데이트하는 로직
    // 예: 별도의 읽기 전용 DB(Elasticsearch, Redis 등)에 사용자 정보 저장
    await this.userReadModelRepository.createOrUpdate({
      id: event.userId,
      name: event.name,
      email: event.email,
      // 필요한 추가적인 읽기 전용 필드
    });
  }
}
Repositories (저장소)
  • UserRepository (쓰기 모델): 주 데이터베이스(예: PostgreSQL, MongoDB)에 데이터를 쓰고 수정하는 로직을 담당합니다.
    src/user/user.repository.ts (개념적 코드)
    import { Injectable } from '@nestjs/common';
    
    @Injectable()
    export class UserRepository {
      // 실제로는 TypeORM이나 Mongoose 등을 사용
      private users: any[] = [];
      private nextId = 1;
    
      async createUser(name: string, email: string) {
        const newUser = { id: (this.nextId++).toString(), name, email, createdAt: new Date() };
        this.users.push(newUser);
        return newUser;
      }
      // ... update, delete 등 쓰기 관련 메서드
    }
  • UserReadModelRepository (읽기 모델): 읽기 전용 데이터베이스(예: Redis, Elasticsearch, 또는 주 데이터베이스의 Read Replica)에서 데이터를 조회하고, 이벤트에 의해 업데이트되는 로직을 담당합니다.
    src/user/user-read-model.repository.ts (개념적 코드)
    import { Injectable } from '@nestjs/common';
    
    @Injectable()
    export class UserReadModelRepository {
      // 실제로는 Redis 클라이언트, Elasticsearch 클라이언트 등 사용
      private readUsers: Map<string, any> = new Map();
    
      async findById(id: string) {
        return this.readUsers.get(id);
      }
    
      async createOrUpdate(user: { id: string; name: string; email: string }) {
        this.readUsers.set(user.id, user);
        console.log(`Read model updated for user: ${user.id}`);
      }
      // ... 다른 조회 관련 메서드
    }
NestJS 모듈 설정

CqrsModule을 임포트하고, Command/Query/Event 핸들러를 프로바이더에 등록합니다.

src/user/user.module.ts
import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import { UserController } from './user.controller';
import { UserRepository } from './user.repository';
import { UserReadModelRepository } from './user-read-model.repository';

// Command Handlers
import { CreateUserHandler } from './commands/handlers/create-user.handler';

// Query Handlers
import { GetUserByIdHandler } from './queries/handlers/get-user-by-id.handler';

// Event Handlers
import { UserCreatedHandler } from './events/handlers/user-created.handler';

// CQRS 관련 프로바이더들을 배열로 정의
export const CommandHandlers = [CreateUserHandler];
export const QueryHandlers = [GetUserByIdHandler];
export const EventHandlers = [UserCreatedHandler];

@Module({
  imports: [CqrsModule], // CqrsModule 임포트
  controllers: [UserController],
  providers: [
    UserRepository, // 쓰기 모델 리포지토리
    UserReadModelRepository, // 읽기 모델 리포지토리
    ...CommandHandlers, // 모든 Command Handler 등록
    ...QueryHandlers,   // 모든 Query Handler 등록
    ...EventHandlers,   // 모든 Event Handler 등록
  ],
})
export class UserModule {}
src/app.module.ts (UserModule 임포트)
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { UserModule } from './user/user.module';

@Module({
  imports: [UserModule], // UserModule 임포트
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
컨트롤러에서 사용

컨트롤러에서 CommandBusQueryBus를 주입받아 사용합니다.

src/user/user.controller.ts
import { Controller, Post, Get, Body, Param, Logger } from '@nestjs/common';
import { CommandBus, QueryBus } from '@nestjs/cqrs'; // CQRS 버스 주입
import { CreateUserCommand } from './commands/impl/create-user.command';
import { GetUserByIdQuery } from './queries/impl/get-user-by-id.query';

@Controller('users')
export class UserController {
  private readonly logger = new Logger(UserController.name);

  constructor(
    private readonly commandBus: CommandBus,
    private readonly queryBus: QueryBus,
  ) {}

  @Post()
  async createUser(@Body() body: { name: string; email: string }) {
    this.logger.log('Received createUser request');
    // Command 발행
    const user = await this.commandBus.execute(new CreateUserCommand(body.name, body.email));
    return { message: 'User created successfully', user };
  }

  @Get(':id')
  async getUser(@Param('id') id: string) {
    this.logger.log(`Received getUser request for ID: ${id}`);
    // Query 발행
    const user = await this.queryBus.execute(new GetUserByIdQuery(id));
    if (!user) {
      // NestJS 예외 처리 (NotFoundException 등)
      throw new Error('User not found');
    }
    return user;
  }
}

CQRS 구현 시 고려사항

  • 데이터 일관성 (Eventual Consistency): 쓰기 모델과 읽기 모델 간의 데이터 동기화 지연을 허용해야 합니다. 사용자에게 지금은 데이터가 바로 보이지 않을 수 있지만, 잠시 후 반영됩니다와 같은 메시지를 제공해야 할 수도 있습니다.
  • 트랜잭션 관리: 쓰기 모델에서 복잡한 트랜잭션이 필요한 경우, 도메인 주도 설계(DDD)의 애그리게이트(Aggregate) 패턴과 함께 사용하는 것이 효과적입니다.
  • 메시지 큐/브로커: 쓰기 모델에서 읽기 모델로 이벤트(데이터 변경 알림)를 전송할 때, Kafka, RabbitMQ, AWS SQS/SNS 등 신뢰할 수 있는 메시지 큐/브로커를 사용하는 것이 일반적입니다. 이는 비동기 통신을 통해 시스템의 결합도를 낮추고 확장성을 높입니다.
  • 데이터베이스 선택: 읽기/쓰기 모델에 각각 최적화된 데이터베이스를 선택합니다.
    • 쓰기 모델: 관계형 DB(PostgreSQL, MySQL), 문서 DB(MongoDB) 등
    • 읽기 모델: 검색 엔진(Elasticsearch), 인메모리 캐시(Redis), Read Replica, NoSQL 등
  • 로깅 및 모니터링: CQRS는 시스템을 더 분산시키므로, 각 컴포넌트 간의 데이터 흐름과 병목 지점을 추적할 수 있도록 분산 로깅 및 트레이싱 시스템(예: OpenTelemetry, Jaeger)을 구축하는 것이 중요합니다.

CQRS를 도입할 때는 쓰기 성공 이후 조회 반영까지의 지연과 운영 추적을 함께 설계해야 합니다.

CQRS 패턴은 모든 애플리케이션에 필요한 것은 아닙니다.

시스템 복잡성이 낮거나 읽기/쓰기 비율이 크게 다르지 않다면, 전통적인 CRUD 모델이 더 단순하고 관리하기 쉽습니다.

높은 확장성, 성능 최적화, 복잡한 도메인 모델이 필요한 대규모 엔터프라이즈 시스템에서는 CQRS가 구조적 이점을 제공할 수 있습니다.

NestJS의 모듈성과 @nestjs/cqrs 패키지는 이 복잡한 패턴을 구조적으로 구현하는 데 도움을 줍니다.

CQRS 개념과 NestJS에서 CQRS 구현 흐름을 함께 정리한 보조 다이어그램입니다.