본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout

내비게이션

  • Tech
  • Projects
  • Books
  • About
  • Tags

카테고리

  • AI / ML
  • 웹 개발
  • 프로그래밍
  • 개발 도구

연결

  • GitHub
  • Email
  • RSS
© 2026 Kreath Archive. All rights reserved.Built with Next.js + MDX
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 5장: Kafka Connect와 데이터 통합
2026년 2월 18일·데이터·

5장: Kafka Connect와 데이터 통합

Kafka Connect의 Source/Sink 아키텍처, 주요 커넥터 활용, 분산 모드 운영, SMT를 통한 데이터 변환, 커스텀 커넥터 개발까지 Kafka 기반 데이터 통합 전략을 학습합니다.

14분638자9개 섹션
streamingdata-engineering
공유
realtime-pipeline5 / 11
1234567891011
이전4장: Kafka 프로듀서와 컨슈머 고급 패턴다음6장: Apache Flink — 스트림 처리 엔진

학습 목표

  • Kafka Connect의 아키텍처와 Source/Sink 커넥터의 역할을 이해합니다.
  • 주요 커넥터(JDBC, S3, Elasticsearch)의 설정과 활용법을 학습합니다.
  • 단독(Standalone) 모드와 분산(Distributed) 모드의 차이를 파악합니다.
  • **SMT(Single Message Transform)**를 통한 경량 데이터 변환을 적용합니다.
  • Confluent Hub 생태계와 커스텀 커넥터 개발의 기본을 학습합니다.

Kafka Connect란

Kafka Connect는 Kafka와 외부 시스템 사이의 데이터 이동을 코딩 없이 처리하는 프레임워크입니다. 데이터베이스, 파일 시스템, 검색 엔진, 클라우드 스토리지 등 다양한 시스템과 Kafka를 연결합니다.

왜 Kafka Connect인가

직접 프로듀서/컨슈머 코드를 작성하여 데이터를 옮길 수도 있습니다. 그러나 이 접근법에는 여러 문제가 있습니다.

  • 반복 작업: 모든 소스/싱크마다 유사한 코드를 새로 작성
  • 오류 처리: 재시도, 오프셋 관리, 장애 복구를 매번 구현
  • 확장성: 부하 증가 시 수동으로 인스턴스 관리
  • 모니터링: 각 커넥션의 상태를 별도로 모니터링

Kafka Connect는 이 모든 것을 프레임워크 수준에서 제공합니다. 사용자는 JSON 설정만으로 커넥터를 배포하고, Connect 프레임워크가 분산 실행, 장애 복구, 오프셋 관리를 자동으로 처리합니다.

아키텍처 개요

  • Source Connector(소스 커넥터): 외부 시스템 -> Kafka. 데이터를 수집하여 Kafka 토픽에 기록
  • Sink Connector(싱크 커넥터): Kafka -> 외부 시스템. 토픽의 데이터를 외부로 전달

커넥터의 구조

Connector, Task, Worker

Kafka Connect의 실행 단위는 세 가지 계층으로 구성됩니다.

Connector(커넥터): 논리적 작업 단위입니다. "PostgreSQL의 orders 테이블을 Kafka에 동기화"와 같은 하나의 데이터 통합 작업을 정의합니다. 커넥터는 직접 데이터를 이동하지 않고, Task를 생성하여 실제 작업을 위임합니다.

Task(태스크): 실제 데이터 이동을 수행하는 실행 단위입니다. 하나의 커넥터가 여러 태스크를 생성하여 병렬 처리할 수 있습니다.

Worker(워커): 커넥터와 태스크가 실행되는 JVM 프로세스입니다. 분산 모드에서는 여러 워커가 클러스터를 구성합니다.

connector-hierarchy
text
Worker 1                    Worker 2
├── Connector: pg-source    ├── Connector: es-sink
│   ├── Task 0             │   ├── Task 0
│   └── Task 1             │   └── Task 1
└── Connector: s3-sink
    └── Task 0

실행 모드

Standalone 모드

단일 워커에서 모든 커넥터와 태스크를 실행합니다. 개발, 테스트, 소규모 환경에 적합합니다.

standalone-start.sh
bash
connect-standalone.sh \
  config/connect-standalone.properties \
  config/jdbc-source.properties \
  config/es-sink.properties

Distributed 모드

여러 워커가 클러스터를 구성하고, 커넥터와 태스크가 워커들에 분산됩니다. 프로덕션 환경에서의 표준 운영 방식입니다.

distributed-start.sh
bash
connect-distributed.sh config/connect-distributed.properties

분산 모드의 주요 특성은 다음과 같습니다.

  • 자동 부하 분산: 태스크가 워커들에 균등하게 분배
  • 장애 복구: 워커 장애 시 해당 태스크가 다른 워커로 자동 이전
  • REST API 관리: JSON 설정을 REST API로 제출하여 커넥터 관리
  • 수평 확장: 워커 추가로 처리 용량 확장
