Kafka Connect의 Source/Sink 아키텍처, 주요 커넥터 활용, 분산 모드 운영, SMT를 통한 데이터 변환, 커스텀 커넥터 개발까지 Kafka 기반 데이터 통합 전략을 학습합니다.
Kafka Connect는 Kafka와 외부 시스템 사이의 데이터 이동을 코딩 없이 처리하는 프레임워크입니다. 데이터베이스, 파일 시스템, 검색 엔진, 클라우드 스토리지 등 다양한 시스템과 Kafka를 연결합니다.
직접 프로듀서/컨슈머 코드를 작성하여 데이터를 옮길 수도 있습니다. 그러나 이 접근법에는 여러 문제가 있습니다.
Kafka Connect는 이 모든 것을 프레임워크 수준에서 제공합니다. 사용자는 JSON 설정만으로 커넥터를 배포하고, Connect 프레임워크가 분산 실행, 장애 복구, 오프셋 관리를 자동으로 처리합니다.
Kafka Connect의 실행 단위는 세 가지 계층으로 구성됩니다.
Connector(커넥터): 논리적 작업 단위입니다. "PostgreSQL의 orders 테이블을 Kafka에 동기화"와 같은 하나의 데이터 통합 작업을 정의합니다. 커넥터는 직접 데이터를 이동하지 않고, Task를 생성하여 실제 작업을 위임합니다.
Task(태스크): 실제 데이터 이동을 수행하는 실행 단위입니다. 하나의 커넥터가 여러 태스크를 생성하여 병렬 처리할 수 있습니다.
Worker(워커): 커넥터와 태스크가 실행되는 JVM 프로세스입니다. 분산 모드에서는 여러 워커가 클러스터를 구성합니다.
Worker 1 Worker 2
├── Connector: pg-source ├── Connector: es-sink
│ ├── Task 0 │ ├── Task 0
│ └── Task 1 │ └── Task 1
└── Connector: s3-sink
└── Task 0단일 워커에서 모든 커넥터와 태스크를 실행합니다. 개발, 테스트, 소규모 환경에 적합합니다.
connect-standalone.sh \
config/connect-standalone.properties \
config/jdbc-source.properties \
config/es-sink.properties여러 워커가 클러스터를 구성하고, 커넥터와 태스크가 워커들에 분산됩니다. 프로덕션 환경에서의 표준 운영 방식입니다.
connect-distributed.sh config/connect-distributed.properties분산 모드의 주요 특성은 다음과 같습니다.
# 커넥터 생성 (REST API)
curl -X POST http://connect:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "pg-orders-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "replicator",
"database.password": "secret",
"database.dbname": "shop",
"table.include.list": "public.orders",
"topic.prefix": "shop"
}
}'
# 커넥터 상태 확인
curl http://connect:8083/connectors/pg-orders-source/status
# 커넥터 목록 조회
curl http://connect:8083/connectors
# 커넥터 삭제
curl -X DELETE http://connect:8083/connectors/pg-orders-source분산 모드에서는 커넥터 설정이 Kafka 내부 토픽(connect-configs, connect-offsets, connect-status)에 저장됩니다. 워커가 재시작되어도 설정이 유지되며, 어떤 워커에 REST 요청을 보내든 동일한 결과를 얻습니다.
관계형 데이터베이스의 데이터를 Kafka로 수집합니다. 두 가지 모드를 지원합니다.
Bulk 모드: 주기적으로 전체 테이블을 읽어 Kafka에 적재합니다. 변경 추적이 불가능한 테이블에 사용합니다.
Incremental 모드: 타임스탬프 컬럼이나 자동 증가 ID를 기준으로 변경된 행만 수집합니다.
{
"name": "jdbc-orders-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/shop",
"connection.user": "reader",
"connection.password": "secret",
"table.whitelist": "orders,order_items",
"mode": "timestamp+incrementing",
"timestamp.column.name": "updated_at",
"incrementing.column.name": "id",
"topic.prefix": "jdbc-",
"poll.interval.ms": "1000",
"tasks.max": "2"
}
}JDBC Source Connector는 폴링 기반이므로 진정한 실시간이 아닙니다. DELETE 이벤트도 감지하지 못합니다. 진정한 실시간 CDC가 필요하면 8장에서 다루는 Debezium을 사용해야 합니다.
Kafka 토픽의 데이터를 AWS S3에 적재합니다. 데이터 레이크 구축에 핵심적인 커넥터입니다.
{
"name": "s3-orders-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "3",
"topics": "orders",
"s3.region": "ap-northeast-2",
"s3.bucket.name": "data-lake-raw",
"s3.part.size": "5242880",
"flush.size": "1000",
"rotate.interval.ms": "60000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"partition.duration.ms": "3600000",
"locale": "ko_KR",
"timezone": "Asia/Seoul"
}
}핵심 설정 항목을 살펴보면 다음과 같습니다.
flush.size: 이 수만큼 레코드가 쌓이면 S3에 파일로 기록rotate.interval.ms: 이 시간이 지나면 레코드 수와 무관하게 기록format.class: 출력 포맷 (Parquet, Avro, JSON 등)partitioner.class: S3 경로 파티셔닝 전략Kafka 토픽의 데이터를 Elasticsearch에 인덱싱합니다. 실시간 검색과 로그 분석에 활용됩니다.
{
"name": "es-orders-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "2",
"topics": "orders",
"connection.url": "http://elasticsearch:9200",
"type.name": "_doc",
"key.ignore": "false",
"schema.ignore": "true",
"behavior.on.malformed.documents": "warn",
"write.method": "upsert"
}
}**SMT(Single Message Transform)**는 커넥터가 메시지를 읽거나 쓸 때 경량 변환을 적용하는 기능입니다. 별도의 스트림 처리 엔진 없이 간단한 변환을 인라인으로 수행할 수 있습니다.
| SMT | 기능 |
|---|---|
InsertField | 필드 추가 (타임스탬프, 토픽명 등) |
ReplaceField | 필드 이름 변경, 불필요한 필드 제거 |
MaskField | 민감 필드 마스킹 |
TimestampConverter | 타임스탬프 형식 변환 |
ValueToKey | 값의 특정 필드를 키로 사용 |
ExtractField | 중첩 구조에서 특정 필드 추출 |
Flatten | 중첩 JSON을 평탄화 |
RegexRouter | 토픽 이름을 정규식으로 변환 |
{
"name": "orders-with-transforms",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"transforms": "route,addTimestamp,maskEmail",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "shop\\.public\\.(.*)",
"transforms.route.replacement": "processed-$1",
"transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "ingested_at",
"transforms.maskEmail.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskEmail.fields": "email",
"transforms.maskEmail.replacement": "***MASKED***"
}
}이 설정은 세 가지 변환을 순서대로 적용합니다.
shop.public.orders에서 processed-orders로 변환ingested_at) 필드 추가SMT는 경량 변환에 적합합니다. 여러 토픽 간의 조인, 윈도우 집계, 복잡한 비즈니스 로직이 필요하면 Flink나 Kafka Streams와 같은 전용 스트림 처리 엔진을 사용해야 합니다.
Confluent Hub는 Kafka Connect 커넥터의 마켓플레이스입니다. 수백 개의 검증된 커넥터를 제공하며, 주요 데이터베이스, 클라우드 서비스, SaaS 플랫폼용 커넥터를 설치할 수 있습니다.
# Confluent Hub에서 커넥터 설치
confluent-hub install debezium/debezium-connector-postgresql:latest
confluent-hub install confluentinc/kafka-connect-s3:latest
confluent-hub install confluentinc/kafka-connect-elasticsearch:latest기존 커넥터로 충족할 수 없는 요구사항이 있다면 커스텀 커넥터를 개발할 수 있습니다. Source 커넥터는 SourceConnector와 SourceTask를, Sink 커넥터는 SinkConnector와 SinkTask를 구현합니다.
public class CustomSourceTask extends SourceTask {
@Override
public List<SourceRecord> poll() throws InterruptedException {
// 외부 시스템에서 데이터 조회
List<Event> events = externalClient.fetchNewEvents(lastOffset);
return events.stream()
.map(event -> new SourceRecord(
sourcePartition(), // 소스 파티션 식별
sourceOffset(event.getId()), // 오프셋 추적
"target-topic", // 대상 Kafka 토픽
Schema.STRING_SCHEMA, // 키 스키마
event.getKey(), // 키
eventSchema, // 값 스키마
event.toStruct() // 값
))
.collect(Collectors.toList());
}
}커스텀 커넥터를 개발하기 전에 Confluent Hub에서 유사한 커넥터가 있는지 먼저 확인하세요. 커넥터 개발에는 오프셋 관리, 재시작 복구, 스키마 처리 등 고려할 사항이 많으며, 이미 검증된 커넥터를 사용하는 것이 대부분의 경우 더 효율적입니다.
이번 장에서는 Kafka Connect를 통한 데이터 통합을 학습했습니다.
6장에서는 스트림 처리의 핵심 엔진인 Apache Flink를 다룹니다. JobManager/TaskManager 아키텍처, DataStream API, 윈도우와 조인, 상태 관리, Chandy-Lamport 알고리즘 기반 체크포인팅, Flink SQL까지 Flink의 전체 그림을 학습합니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
Flink의 JobManager/TaskManager 아키텍처, DataStream API, 윈도우와 조인, 상태 관리, Chandy-Lamport 체크포인팅, Flink SQL까지 스트림 처리 엔진의 핵심을 학습합니다.
Idempotent 프로듀서, 트랜잭셔널 프로듀서, Exactly-once 시맨틱스, 수동 오프셋 관리, 배치 최적화, Dead Letter Queue 등 프로덕션 수준의 Kafka 활용 패턴을 학습합니다.
Spark Structured Streaming의 마이크로배치와 연속 처리 모드, DataFrame API 기반 스트리밍, 윈도우와 워터마크, Kafka 소스/싱크 연동, Flink와의 비교를 학습합니다.