Apache Kafka를 활용한 이벤트 기반 아키텍처
이벤트 기반 아키텍처는 시스템 내의 상태 변화를 이벤트로 캡처하고 전파하는 소프트웨어 설계 패턴입니다.
이 아키텍처는 높은 확장성, 느슨한 결합, 실시간 데이터 처리 능력을 제공합니다.
Apache Kafka는 이러한 아키텍처를 구현하는 데 널리 사용되는 분산 스트리밍 플랫폼입니다.
Apache Kafka 핵심 개념
- 토픽 : 메시지의 카테고리 또는 피드 이름
- 파티션 : 토픽의 분산된 부분집합
- 프로듀서 : 토픽에 메시지를 발행하는 애플리케이션
- 컨슈머 : 토픽에서 메시지를 읽는 애플리케이션
NestJS에 Kafka 설정
- 의존성 설치
npm install kafkajs @nestjs/microservices
- Kafka 모듈 설정
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
@Module({
imports: [
ClientsModule.register([
{
name: 'KAFKA_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'my-app',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'my-group'
}
}
},
]),
],
})
export class AppModule {}
Kafka 프로듀서 구현
import { Injectable, Inject } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
@Injectable()
export class ProducerService {
constructor(
@Inject('KAFKA_SERVICE') private readonly client: ClientKafka,
) {}
async onModuleInit() {
await this.client.connect();
}
async sendMessage(topic: string, message: any) {
return this.client.emit(topic, message);
}
}
사용 예
@Controller()
export class AppController {
constructor(private readonly producerService: ProducerService) {}
@Post('send')
async sendMessage(@Body() body: any) {
return this.producerService.sendMessage('test-topic', body);
}
}
Kafka 컨슈머 구현
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
@Controller()
export class ConsumerController {
@MessagePattern('test-topic')
async handleMessage(@Payload() message: any) {
console.log('Received message:', message);
// 메시지 처리 로직
}
}
Kafka 스트림 기반 실시간 데이터 처리
NestJS에서 Kafka Streams API를 직접 사용할 수는 없지만, 유사한 기능을 구현할 수 있습니다.
import { Injectable } from '@nestjs/common';
import { Subject } from 'rxjs';
import { map, filter } from 'rxjs/operators';
@Injectable()
export class StreamProcessorService {
private dataStream = new Subject<any>();
processData(data: any) {
this.dataStream.next(data);
}
getProcessedStream() {
return this.dataStream.pipe(
filter(data => data.value > 100),
map(data => ({ ...data, processed: true }))
);
}
}
이벤트 스키마 관리
Apache Avro를 사용한 스키마 정의
{
"type": "record",
"name": "UserCreated",
"fields": [
{"name": "id", "type": "string"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"}
]
}
NestJS에서 Avro 스키마 사용
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry'
const registry = new SchemaRegistry({ host: 'http://localhost:8081' })
async function encodeMessage(message: any) {
const schemaId = await registry.getLatestSchemaId('UserCreated')
return registry.encode(schemaId, message)
}
async function decodeMessage(message: Buffer) {
return registry.decode(message)
}
마이크로서비스 간 통신 패턴
- 요청-응답 패턴
// 요청 서비스
@Injectable()
export class RequestService {
constructor(@Inject('KAFKA_SERVICE') private client: ClientKafka) {}
async getData() {
return this.client.send('get-data', { id: '123' }).toPromise();
}
}
// 응답 서비스
@Controller()
export class ResponseController {
@MessagePattern('get-data')
getData(@Payload() message: any) {
return { data: 'Requested data' };
}
}
- 발행-구독 패턴
// 발행자
@Injectable()
export class PublisherService {
constructor(@Inject('KAFKA_SERVICE') private client: ClientKafka) {}
publishEvent(event: any) {
this.client.emit('user-events', event);
}
}
// 구독자
@Controller()
export class SubscriberController {
@EventPattern('user-events')
handleUserEvent(@Payload() event: any) {
console.log('Received event:', event);
}
}
모니터링, 로깅, 장애 복구
- 모니터링 : Prometheus와 Grafana 통합
import { PrometheusExporter } from '@opentelemetry/exporter-prometheus';
const exporter = new PrometheusExporter({
port: 9464,
});
// Kafka 메트릭 추적
const messageCounter = exporter.createCounter('kafka_messages', {
description: 'Count of Kafka messages',
});
// 메시지 처리 시 카운터 증가
messageCounter.add(1);
- 로깅 : Winston 로거 사용
import { WinstonModule } from 'nest-winston';
import * as winston from 'winston';
@Module({
imports: [
WinstonModule.forRoot({
transports: [
new winston.transports.Console(),
new winston.transports.File({ filename: 'kafka.log' }),
],
}),
],
})
export class AppModule {}
- 장애 복구 : 재시도 메커니즘 구현
import { retry, catchError } from 'rxjs/operators';
@MessagePattern('test-topic')
handleMessage(@Payload() message: any) {
return this.processMessage(message).pipe(
retry(3),
catchError((error) => {
console.error('Processing failed', error);
// 에러 처리 로직
})
);
}
Best Practices 및 주의사항
- 멱등성 보장 : 메시지 중복 처리에 대비
- 파티션 키 전략 : 데이터 분산과 순서 보장을 위한 적절한 파티션 키 선택
- 컨슈머 그룹 설계 : 병렬 처리를 위한 효율적인 컨슈머 그룹 구성
- 배치 처리 : 성능 향상을 위한 메시지 배치 처리
- 오프셋 관리 : 안정적인 메시지 처리를 위한 오프셋 커밋 전략 수립
- 에러 처리 : Dead Letter Queue 구현으로 실패한 메시지 관리
- 보안 : SSL/TLS 암호화 및 SASL 인증 적용
- 모니터링 : 컨슈머 랙, 프로듀서 전송률 등 핵심 메트릭 모니터링
- 스키마 진화 : 하위 호환성을 고려한 스키마 버전 관리
- 성능 튜닝 : 적절한 파티션 수, 복제 팩터 설정
Kafka의 강력한 메시징 능력과 NestJS의 모듈화된 구조가 만나 복잡한 분산 시스템을 효과적으로 관리할 수 있게 됩니다.
프로듀서와 컨슈머의 구현은 NestJS의 마이크로서비스 모듈을 통해 간단하게 이루어질 수 있으며, 이를 통해 서비스 간 느슨한 결합을 달성할 수 있습니다. 실시간 데이터 처리를 위해 Kafka 스트림과 유사한 기능을 RxJS를 활용하여 구현할 수 있습니다.
이벤트 스키마 관리는 시스템의 안정성과 확장성을 위해 중요합니다. Apache Avro와 같은 스키마 레지스트리를 사용하여 이벤트의 구조를 명확히 정의하고 버전을 관리할 수 있습니다.
마이크로서비스 간 통신에서 Kafka는 요청-응답 패턴과 발행-구독 패턴을 모두 지원합니다. NestJS의 데코레이터를 활용하면 이러한 패턴을 쉽게 구현할 수 있습니다.