connector-rest-api.sh
bash
# 커넥터 생성 (REST API)
curl -X POST http://connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "pg-orders-source",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "replicator",
      "database.password": "secret",
      "database.dbname": "shop",
      "table.include.list": "public.orders",
      "topic.prefix": "shop"
    }
  }'
 
# 커넥터 상태 확인
curl http://connect:8083/connectors/pg-orders-source/status
 
# 커넥터 목록 조회
curl http://connect:8083/connectors
 
# 커넥터 삭제
curl -X DELETE http://connect:8083/connectors/pg-orders-source
Tip

분산 모드에서는 커넥터 설정이 Kafka 내부 토픽(connect-configs, connect-offsets, connect-status)에 저장됩니다. 워커가 재시작되어도 설정이 유지되며, 어떤 워커에 REST 요청을 보내든 동일한 결과를 얻습니다.


주요 커넥터

JDBC Source Connector

관계형 데이터베이스의 데이터를 Kafka로 수집합니다. 두 가지 모드를 지원합니다.

Bulk 모드: 주기적으로 전체 테이블을 읽어 Kafka에 적재합니다. 변경 추적이 불가능한 테이블에 사용합니다.

Incremental 모드: 타임스탬프 컬럼이나 자동 증가 ID를 기준으로 변경된 행만 수집합니다.

jdbc-source-config.json
json
{
  "name": "jdbc-orders-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/shop",
    "connection.user": "reader",
    "connection.password": "secret",
    "table.whitelist": "orders,order_items",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "id",
    "topic.prefix": "jdbc-",
    "poll.interval.ms": "1000",
    "tasks.max": "2"
  }
}
Warning

JDBC Source Connector는 폴링 기반이므로 진정한 실시간이 아닙니다. DELETE 이벤트도 감지하지 못합니다. 진정한 실시간 CDC가 필요하면 8장에서 다루는 Debezium을 사용해야 합니다.

S3 Sink Connector

Kafka 토픽의 데이터를 AWS S3에 적재합니다. 데이터 레이크 구축에 핵심적인 커넥터입니다.

s3-sink-config.json
json
{
  "name": "s3-orders-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "3",
    "topics": "orders",
    "s3.region": "ap-northeast-2",
    "s3.bucket.name": "data-lake-raw",
    "s3.part.size": "5242880",
    "flush.size": "1000",
    "rotate.interval.ms": "60000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "partition.duration.ms": "3600000",
    "locale": "ko_KR",
    "timezone": "Asia/Seoul"
  }
}

핵심 설정 항목을 살펴보면 다음과 같습니다.

  • flush.size: 이 수만큼 레코드가 쌓이면 S3에 파일로 기록
  • rotate.interval.ms: 이 시간이 지나면 레코드 수와 무관하게 기록
  • format.class: 출력 포맷 (Parquet, Avro, JSON 등)
  • partitioner.class: S3 경로 파티셔닝 전략

Elasticsearch Sink Connector

Kafka 토픽의 데이터를 Elasticsearch에 인덱싱합니다. 실시간 검색과 로그 분석에 활용됩니다.

es-sink-config.json
json
{
  "name": "es-orders-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "2",
    "topics": "orders",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": "false",
    "schema.ignore": "true",
    "behavior.on.malformed.documents": "warn",
    "write.method": "upsert"
  }
}

SMT (Single Message Transform)

**SMT(Single Message Transform)**는 커넥터가 메시지를 읽거나 쓸 때 경량 변환을 적용하는 기능입니다. 별도의 스트림 처리 엔진 없이 간단한 변환을 인라인으로 수행할 수 있습니다.

주요 SMT 유형

SMT기능
InsertField필드 추가 (타임스탬프, 토픽명 등)
ReplaceField필드 이름 변경, 불필요한 필드 제거
MaskField민감 필드 마스킹
TimestampConverter타임스탬프 형식 변환
ValueToKey값의 특정 필드를 키로 사용
ExtractField중첩 구조에서 특정 필드 추출
Flatten중첩 JSON을 평탄화
RegexRouter토픽 이름을 정규식으로 변환

SMT 적용 예제

smt-example.json
json
{
  "name": "orders-with-transforms",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
 
    "transforms": "route,addTimestamp,maskEmail",
 
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "shop\\.public\\.(.*)",
    "transforms.route.replacement": "processed-$1",
 
    "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addTimestamp.timestamp.field": "ingested_at",
 
    "transforms.maskEmail.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.maskEmail.fields": "email",
    "transforms.maskEmail.replacement": "***MASKED***"
  }
}

이 설정은 세 가지 변환을 순서대로 적용합니다.

  1. route: 토픽 이름을 shop.public.orders에서 processed-orders로 변환
  2. addTimestamp: 각 메시지에 수집 시각(ingested_at) 필드 추가
  3. maskEmail: 이메일 필드를 마스킹 처리
Info

SMT는 경량 변환에 적합합니다. 여러 토픽 간의 조인, 윈도우 집계, 복잡한 비즈니스 로직이 필요하면 Flink나 Kafka Streams와 같은 전용 스트림 처리 엔진을 사용해야 합니다.


