icon

Apache Kafka를 활용한 이벤트 기반 아키텍처


 이벤트 기반 아키텍처는 시스템 내의 상태 변화를 이벤트로 캡처하고 전파하는 소프트웨어 설계 패턴입니다.

 이 아키텍처는 높은 확장성, 느슨한 결합, 실시간 데이터 처리 능력을 제공합니다.

 Apache Kafka는 이러한 아키텍처를 구현하는 데 널리 사용되는 분산 스트리밍 플랫폼입니다.

Apache Kafka 핵심 개념

  1. 토픽 : 메시지의 카테고리 또는 피드 이름
  2. 파티션 : 토픽의 분산된 부분집합
  3. 프로듀서 : 토픽에 메시지를 발행하는 애플리케이션
  4. 컨슈머 : 토픽에서 메시지를 읽는 애플리케이션

NestJS에 Kafka 설정

  1. 의존성 설치
npm install kafkajs @nestjs/microservices
  1. Kafka 모듈 설정
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
 
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'KAFKA_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'my-app',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'my-group'
          }
        }
      },
    ]),
  ],
})
export class AppModule {}

Kafka 프로듀서 구현

import { Injectable, Inject } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
 
@Injectable()
export class ProducerService {
  constructor(
    @Inject('KAFKA_SERVICE') private readonly client: ClientKafka,
  ) {}
 
  async onModuleInit() {
    await this.client.connect();
  }
 
  async sendMessage(topic: string, message: any) {
    return this.client.emit(topic, message);
  }
}

 사용 예

@Controller()
export class AppController {
  constructor(private readonly producerService: ProducerService) {}
 
  @Post('send')
  async sendMessage(@Body() body: any) {
    return this.producerService.sendMessage('test-topic', body);
  }
}

Kafka 컨슈머 구현

import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
 
@Controller()
export class ConsumerController {
  @MessagePattern('test-topic')
  async handleMessage(@Payload() message: any) {
    console.log('Received message:', message);
    // 메시지 처리 로직
  }
}

Kafka 스트림 기반 실시간 데이터 처리

 NestJS에서 Kafka Streams API를 직접 사용할 수는 없지만, 유사한 기능을 구현할 수 있습니다.

import { Injectable } from '@nestjs/common';
import { Subject } from 'rxjs';
import { map, filter } from 'rxjs/operators';
 
@Injectable()
export class StreamProcessorService {
  private dataStream = new Subject<any>();
 
  processData(data: any) {
    this.dataStream.next(data);
  }
 
  getProcessedStream() {
    return this.dataStream.pipe(
      filter(data => data.value > 100),
      map(data => ({ ...data, processed: true }))
    );
  }
}

이벤트 스키마 관리

 Apache Avro를 사용한 스키마 정의

{
  "type": "record",
  "name": "UserCreated",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"}
  ]
}

 NestJS에서 Avro 스키마 사용

import { SchemaRegistry } from '@kafkajs/confluent-schema-registry'
 
const registry = new SchemaRegistry({ host: 'http://localhost:8081' })
 
async function encodeMessage(message: any) {
  const schemaId = await registry.getLatestSchemaId('UserCreated')
  return registry.encode(schemaId, message)
}
 
async function decodeMessage(message: Buffer) {
  return registry.decode(message)
}

마이크로서비스 간 통신 패턴

  1. 요청-응답 패턴
// 요청 서비스
@Injectable()
export class RequestService {
  constructor(@Inject('KAFKA_SERVICE') private client: ClientKafka) {}
 
  async getData() {
    return this.client.send('get-data', { id: '123' }).toPromise();
  }
}
 
// 응답 서비스
@Controller()
export class ResponseController {
  @MessagePattern('get-data')
  getData(@Payload() message: any) {
    return { data: 'Requested data' };
  }
}
  1. 발행-구독 패턴
// 발행자
@Injectable()
export class PublisherService {
  constructor(@Inject('KAFKA_SERVICE') private client: ClientKafka) {}
 
  publishEvent(event: any) {
    this.client.emit('user-events', event);
  }
}
 
// 구독자
@Controller()
export class SubscriberController {
  @EventPattern('user-events')
  handleUserEvent(@Payload() event: any) {
    console.log('Received event:', event);
  }
}

모니터링, 로깅, 장애 복구

  1. 모니터링 : Prometheus와 Grafana 통합
import { PrometheusExporter } from '@opentelemetry/exporter-prometheus';
 
const exporter = new PrometheusExporter({
  port: 9464,
});
 
// Kafka 메트릭 추적
const messageCounter = exporter.createCounter('kafka_messages', {
  description: 'Count of Kafka messages',
});
 
// 메시지 처리 시 카운터 증가
messageCounter.add(1);
  1. 로깅 : Winston 로거 사용
import { WinstonModule } from 'nest-winston';
import * as winston from 'winston';
 
@Module({
  imports: [
    WinstonModule.forRoot({
      transports: [
        new winston.transports.Console(),
        new winston.transports.File({ filename: 'kafka.log' }),
      ],
    }),
  ],
})
export class AppModule {}
  1. 장애 복구 : 재시도 메커니즘 구현
import { retry, catchError } from 'rxjs/operators';
 
@MessagePattern('test-topic')
handleMessage(@Payload() message: any) {
  return this.processMessage(message).pipe(
    retry(3),
    catchError((error) => {
      console.error('Processing failed', error);
      // 에러 처리 로직
    })
  );
}

Best Practices 및 주의사항

  1. 멱등성 보장 : 메시지 중복 처리에 대비
  2. 파티션 키 전략 : 데이터 분산과 순서 보장을 위한 적절한 파티션 키 선택
  3. 컨슈머 그룹 설계 : 병렬 처리를 위한 효율적인 컨슈머 그룹 구성
  4. 배치 처리 : 성능 향상을 위한 메시지 배치 처리
  5. 오프셋 관리 : 안정적인 메시지 처리를 위한 오프셋 커밋 전략 수립
  6. 에러 처리 : Dead Letter Queue 구현으로 실패한 메시지 관리
  7. 보안 : SSL/TLS 암호화 및 SASL 인증 적용
  8. 모니터링 : 컨슈머 랙, 프로듀서 전송률 등 핵심 메트릭 모니터링
  9. 스키마 진화 : 하위 호환성을 고려한 스키마 버전 관리
  10. 성능 튜닝 : 적절한 파티션 수, 복제 팩터 설정

 Kafka의 강력한 메시징 능력과 NestJS의 모듈화된 구조가 만나 복잡한 분산 시스템을 효과적으로 관리할 수 있게 됩니다.

 프로듀서와 컨슈머의 구현은 NestJS의 마이크로서비스 모듈을 통해 간단하게 이루어질 수 있으며, 이를 통해 서비스 간 느슨한 결합을 달성할 수 있습니다. 실시간 데이터 처리를 위해 Kafka 스트림과 유사한 기능을 RxJS를 활용하여 구현할 수 있습니다.

 이벤트 스키마 관리는 시스템의 안정성과 확장성을 위해 중요합니다. Apache Avro와 같은 스키마 레지스트리를 사용하여 이벤트의 구조를 명확히 정의하고 버전을 관리할 수 있습니다.

 마이크로서비스 간 통신에서 Kafka는 요청-응답 패턴과 발행-구독 패턴을 모두 지원합니다. NestJS의 데코레이터를 활용하면 이러한 패턴을 쉽게 구현할 수 있습니다.