Flink의 JobManager/TaskManager 아키텍처, DataStream API, 윈도우와 조인, 상태 관리, Chandy-Lamport 체크포인팅, Flink SQL까지 스트림 처리 엔진의 핵심을 학습합니다.
Apache Flink는 분산 스트림 처리 엔진입니다. 무한한 이벤트 스트림을 낮은 지연 시간으로 처리하면서도, 정확한 상태 관리와 장애 복구를 보장합니다.
Flink의 핵심 철학은 **"스트림이 기본, 배치는 스트림의 특수 사례"**입니다. 배치 처리를 유한한(bounded) 스트림으로 취급하여, 스트리밍과 배치를 단일 엔진으로 통합합니다.
Flink 클러스터는 두 종류의 프로세스로 구성됩니다.
JobManager(잡 매니저): 클러스터의 마스터 노드입니다.
TaskManager(태스크 매니저): 실제 데이터 처리를 수행하는 워커 노드입니다.
Flink 작업은 DAG(Directed Acyclic Graph, 유향 비순환 그래프) 형태로 실행됩니다.
각 노드(operator)는 설정된 병렬도에 따라 여러 인스턴스로 실행되고, TaskManager의 슬롯에 분배됩니다.
DataStream API는 Flink의 핵심 프로그래밍 인터페이스입니다. Java와 Python 예제를 함께 살펴보겠습니다.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
public class OrderAnalytics {
public static void main(String[] args) throws Exception {
// 1. 실행 환경 생성
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 소스에서 데이터 읽기
DataStream<String> rawStream = env.addSource(
new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), kafkaProps)
);
// 3. 변환 처리
DataStream<OrderEvent> orders = rawStream
.map(json -> OrderEvent.fromJson(json))
.filter(order -> order.getAmount() > 0);
// 4. 키 기반 집계
DataStream<OrderSummary> summary = orders
.keyBy(order -> order.getUserId())
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new OrderAggregator());
// 5. 싱크에 결과 쓰기
summary.addSink(new ElasticsearchSink<>(esConfig));
// 6. 실행
env.execute("Order Analytics Job");
}
}from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common import Time
# 1. 실행 환경 생성
env = StreamExecutionEnvironment.get_execution_environment()
# 2. 소스에서 데이터 읽기
raw_stream = env.add_source(kafka_source)
# 3. 변환 처리
orders = (raw_stream
.map(parse_order)
.filter(lambda order: order.amount > 0))
# 4. 키 기반 집계
summary = (orders
.key_by(lambda order: order.user_id)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(OrderAggregator()))
# 5. 싱크에 결과 쓰기
summary.add_sink(es_sink)
# 6. 실행
env.execute("Order Analytics Job")| 연산 | 설명 | 예시 |
|---|---|---|
map | 1:1 변환 | JSON 파싱, 단위 변환 |
flatMap | 1:N 변환 | 한 이벤트에서 여러 이벤트 생성 |
filter | 조건부 필터링 | 유효하지 않은 이벤트 제거 |
keyBy | 키 기반 파티셔닝 | 사용자별, 지역별 그룹화 |
window | 윈도우 할당 | 5분 텀블링, 10분 슬라이딩 |
reduce | 윈도우 내 축약 | 합계, 최댓값, 최솟값 |
aggregate | 윈도우 내 집계 | 복잡한 집계 로직 |
union | 스트림 합치기 | 여러 소스 통합 |
connect | 두 스트림 연결 | 서로 다른 타입의 스트림 조합 |
2장에서 학습한 윈도우 개념을 Flink에서 구현하는 방법입니다.
// 텀블링 윈도우: 5분 단위 매출 집계
orders
.keyBy(order -> order.getCategory())
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.sum("amount");
// 슬라이딩 윈도우: 10분 윈도우를 1분 간격으로 슬라이드
orders
.keyBy(order -> order.getCategory())
.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
.sum("amount");
// 세션 윈도우: 30분 비활동 시 세션 종료
userActions
.keyBy(action -> action.getUserId())
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
.process(new SessionAnalyzer());두 개의 스트림을 조인하는 것은 배치 조인과 다릅니다. 양쪽 스트림의 데이터가 서로 다른 시점에 도착할 수 있으므로, 윈도우 기반 조인을 사용합니다.
// 윈도우 조인: 주문과 결제를 5분 윈도우 내에서 조인
DataStream<EnrichedOrder> enriched = orders
.join(payments)
.where(order -> order.getOrderId())
.equalTo(payment -> payment.getOrderId())
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.apply((order, payment) -> new EnrichedOrder(order, payment));**Interval Join(인터벌 조인)**은 더 유연한 시간 범위를 지정할 수 있습니다.
// 주문 후 1시간 이내의 결제만 조인
orders
.keyBy(order -> order.getOrderId())
.intervalJoin(payments.keyBy(payment -> payment.getOrderId()))
.between(Time.minutes(0), Time.hours(1))
.process(new OrderPaymentJoiner());조인 시 윈도우 크기를 적절히 설정해야 합니다. 너무 작으면 늦게 도착한 이벤트가 조인되지 못하고, 너무 크면 메모리 사용량이 증가합니다. 비즈니스 요구사항에 맞는 시간 범위를 설정하세요.
Flink의 가장 강력한 특성 중 하나는 **Stateful Processing(상태 기반 처리)**입니다. 단순한 이벤트 단위 변환을 넘어, 이전 이벤트의 정보를 기억하고 활용할 수 있습니다.
상태가 필요한 대표적인 연산은 다음과 같습니다.
public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {
// ValueState: 단일 값 저장
private ValueState<Boolean> flagState;
// ListState: 값 목록 저장
private ListState<Transaction> recentTransactions;
// MapState: 키-값 매핑 저장
private MapState<String, Integer> merchantCounts;
@Override
public void open(Configuration parameters) {
flagState = getRuntimeContext().getState(
new ValueStateDescriptor<>("flag", Boolean.class));
recentTransactions = getRuntimeContext().getListState(
new ListStateDescriptor<>("recent", Transaction.class));
merchantCounts = getRuntimeContext().getMapState(
new MapStateDescriptor<>("merchants", String.class, Integer.class));
}
@Override
public void processElement(Transaction tx, Context ctx, Collector<Alert> out) {
// 상태를 활용한 사기 탐지 로직
Boolean flagged = flagState.value();
if (flagged != null && flagged) {
if (tx.getAmount() > 500) {
out.collect(new Alert(tx.getUserId(), "고액 거래 감지"));
}
}
if (tx.getAmount() < 1) {
flagState.update(true); // 소액 테스트 거래 감지
// 다음 이벤트에서 검증
ctx.timerService().registerEventTimeTimer(
ctx.timestamp() + 60_000); // 1분 후 플래그 초기화
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
flagState.clear(); // 타이머 만료 시 플래그 초기화
}
}Flink는 상태를 저장하는 두 가지 백엔드를 제공합니다.
HashMapStateBackend: 상태를 JVM 힙 메모리에 저장합니다. 빠르지만 상태 크기가 메모리에 제한됩니다.
EmbeddedRocksDBStateBackend: 상태를 RocksDB(임베디드 키-값 저장소)에 저장합니다. 디스크를 활용하므로 매우 큰 상태를 관리할 수 있지만, 메모리 백엔드보다 느립니다.
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 대규모 상태가 필요한 경우 RocksDB 사용
env.setStateBackend(new EmbeddedRocksDBStateBackend());상태 크기가 수 GB 이하이고 지연 시간이 중요한 경우 HashMapStateBackend, 상태가 수십 GB 이상으로 커질 수 있는 경우 EmbeddedRocksDBStateBackend를 선택합니다. 프로덕션 환경에서는 RocksDB가 안전한 선택입니다.
Flink 작업이 장애로 중단되면, 마지막 체크포인트로부터 복구해야 합니다. 그런데 분산 환경에서 "일관된 스냅샷"을 찍는 것은 간단하지 않습니다. 여러 태스크가 동시에 다른 이벤트를 처리하고 있으므로, 단순히 각 태스크의 상태를 따로 저장하면 일관성이 깨질 수 있습니다.
Flink는 Chandy-Lamport 알고리즘의 변형을 사용하여 **분산 일관 스냅샷(distributed consistent snapshot)**을 구현합니다.
핵심 아이디어는 **Barrier(배리어)**입니다. 데이터 스트림 사이에 특수한 마커(배리어)를 삽입하여, 모든 오퍼레이터가 동일한 논리적 시점의 상태를 저장하도록 합니다.
동작 과정을 단계별로 살펴보면 다음과 같습니다.
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 체크포인트 활성화: 30초 간격
env.enableCheckpointing(30_000);
// Exactly-once 보장 (기본값)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 체크포인트 간 최소 간격: 10초
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10_000);
// 체크포인트 타임아웃: 60초
env.getCheckpointConfig().setCheckpointTimeout(60_000);
// 체크포인트 저장소 (HDFS, S3 등)
env.getCheckpointConfig().setCheckpointStorage("s3://flink-checkpoints/");체크포인트 간격은 장애 복구 시간과 직결됩니다. 간격이 짧을수록 복구 시 재처리할 데이터가 적지만, 체크포인트 자체의 오버헤드가 커집니다. 일반적으로 10초~5분 사이에서 설정하며, 상태 크기와 처리량을 고려하여 조정합니다.
Flink SQL은 SQL 문법으로 스트림 처리를 정의하는 선언적 인터페이스입니다. DataStream API보다 간결하며, SQL에 익숙한 분석가도 스트림 처리를 작성할 수 있습니다.
-- 1. Kafka 소스 테이블 정의
CREATE TABLE orders (
order_id STRING,
user_id STRING,
amount DECIMAL(10, 2),
category STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'flink-sql-orders',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
-- 2. 5분 단위 카테고리별 매출 집계
CREATE TABLE category_revenue (
category STRING,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
total_revenue DECIMAL(10, 2),
order_count BIGINT,
PRIMARY KEY (category, window_start) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://elasticsearch:9200',
'index' = 'category-revenue'
);
INSERT INTO category_revenue
SELECT
category,
window_start,
window_end,
SUM(amount) AS total_revenue,
COUNT(*) AS order_count
FROM TABLE(
TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '5' MINUTE)
)
GROUP BY category, window_start, window_end;Flink SQL은 Confluent Cloud에서도 관리형 서비스로 제공됩니다. 인프라 관리 없이 SQL 문만 작성하면 실시간 스트림 처리 파이프라인이 배포됩니다.
이번 장에서는 Apache Flink의 핵심 기능을 학습했습니다.
7장에서는 또 다른 주요 스트림 처리 엔진인 Spark Structured Streaming을 다룹니다. 마이크로배치와 연속 처리 모드, DataFrame API 기반 스트리밍, Flink와의 비교 분석을 통해 두 엔진의 적합한 활용 시나리오를 판단하는 기준을 세웁니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
Spark Structured Streaming의 마이크로배치와 연속 처리 모드, DataFrame API 기반 스트리밍, 윈도우와 워터마크, Kafka 소스/싱크 연동, Flink와의 비교를 학습합니다.
Kafka Connect의 Source/Sink 아키텍처, 주요 커넥터 활용, 분산 모드 운영, SMT를 통한 데이터 변환, 커스텀 커넥터 개발까지 Kafka 기반 데이터 통합 전략을 학습합니다.
CDC의 원리와 WAL 기반 변경 캡처, Debezium 아키텍처, PostgreSQL/MySQL CDC 실습, Flink CDC 3.6, 아웃박스 패턴, 이벤추얼 컨시스턴시까지 데이터 통합의 핵심을 학습합니다.