Confluent Hub와 커넥터 생태계

Confluent Hub는 Kafka Connect 커넥터의 마켓플레이스입니다. 수백 개의 검증된 커넥터를 제공하며, 주요 데이터베이스, 클라우드 서비스, SaaS 플랫폼용 커넥터를 설치할 수 있습니다.

hub-install.sh
bash
# Confluent Hub에서 커넥터 설치
confluent-hub install debezium/debezium-connector-postgresql:latest
confluent-hub install confluentinc/kafka-connect-s3:latest
confluent-hub install confluentinc/kafka-connect-elasticsearch:latest

커스텀 커넥터 개발

기존 커넥터로 충족할 수 없는 요구사항이 있다면 커스텀 커넥터를 개발할 수 있습니다. Source 커넥터는 SourceConnector와 SourceTask를, Sink 커넥터는 SinkConnector와 SinkTask를 구현합니다.

CustomSourceTask.java
java
public class CustomSourceTask extends SourceTask {
 
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // 외부 시스템에서 데이터 조회
        List<Event> events = externalClient.fetchNewEvents(lastOffset);
 
        return events.stream()
            .map(event -> new SourceRecord(
                sourcePartition(),           // 소스 파티션 식별
                sourceOffset(event.getId()), // 오프셋 추적
                "target-topic",              // 대상 Kafka 토픽
                Schema.STRING_SCHEMA,        // 키 스키마
                event.getKey(),              // 키
                eventSchema,                 // 값 스키마
                event.toStruct()             // 값
            ))
            .collect(Collectors.toList());
    }
}
Tip

커스텀 커넥터를 개발하기 전에 Confluent Hub에서 유사한 커넥터가 있는지 먼저 확인하세요. 커넥터 개발에는 오프셋 관리, 재시작 복구, 스키마 처리 등 고려할 사항이 많으며, 이미 검증된 커넥터를 사용하는 것이 대부분의 경우 더 효율적입니다.


정리

이번 장에서는 Kafka Connect를 통한 데이터 통합을 학습했습니다.

  • Kafka Connect: 코딩 없이 JSON 설정만으로 외부 시스템과 Kafka를 연결하는 프레임워크입니다.
  • Source/Sink: 소스 커넥터는 외부에서 Kafka로, 싱크 커넥터는 Kafka에서 외부로 데이터를 이동합니다.
  • 분산 모드: 프로덕션에서는 여러 워커가 클러스터를 구성하여 부하 분산과 장애 복구를 자동 처리합니다.
  • SMT: 경량 변환을 인라인으로 적용하여 토픽 라우팅, 필드 추가/제거, 마스킹 등을 처리합니다.
  • Confluent Hub: 수백 개의 검증된 커넥터를 제공하는 생태계입니다.

다음 장 미리보기

6장에서는 스트림 처리의 핵심 엔진인 Apache Flink를 다룹니다. JobManager/TaskManager 아키텍처, DataStream API, 윈도우와 조인, 상태 관리, Chandy-Lamport 알고리즘 기반 체크포인팅, Flink SQL까지 Flink의 전체 그림을 학습합니다.

이 글이 도움이 되셨나요?

관련 주제 더 보기

#streaming#data-engineering

관련 글

데이터

6장: Apache Flink — 스트림 처리 엔진

Flink의 JobManager/TaskManager 아키텍처, DataStream API, 윈도우와 조인, 상태 관리, Chandy-Lamport 체크포인팅, Flink SQL까지 스트림 처리 엔진의 핵심을 학습합니다.

2026년 2월 20일·19분
데이터

4장: Kafka 프로듀서와 컨슈머 고급 패턴

Idempotent 프로듀서, 트랜잭셔널 프로듀서, Exactly-once 시맨틱스, 수동 오프셋 관리, 배치 최적화, Dead Letter Queue 등 프로덕션 수준의 Kafka 활용 패턴을 학습합니다.

2026년 2월 16일·18분
데이터

7장: Spark Structured Streaming

Spark Structured Streaming의 마이크로배치와 연속 처리 모드, DataFrame API 기반 스트리밍, 윈도우와 워터마크, Kafka 소스/싱크 연동, Flink와의 비교를 학습합니다.

2026년 2월 22일·14분
이전 글4장: Kafka 프로듀서와 컨슈머 고급 패턴
다음 글6장: Apache Flink — 스트림 처리 엔진

댓글

목차

약 14분 남음
  • 학습 목표
  • Kafka Connect란
    • 왜 Kafka Connect인가
    • 아키텍처 개요
  • 커넥터의 구조
    • Connector, Task, Worker
  • 실행 모드
    • Standalone 모드
    • Distributed 모드
  • 주요 커넥터
    • JDBC Source Connector
    • S3 Sink Connector
    • Elasticsearch Sink Connector
  • SMT (Single Message Transform)
    • 주요 SMT 유형
    • SMT 적용 예제
  • Confluent Hub와 커넥터 생태계
    • 커스텀 커넥터 개발
  • 정리
  • 다음 장 미리보기