Idempotent 프로듀서, 트랜잭셔널 프로듀서, Exactly-once 시맨틱스, 수동 오프셋 관리, 배치 최적화, Dead Letter Queue 등 프로덕션 수준의 Kafka 활용 패턴을 학습합니다.
네트워크는 본질적으로 불안정합니다. 프로듀서가 메시지를 보내고 브로커가 성공적으로 기록했지만, 응답이 네트워크에서 유실될 수 있습니다. 프로듀서는 실패로 판단하고 재전송하게 되어 동일 메시지가 중복 기록됩니다.
프로듀서 → 브로커: 메시지 전송
브로커: 기록 성공 (offset 42)
브로커 → 프로듀서: 응답 전송 (네트워크에서 유실)
프로듀서: 타임아웃 → 재전송
브로커: 동일 메시지 다시 기록 (offset 43) ← 중복!**Idempotent Producer(멱등 프로듀서)**는 이 문제를 해결합니다. 프로듀서가 각 메시지에 **Producer ID(PID)**와 **Sequence Number(시퀀스 번호)**를 부여하고, 브로커가 이를 확인하여 중복 메시지를 거부합니다.
프로듀서(PID=1) → 브로커: 메시지(seq=0) → 기록
프로듀서(PID=1) → 브로커: 메시지(seq=1) → 기록
프로듀서(PID=1) → 브로커: 메시지(seq=1) → 중복 감지, 거부(DuplicateSequenceException)설정은 단 한 줄입니다.
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("enable.idempotence", "true"); // 멱등성 활성화
// acks=all, retries=MAX_INT, max.in.flight.requests.per.connection<=5 자동 설정
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);enable.idempotence=true를 설정하면 acks=all, retries=Integer.MAX_VALUE, max.in.flight.requests.per.connection<=5가 자동으로 적용됩니다. 멱등 프로듀서는 단일 파티션, 단일 세션 범위에서 중복을 방지합니다.
멱등 프로듀서는 단일 파티션, 단일 프로듀서 세션 내에서만 중복을 방지합니다. 다음과 같은 상황에서는 보장되지 않습니다.
이러한 한계를 극복하기 위해 트랜잭셔널 프로듀서가 필요합니다.
스트림 처리에서 흔한 패턴은 "소스 토픽에서 읽고, 처리하고, 결과를 대상 토픽에 쓰는" read-process-write 패턴입니다. 이때 읽기(오프셋 커밋)와 쓰기가 원자적으로 수행되지 않으면, 장애 시 데이터 중복이나 유실이 발생합니다.
1. 소스 토픽에서 메시지 읽기 (offset 10)
2. 처리
3. 결과를 대상 토픽에 쓰기 ← 성공
4. 오프셋 커밋 (offset 11) ← 실패 (장애 발생)
→ 재시작 후 offset 10부터 다시 읽어 중복 처리 발생**Transactional Producer(트랜잭셔널 프로듀서)**는 여러 파티션에 대한 쓰기와 오프셋 커밋을 하나의 원자적 트랜잭션으로 묶습니다.
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("transactional.id", "order-processor-1"); // 트랜잭션 ID
props.put("enable.idempotence", "true");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // 트랜잭션 초기화
try {
producer.beginTransaction();
// 여러 토픽/파티션에 쓰기
producer.send(new ProducerRecord<>("orders-processed", key, value));
producer.send(new ProducerRecord<>("notifications", key, notification));
// 소스 토픽의 오프셋도 트랜잭션에 포함
producer.sendOffsetsToTransaction(
offsets, consumerGroupMetadata
);
producer.commitTransaction(); // 모두 성공 또는 모두 실패
} catch (Exception e) {
producer.abortTransaction(); // 롤백
}트랜잭션의 핵심 흐름은 다음과 같습니다.
beginTransaction(): 트랜잭션 시작send() 호출: 메시지를 여러 토픽/파티션에 전송sendOffsetsToTransaction(): 컨슈머 오프셋도 트랜잭션에 포함commitTransaction(): 모든 쓰기를 원자적으로 커밋abortTransaction(): 모든 쓰기를 원자적으로 롤백transactional.id는 프로듀서 인스턴스를 고유하게 식별합니다. 프로듀서가 재시작되어도 같은 transactional.id를 사용하면, 이전에 완료되지 않은 트랜잭션이 자동으로 중단(abort)됩니다. 이를 **zombie fencing(좀비 펜싱)**이라고 하며, 이전 인스턴스가 남긴 불완전한 데이터를 방지합니다.
메시지 전달에는 세 가지 보장 수준이 있습니다.
Kafka에서 Exactly-once는 멱등 프로듀서 + 트랜잭션 + 격리 수준의 조합으로 구현됩니다.
// 컨슈머 측: 커밋된 트랜잭션의 메시지만 읽기
Properties consumerProps = new Properties();
consumerProps.put("isolation.level", "read_committed");
// read_committed: 커밋된 트랜잭션 메시지만 소비
// read_uncommitted(기본): 모든 메시지 소비 (중단된 트랜잭션 포함)전체 흐름을 정리하면 다음과 같습니다.
Kafka의 Exactly-once는 Kafka 내부의 read-process-write 패턴에 한정됩니다. 외부 시스템(데이터베이스, API 등)에 대한 쓰기까지 포함하려면, 외부 시스템 자체가 멱등성을 지원하거나 2PC(Two-Phase Commit) 같은 추가 메커니즘이 필요합니다. 이에 대해서는 10장에서 자세히 다룹니다.
기본적으로 Kafka 컨슈머는 오프셋을 자동으로 커밋합니다(enable.auto.commit=true). 일정 간격(기본 5초)마다 마지막으로 poll()한 오프셋을 커밋합니다.
그러나 자동 커밋은 위험합니다. 메시지를 받았지만 아직 처리하지 않은 상태에서 오프셋이 커밋되면, 장애 시 해당 메시지가 유실됩니다. 반대로 처리는 완료했지만 커밋 전에 장애가 나면 중복 처리가 발생합니다.
Properties props = new Properties();
props.put("enable.auto.commit", "false"); // 자동 커밋 비활성화
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 1. 메시지 처리
processRecord(record);
}
// 2. 처리 완료 후 수동 커밋
consumer.commitSync(); // 동기 커밋 (느리지만 안전)
// consumer.commitAsync(); // 비동기 커밋 (빠르지만 실패 시 재시도 복잡)
}더 세밀한 제어가 필요하면 레코드 단위로 오프셋을 커밋할 수 있습니다.
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
// 처리된 레코드의 다음 오프셋을 커밋
Map<TopicPartition, OffsetAndMetadata> offset = Map.of(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
consumer.commitSync(offset);
}레코드 단위 커밋은 안전하지만 성능 부담이 큽니다. 실전에서는 배치 단위로 처리하고 배치 완료 후 커밋하는 절충안이 흔합니다. 트랜잭셔널 프로듀서를 사용하면 오프셋 커밋도 트랜잭션에 포함할 수 있어 가장 안전합니다.
프로듀서는 메시지를 즉시 보내지 않고 배치로 묶어 전송합니다. 두 가지 설정이 배치 동작을 제어합니다.
Properties props = new Properties();
// 배치 크기 (바이트) — 이 크기가 차면 즉시 전송
props.put("batch.size", 32768); // 32KB (기본: 16KB)
// 최대 대기 시간 (밀리초) — 배치가 차지 않아도 이 시간이 지나면 전송
props.put("linger.ms", 20); // 20ms (기본: 0)batch.size와 linger.ms의 관계를 이해하는 것이 중요합니다.
linger.ms=0(기본): 메시지가 도착하면 즉시 전송 시도. 배치 효과 미미linger.ms=20: 최대 20ms 대기하며 배치를 채움. 처리량 향상, 지연 20ms 증가batch.size=32KB: 배치가 32KB에 도달하면 대기 시간과 무관하게 즉시 전송// 압축 알고리즘 선택
props.put("compression.type", "lz4");
// none: 압축 안 함 (기본)
// gzip: 높은 압축률, CPU 부하 큼
// snappy: 중간 압축률, 빠른 속도
// lz4: 낮은 압축률, 매우 빠른 속도 (권장)
// zstd: 높은 압축률, lz4보다 약간 느림| 알고리즘 | 압축률 | 압축 속도 | 해제 속도 | 권장 시나리오 |
|---|---|---|---|---|
| lz4 | 중간 | 매우 빠름 | 매우 빠름 | 일반적 권장 |
| zstd | 높음 | 빠름 | 빠름 | 대역폭 절약 우선 |
| snappy | 중간 | 빠름 | 빠름 | 레거시 호환 |
| gzip | 매우 높음 | 느림 | 보통 | 저장 공간 절약 |
컨슈머가 특정 메시지를 처리할 수 없는 상황은 빈번합니다. 잘못된 형식, 스키마 불일치, 비즈니스 규칙 위반 등이 원인입니다. 이런 메시지를 무한 재시도하면 전체 파이프라인이 멈추고, 무시하면 데이터가 유실됩니다.
**Dead Letter Queue(DLQ, 데드 레터 큐)**는 처리 불가 메시지를 별도 토픽으로 격리하는 패턴입니다.
private static final int MAX_RETRIES = 3;
private static final String DLQ_TOPIC = "orders.dlq";
public void processWithDLQ(ConsumerRecord<String, String> record) {
int retryCount = 0;
while (retryCount < MAX_RETRIES) {
try {
processRecord(record);
return; // 성공
} catch (RetriableException e) {
retryCount++;
log.warn("처리 실패, 재시도 {}/{}", retryCount, MAX_RETRIES);
backoff(retryCount); // 지수 백오프
} catch (NonRetriableException e) {
break; // 재시도 불필요한 오류
}
}
// 재시도 한도 초과 또는 재시도 불가 오류 → DLQ로 전송
sendToDLQ(record);
}
private void sendToDLQ(ConsumerRecord<String, String> record) {
ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
DLQ_TOPIC,
record.key(),
record.value()
);
// 원본 메타데이터를 헤더에 보존
dlqRecord.headers().add("original-topic", record.topic().getBytes());
dlqRecord.headers().add("original-partition",
String.valueOf(record.partition()).getBytes());
dlqRecord.headers().add("original-offset",
String.valueOf(record.offset()).getBytes());
dlqRecord.headers().add("failure-reason", /* 오류 메시지 */);
dlqProducer.send(dlqRecord);
}DLQ에 쌓인 메시지는 다음과 같이 처리합니다.
DLQ 토픽의 명명 규칙은 원본 토픽에 .dlq 접미사를 붙이는 것이 관례입니다. 예: orders 토픽의 DLQ는 orders.dlq입니다. 원본 메타데이터(토픽, 파티션, 오프셋, 실패 사유)를 헤더에 보존하면 추후 디버깅과 재처리에 유용합니다.
이번 장에서는 Kafka 프로듀서와 컨슈머의 고급 패턴을 학습했습니다.
read_committed 격리 수준의 조합입니다.linger.ms, batch.size, compression.type 튜닝으로 처리량을 높입니다.5장에서는 Kafka Connect를 통한 데이터 통합을 다룹니다. Source/Sink 커넥터 아키텍처, JDBC/S3/Elasticsearch 커넥터 활용, 분산 모드 운영, SMT(Single Message Transform)를 통한 데이터 변환까지 Kafka를 다양한 시스템과 연결하는 방법을 학습합니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
Kafka Connect의 Source/Sink 아키텍처, 주요 커넥터 활용, 분산 모드 운영, SMT를 통한 데이터 변환, 커스텀 커넥터 개발까지 Kafka 기반 데이터 통합 전략을 학습합니다.
Kafka의 핵심 아키텍처를 심층적으로 분석합니다. KRaft 모드, 브로커와 파티션 레플리케이션, 프로듀서 전송 보장, 컨슈머 그룹과 리밸런싱까지 Kafka의 내부를 이해합니다.
Flink의 JobManager/TaskManager 아키텍처, DataStream API, 윈도우와 조인, 상태 관리, Chandy-Lamport 체크포인팅, Flink SQL까지 스트림 처리 엔진의 핵심을 학습합니다.