Apache Kafka와 이벤트 기반 아키텍처
지난 절에서는 NestJS와 gRPC를 활용하여 고성능 서비스 간 동기 통신을 구현하는 방법을 살펴보았습니다. 이제 7장의 마지막 절로, 마이크로서비스 아키텍처에서 비동기 통신과 이벤트 기반 아키텍처(EDA)의 핵심 요소인 Apache Kafka를 NestJS에 적용하는 방법에 대해 알아보겠습니다.
동기 통신(Synchronous Communication)은 요청-응답 패턴에 적합하지만, 서비스 간 강한 결합을 유발하고 한 서비스의 장애가 다른 서비스로 전파될 위험이 있습니다. 또한 대규모 이벤트를 처리하거나 실시간 데이터 파이프라인을 구축할 때는 비효율적입니다. Apache Kafka는 이러한 문제를 해결하고 확장성, 내결함성, 높은 처리량을 제공하는 분산 스트리밍 플랫폼입니다.
이벤트 기반 아키텍처 (EDA)와 Kafka
이벤트 기반 아키텍처(EDA, Event-Driven Architecture) 는 시스템의 구성 요소들이 직접적으로 호출하는 대신, 이벤트(Event)를 발행하고 구독하는 방식으로 상호작용하는 아키텍처 스타일입니다. 각 서비스는 자신이 관심 있는 이벤트를 구독하고, 해당 이벤트가 발생하면 필요한 작업을 수행합니다.
EDA의 주요 특징
- 느슨한 결합(Loose Coupling): 서비스들이 서로를 직접 호출하지 않으므로, 한 서비스의 변경이 다른 서비스에 미치는 영향이 최소화됩니다.
- 확장성: 이벤트 생산자와 소비자 모두 독립적으로 확장할 수 있습니다.
- 복원력(Resilience): 한 서비스가 다운되더라도 이벤트는 메시지 큐에 저장되어 나중에 처리될 수 있으므로, 시스템 전체의 가용성이 높아집니다.
- 비동기 통신: 응답을 즉시 기다리지 않으므로, 장시간 실행되는 작업을 처리하거나 백그라운드 작업을 실행하는 데 적합합니다.
- 실시간 처리: 데이터 변경 이벤트를 즉시 처리하여 실시간 시스템 구축에 용이합니다.
Apache Kafka는 이러한 EDA를 구현하는 데 가장 널리 사용되는 플랫폼 중 하나입니다. Kafka는 분산된 커밋 로그(Commit Log) 또는 분산 메시지 브로커로 볼 수 있으며, 다음과 같은 특징을 가집니다.
- 높은 처리량: 초당 수백만 건의 이벤트를 처리할 수 있습니다.
- 내결함성: 여러 서버에 데이터를 복제하여 서버 장애 시에도 데이터 손실 없이 안정적인 운영이 가능합니다.
- 확장성: 수평적 확장이 용이하여 데이터 양이 증가해도 유연하게 대응할 수 있습니다.
- 영속성: 발행된 이벤트는 디스크에 저장되어 설정된 기간 동안 유지되므로, 소비자가 나중에 이벤트를 다시 읽을 수 있습니다.
- 다중 소비자: 여러 소비자가 동일한 이벤트를 독립적으로 소비할 수 있습니다.
Kafka의 핵심 개념
- Producer(생산자): Kafka 토픽으로 메시지(이벤트)를 발행하는 애플리케이션.
- Consumer(소비자): Kafka 토픽에서 메시지를 구독하고 처리하는 애플리케이션.
- Topic(토픽): 메시지(이벤트)가 발행되고 소비되는 논리적인 카테고리.
- Partition(파티션): 토픽을 물리적으로 나누는 단위. 파티션 덕분에 Kafka는 높은 처리량을 가집니다. 각 파티션의 메시지는 순서가 보장됩니다.
- Broker(브로커): Kafka 서버. 여러 브로커가 클러스터를 구성하여 내결함성과 확장성을 제공합니다.
- Zookeeper(주키퍼): Kafka 클러스터의 메타데이터(토픽, 파티션 정보 등)를 관리하는 분산 코디네이션 서비스 (최신 Kafka 버전에서는 주키퍼 없이도 동작 가능).
NestJS에서 Kafka 설정 및 구현
NestJS는 @nestjs/microservices
패키지를 통해 Kafka 전송 계층을 지원합니다.
시나리오: Users
서비스에서 새로운 사용자가 생성되면 이벤트를 Kafka에 발행하고, Notifications
서비스에서 이 이벤트를 구독하여 사용자에게 환영 이메일을 보내는 시나리오를 구축해 보겠습니다.
사전 준비: 로컬 환경에서 Kafka와 Zookeeper를 실행해야 합니다. Docker를 사용하는 것이 가장 편리합니다.
# Docker Compose 파일 (docker-compose.yml) 예시
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.5.0
hostname: kafka
container_name: kafka
ports:
- "9092:9092"
- "9093:9093" # 내부 통신용 포트
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9093,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
depends_on:
- zookeeper
# 실행: docker-compose up -d
Docker Compose 파일로 Kafka를 실행한 후 다음 단계를 진행합니다.
사용자 서비스 구축
새로운 사용자가 생성될 때 Kafka에 이벤트를 발행하는 서비스입니다.
단계 1: 필요한 패키지 설치
users-service
프로젝트에서 설치합니다.
npm install @nestjs/microservices @nestjs/platform-express
npm install --save-dev @types/express # 웹 서버 역할을 겸할 경우
kafka-node
(레거시): 이전에는kafka-node
를 사용했지만, NestJS 최신 버전에서는@nestjs/microservices
가 내부적으로kafkajs
를 기반으로 하므로 별도로 설치할 필요가 없습니다.
단계 2: main.ts
파일 수정 (HTTP 서버 유지)
사용자 서비스는 클라이언트의 HTTP 요청을 받아 사용자 생성 후 Kafka 이벤트를 발행합니다.
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
// NestJS 마이크로서비스 챕터의 컨벤션을 위해 포트 3000 사용
await app.listen(3000);
console.log('Users Service (HTTP Server) is listening on port 3000');
}
bootstrap();
단계 3: UsersModule
에 Kafka 클라이언트 등록
ClientsModule
을 사용하여 Kafka 클라이언트를 등록하고, 이를 UsersController
에 주입하여 이벤트를 발행합니다.
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { UsersController } from './users.controller';
import { UsersService } from './users.service';
@Module({
imports: [
ClientsModule.register([
{
name: 'KAFKA_SERVICE', // Kafka 클라이언트 토큰
transport: Transport.KAFKA, // Kafka 전송 방식 사용
options: {
client: {
clientId: 'user-producer', // 클라이언트 ID
brokers: ['localhost:9092'], // Kafka 브로커 주소
},
producer: {
allowAutoTopicCreation: true, // 토픽이 없으면 자동으로 생성
},
},
},
]),
],
controllers: [UsersController],
providers: [UsersService],
})
export class UsersModule {}
name: 'KAFKA_SERVICE'
: 이 클라이언트 프록시를 의존성 주입을 통해 참조할 토큰입니다.transport: Transport.KAFKA
: Kafka 전송 방식을 사용합니다.options.client
: Kafka 클라이언트 설정 (고유한clientId
, 브로커 목록brokers
).options.producer
: Kafka 생산자 설정 (예:allowAutoTopicCreation
으로 토픽 자동 생성 활성화).
단계 4: 사용자 컨트롤러 및 서비스 구현 (Kafka 이벤트 발행)
사용자 생성 시 Kafka emit
메서드를 통해 이벤트를 발행합니다.
import { Controller, Post, Body, Get, Param, Inject } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices'; // ClientKafka 임포트 (ClientProxy의 하위 타입)
import { UsersService } from './users.service';
import { OnModuleInit } from '@nestjs/common'; // OnModuleInit 임포트
import { lastValueFrom } from 'rxjs'; // lastValueFrom 임포트
interface User {
id: number;
name: string;
email: string;
}
@Controller('users') // HTTP 엔드포인트
export class UsersController implements OnModuleInit {
constructor(
private readonly usersService: UsersService,
@Inject('KAFKA_SERVICE') private readonly client: ClientKafka, // Kafka 클라이언트 주입
) {}
async onModuleInit() {
// 이 클라이언트가 메시지를 발행할 모든 토픽을 구독해야 합니다.
// NestJS는 이 토픽을 미리 연결하여 메시지 전송 실패를 방지합니다.
this.client.subscribeToResponseOf('user_created');
await this.client.connect(); // 클라이언트 연결
}
@Post()
async createUser(@Body() userDto: { name: string; email: string }): Promise<User> {
const newUser = this.usersService.create(userDto);
console.log(`Users Service: New user created: ${JSON.stringify(newUser)}`);
// 'user_created' 토픽으로 이벤트 발행
// emit()은 응답을 기다리지 않는 이벤트 기반 통신에 사용합니다.
this.client.emit('user_created', newUser); // 메시지 키는 생략하고 값만 전송
return newUser;
}
// 기존 사용자 조회 엔드포인트 (HTTP)
@Get(':id')
getUserById(@Param('id') id: string): User {
const userId = parseInt(id, 10);
return this.usersService.findOne(userId);
}
}
ClientKafka
: Kafka 전송 방식을 위한 특화된 클라이언트입니다.onModuleInit()
:client.connect()
를 호출하여 Kafka 브로커와 연결을 설정합니다.subscribeToResponseOf()
는 클라이언트가 서버로부터 응답을 받을 때 사용하지만,emit
을 사용할 때는 필수적이지 않습니다. 하지만 안전하게 모든 토픽을 미리 연결하고 싶을 때 유용합니다.this.client.emit('topic_name', message_payload)
:topic_name
으로message_payload
를 발행합니다.emit
은 비동기적이며 응답을 기다리지 않습니다.
단계 5: UsersService
구현 (동일)
users-service/src/users/users.service.ts
파일은 이전과 동일하게 유지됩니다.
단계 6: AppModule
에 UsersModule
임포트 (동일)
users-service/src/app.module.ts
는 변경 없이 UsersModule
을 임포트합니다.
알림 서비스 구축
user_created
이벤트를 구독하여 처리하는 서비스입니다.
단계 1: 새 NestJS 프로젝트 생성
nest new notifications-service --skip-install
cd notifications-service
npm install @nestjs/microservices
npm install
단계 2: main.ts
파일 수정 (Kafka 마이크로서비스 서버 설정)
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { join } from 'path';
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA, // Kafka 전송 방식 사용
options: {
client: {
clientId: 'notification-consumer', // 클라이언트 ID
brokers: ['localhost:9092'], // Kafka 브로커 주소
},
consumer: {
groupId: 'notification-group', // 컨슈머 그룹 ID (필수)
},
},
});
await app.listen();
console.log('Notifications Microservice (Kafka Consumer) is listening.');
}
bootstrap();
transport: Transport.KAFKA
: Kafka 전송 방식을 사용합니다.options.client
: Kafka 클라이언트 설정.options.consumer.groupId
: 필수 항목입니다. Kafka 컨슈머는 반드시 컨슈머 그룹에 속해야 합니다. 동일한 그룹 ID를 가진 컨슈머들은 토픽의 파티션을 나누어 소비합니다.
단계 3: 알림 마이크로서비스 컨트롤러 (Kafka 핸들러) 구현
@EventPattern()
데코레이터를 사용하여 특정 Kafka 토픽에서 발생하는 이벤트를 처리합니다.
import { Controller } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices'; // EventPattern, Payload 임포트
import { NotificationsService } from './notifications.service';
interface UserCreatedEvent { // user_created 이벤트의 페이로드 구조 정의
id: number;
name: string;
email: string;
}
@Controller()
export class NotificationsController {
constructor(private readonly notificationsService: NotificationsService) {}
// 'user_created' 토픽의 이벤트를 처리하는 핸들러
@EventPattern('user_created')
async handleUserCreated(@Payload() data: UserCreatedEvent) {
console.log(`Notifications Service: Received 'user_created' event for user: ${JSON.stringify(data)}`);
// 여기에서 실제 알림 로직 (예: 이메일 전송)을 구현합니다.
await this.notificationsService.sendWelcomeEmail(data.email, data.name);
}
}
@EventPattern('topic_name')
: 특정 Kafka 토픽에서 발행된 이벤트를 구독하여 처리합니다.send()
와 달리 응답을 반환할 필요가 없습니다.@Payload()
: 이벤트 메시지의 페이로드(값)를 추출하여 메서드 인자로 주입합니다.
단계 4: 알림 서비스 구현 (이메일 전송 시뮬레이션)
import { Injectable } from '@nestjs/common';
@Injectable()
export class NotificationsService {
async sendWelcomeEmail(email: string, name: string): Promise<void> {
console.log(`Sending welcome email to ${name} (${email})...`);
// 실제 이메일 전송 로직 (예: Nodemailer, SendGrid 등 사용)
await new Promise(resolve => setTimeout(resolve, 1000)); // 1초 지연 시뮬레이션
console.log(`Welcome email sent to ${name}!`);
}
}
단계 5: NotificationsModule
및 AppModule
구성
import { Module } from '@nestjs/common';
import { NotificationsController } from './notifications.controller';
import { NotificationsService } from './notifications.service';
@Module({
controllers: [NotificationsController],
providers: [NotificationsService],
})
export class NotificationsModule {}
import { Module } from '@nestjs/common';
import { NotificationsModule } from './notifications/notifications.module';
@Module({
imports: [NotificationsModule],
controllers: [],
providers: [],
})
export class AppModule {}
실행 및 테스트
Kafka 및 Zookeeper 실행
- Docker Compose 파일이 있는 디렉토리에서:
docker-compose up -d
- 모든 컨테이너가 정상적으로 실행되었는지 확인:
docker-compose ps
Notifications Service (Kafka Consumer) 시작
cd notifications-service
npm run start:dev
- 콘솔에
Notifications Microservice (Kafka Consumer) is listening.
메시지 확인.
Users Service (Kafka Producer) 시작
cd users-service
npm run start:dev
- 콘솔에
Users Service (HTTP Server) is listening on port 3000
메시지 확인.
사용자 생성 API 호출 (Postman 또는 cURL)
POST http://localhost:3000/users
- Headers:
Content-Type: application/json
- Body
{ "name": "David", "email": "david@example.com" }
- Users Service 콘솔:
Users Service: New user created: {"id":3,"name":"David","email":"david@example.com"}
메시지가 즉시 출력됩니다. - Notifications Service 콘솔: 잠시 후 (Kafka 브로커를 거쳐 메시지가 전달된 후),
Notifications Service: Received 'user_created' event for user: {"id":3,"name":"David","email":"david@example.com"}
및Sending welcome email to David (david@example.com)...
와Welcome email sent to David!
메시지가 출력되는 것을 확인할 수 있습니다.
이 과정을 통해 Users
서비스에서 Notifications
서비스로 직접 호출 없이 Kafka를 통해 비동기적으로 이벤트가 전달되고 처리되는 것을 확인했습니다.
Kafka를 활용한 아키텍처는 마이크로서비스 간의 결합도를 낮추고 시스템의 확장성과 복원력을 크게 향상시킵니다. 특히 대규모 이벤트 스트림 처리, 실시간 데이터 파이프라인, 그리고 복잡한 비동기 워크플로우를 구축하는 데 매우 강력한 도구입니다. NestJS는 Kafka 전송 계층을 기본으로 제공하여 이러한 이벤트 기반 아키텍처를 쉽게 구현할 수 있도록 돕습니다.
이제 여러분은 NestJS를 사용하여 REST API와 GraphQL 서버를 구축하는 것을 넘어, TCP, gRPC, Kafka와 같은 다양한 통신 프로토콜을 활용한 마이크로서비스 아키텍처를 설계하고 구현하는 데 필요한 핵심 지식을 갖추게 되었습니다.