본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout

내비게이션

  • Tech
  • Projects
  • Books
  • About
  • Tags

카테고리

  • AI / ML
  • 웹 개발
  • 프로그래밍
  • 개발 도구

연결

  • GitHub
  • Email
  • RSS
© 2026 Kreath Archive. All rights reserved.Built with Next.js + MDX
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 7장: Spark Structured Streaming
2026년 2월 22일·데이터·

7장: Spark Structured Streaming

Spark Structured Streaming의 마이크로배치와 연속 처리 모드, DataFrame API 기반 스트리밍, 윈도우와 워터마크, Kafka 소스/싱크 연동, Flink와의 비교를 학습합니다.

14분559자10개 섹션
streamingdata-engineering
공유
realtime-pipeline7 / 11
1234567891011
이전6장: Apache Flink — 스트림 처리 엔진다음8장: CDC(Change Data Capture)

학습 목표

  • Spark Structured Streaming의 핵심 개념과 실행 모델을 이해합니다.
  • **Micro-batch(마이크로배치)**와 Continuous Processing(연속 처리) 모드의 차이를 파악합니다.
  • DataFrame API를 사용한 스트리밍 프로그래밍을 학습합니다.
  • 윈도우, 워터마크, foreachBatch 패턴을 적용합니다.
  • Kafka 소스와 싱크 연동 방법을 학습합니다.
  • Flink와 Spark의 스트리밍 특성을 비교 분석합니다.

Spark Structured Streaming이란

Spark Structured Streaming은 Apache Spark의 스트림 처리 엔진입니다. Spark의 DataFrame/Dataset API를 스트리밍에 확장한 것으로, 배치 처리와 동일한 API로 스트림 처리를 작성할 수 있습니다.

핵심 아이디어는 **"스트리밍 데이터를 무한히 추가되는 테이블로 취급"**하는 것입니다. 새 데이터가 도착하면 입력 테이블에 행이 추가되고, 쿼리가 증분적으로 실행되어 결과 테이블이 갱신됩니다.

structured-streaming-model
text
입력 테이블 (끊임없이 행 추가)
┌─────────┬────────┬────────┐
│ time    │ user   │ amount │
├─────────┼────────┼────────┤
│ 09:00   │ Kim    │ 1000   │  ← 기존 데이터
│ 09:01   │ Lee    │ 2000   │
│ 09:02   │ Park   │ 1500   │  ← 새 데이터 (이번 트리거)
│ 09:03   │ Kim    │ 3000   │
└─────────┴────────┴────────┘
            ↓ 증분 쿼리 실행
결과 테이블
┌────────┬──────────────┐
│ user   │ total_amount │
├────────┼──────────────┤
│ Kim    │ 4000         │  ← 갱신됨
│ Lee    │ 2000         │
│ Park   │ 1500         │  ← 새로 추가
└────────┴──────────────┘

마이크로배치 vs 연속 처리

Micro-batch 모드

기본 실행 모드입니다. 일정 간격(기본 트리거 간격)으로 새로 도착한 데이터를 모아 소규모 배치로 처리합니다.

micro-batch-flow
text
시간: ──|──100ms──|──100ms──|──100ms──|──
배치:    [B1      ] [B2      ] [B3      ]
이벤트:  ** *  *    *  **      * * *
  • 지연 시간: 트리거 간격에 의존 (보통 100ms ~ 수 초)
  • 처리량: 배치 최적화로 높은 처리량
  • Exactly-once: Write-Ahead Log 기반으로 보장

Continuous Processing 모드

Spark 2.3에서 도입된 실험적 모드입니다. 이벤트를 도착 즉시 처리하여 밀리초 수준의 지연을 달성합니다.

continuous_mode.py
python
query = (df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "output")
    .trigger(continuous="1 second")  # 연속 처리, 1초 체크포인트 간격
    .start())
  • 지연 시간: ~1ms (마이크로배치 대비 대폭 감소)
  • 제약: 지원하는 연산이 제한적 (집계, 조인 미지원)
  • 보장 수준: At-least-once (Exactly-once 미지원)
Warning

Continuous Processing 모드는 2026년 현재에도 실험적 상태입니다. 프로덕션에서 밀리초 수준의 지연이 필요하다면 Flink를 검토하는 것이 더 안정적입니다. Spark의 마이크로배치 모드는 초 단위 지연이 허용되는 대부분의 사용 사례에서 충분합니다.


DataFrame API로 스트리밍

기본 구조

spark_streaming_basic.py
python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, sum as spark_sum
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
 
spark = SparkSession.builder \
    .appName("OrderAnalytics") \
    .getOrCreate()
 
# 스키마 정의
order_schema = StructType([
    StructField("order_id", StringType()),
    StructField("user_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("category", StringType()),
    StructField("order_time", TimestampType())
])
 
# Kafka 소스에서 읽기
raw_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "orders")
    .option("startingOffsets", "latest")
    .load())
 
# JSON 파싱
orders_df = (raw_df
    .select(from_json(col("value").cast("string"), order_schema).alias("data"))
    .select("data.*"))
 
# 처리: 카테고리별 5분 매출 집계
revenue_df = (orders_df
    .withWatermark("order_time", "5 minutes")
    .groupBy(
        window(col("order_time"), "5 minutes"),
        col("category")
    )
    .agg(spark_sum("amount").alias("total_revenue")))
 
# 결과 출력
query = (revenue_df.writeStream
    .outputMode("update")
    .format("console")
    .trigger(processingTime="10 seconds")
    .start())
 
query.awaitTermination()

출력 모드

Structured Streaming은 세 가지 출력 모드를 지원합니다.

모드설명적합한 경우
append새로 확정된 행만 출력윈도우 종료 후 결과, 단순 필터링
update변경된 행만 출력집계 결과의 증분 갱신
complete전체 결과 테이블을 매번 출력전체 결과가 필요한 소규모 집계

윈도우와 워터마크

윈도우 집계

window_aggregation.py
python
from pyspark.sql.functions import window, count, avg
 
# 텀블링 윈도우: 10분 단위 집계
tumbling = (orders_df
    .withWatermark("order_time", "10 minutes")
    .groupBy(
        window(col("order_time"), "10 minutes"),
        col("category")
    )
    .agg(
        count("*").alias("order_count"),
        avg("amount").alias("avg_amount")
    ))
 
# 슬라이딩 윈도우: 10분 윈도우, 5분 슬라이드
sliding = (orders_df
    .withWatermark("order_time", "10 minutes")
    .groupBy(
        window(col("order_time"), "10 minutes", "5 minutes"),
        col("category")
    )
    .agg(count("*").alias("order_count")))

워터마크 설정

Spark의 워터마크는 withWatermark 메서드로 설정합니다. 지정된 지연 시간보다 더 늦게 도착한 이벤트는 무시됩니다.

watermark_config.py
python
# 최대 10분까지 늦은 이벤트 허용
df_with_watermark = orders_df.withWatermark("order_time", "10 minutes")

워터마크가 설정되면 Spark는 다음과 같이 동작합니다.

  1. 관찰된 최대 이벤트 시간을 추적합니다.
  2. 최대 이벤트 시간 - 지연 허용치를 워터마크로 설정합니다.
  3. 워터마크보다 오래된 이벤트 시간을 가진 데이터를 무시합니다.
  4. 워터마크를 넘긴 윈도우의 상태를 정리하여 메모리를 확보합니다.
Info

워터마크 없이 윈도우 집계를 실행하면 상태가 무한히 증가합니다. 프로덕션에서는 반드시 워터마크를 설정하여 오래된 상태를 정리해야 합니다.


foreachBatch 패턴

foreachBatch는 각 마이크로배치의 결과 DataFrame에 임의의 처리를 적용하는 강력한 패턴입니다. 기존 배치 코드를 스트리밍에 재사용하거나, 여러 싱크에 동시에 쓸 때 유용합니다.

foreach_batch.py
python
def process_batch(batch_df, batch_id):
    # 배치 DataFrame에 대해 임의의 처리 가능
 
    # 1. Elasticsearch에 쓰기
    (batch_df.write
        .format("org.elasticsearch.spark.sql")
        .option("es.resource", "orders/_doc")
        .mode("append")
        .save())
 
    # 2. PostgreSQL에 upsert
    (batch_df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://postgres:5432/analytics")
        .option("dbtable", "order_summary")
        .option("user", "writer")
        .mode("append")
        .save())
 
    # 3. 캐시 갱신
    update_redis_cache(batch_df)
 
query = (revenue_df.writeStream
    .foreachBatch(process_batch)
    .trigger(processingTime="30 seconds")
    .option("checkpointLocation", "/checkpoints/revenue")
    .start())

foreachBatch에서의 멱등성

장애 복구 시 동일 배치가 재실행될 수 있으므로, foreachBatch 내의 처리는 멱등적이어야 합니다.

idempotent_batch.py
python
def idempotent_process(batch_df, batch_id):
    # batch_id를 활용하여 중복 처리 방지
    if is_already_processed(batch_id):
        return
 
    # 처리 수행
    write_to_sink(batch_df)
 
    # 처리 완료 기록
    mark_as_processed(batch_id)

Kafka 소스/싱크 연동

Kafka 소스 설정

kafka_source.py
python
kafka_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "orders,payments")       # 여러 토픽 구독
    .option("startingOffsets", "earliest")         # 처음부터 읽기
    .option("maxOffsetsPerTrigger", "10000")       # 트리거당 최대 오프셋
    .option("kafka.security.protocol", "SASL_SSL") # 보안 설정
    .load())
 
# Kafka 레코드 구조
# key: binary, value: binary, topic: string,
# partition: int, offset: long, timestamp: timestamp

Kafka 싱크 설정

kafka_sink.py
python
# 처리 결과를 다른 Kafka 토픽에 쓰기
query = (processed_df
    .selectExpr("CAST(order_id AS STRING) AS key",
                "to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "orders-processed")
    .option("checkpointLocation", "/checkpoints/orders-processed")
    .trigger(processingTime="10 seconds")
    .start())
Tip

maxOffsetsPerTrigger 설정은 각 마이크로배치에서 읽는 최대 데이터량을 제한합니다. 초기 실행 시 대량의 미처리 데이터가 있을 때 OOM(Out of Memory)을 방지하는 데 유용합니다.


Flink vs Spark Structured Streaming 비교

두 엔진의 특성을 비교하여 적합한 활용 시나리오를 판단해 보겠습니다.

실행 모델

항목FlinkSpark Structured Streaming
기본 모델레코드 단위 처리마이크로배치
최소 지연~수 ms~100ms (마이크로배치)
상태 관리내장 상태 백엔드 (RocksDB)Delta Lake / HDFS 체크포인트
Exactly-onceChandy-Lamport 분산 스냅샷Write-Ahead Log
CDC 지원Flink CDC (네이티브)외부 도구 의존 (Debezium 등)

적합한 시나리오

Flink가 유리한 경우:

  • 밀리초 수준의 초저지연이 필요한 경우
  • 복잡한 이벤트 처리(CEP), 패턴 매칭
  • 대규모 상태 관리가 필요한 경우
  • 네이티브 CDC가 필요한 경우
  • 이벤트 시간 기반의 정교한 윈도우 처리

Spark가 유리한 경우:

  • 기존 Spark 배치 파이프라인이 있고 스트리밍을 추가하는 경우
  • 배치와 스트리밍을 동일한 코드로 처리하고 싶은 경우
  • 팀이 Spark/PySpark에 익숙한 경우
  • 머신러닝 파이프라인(MLlib)과 통합이 필요한 경우
  • 초 단위 지연이 허용되는 경우

생태계 비교

Info

두 엔진을 반드시 하나만 선택할 필요는 없습니다. 초저지연 파이프라인에 Flink를, 데이터 레이크 ETL과 ML에 Spark를 병행하는 조직도 많습니다. 기술 선택은 지연 요구사항, 팀 역량, 기존 인프라를 종합적으로 고려해야 합니다.


정리

이번 장에서는 Spark Structured Streaming의 핵심을 학습했습니다.

  • 실행 모델: 스트리밍 데이터를 무한히 추가되는 테이블로 취급하여 배치 API와 통합합니다.
  • 마이크로배치 vs 연속 처리: 대부분의 경우 마이크로배치가 적합하며, 연속 처리는 실험적입니다.
  • DataFrame API: 배치와 동일한 API로 스트리밍 처리를 작성합니다.
  • foreachBatch: 각 마이크로배치에 임의 처리를 적용하는 유연한 패턴입니다.
  • Flink vs Spark: 지연 요구사항, 팀 역량, 기존 인프라에 따라 적합한 엔진이 다릅니다.

다음 장 미리보기

8장에서는 CDC(Change Data Capture)를 다룹니다. 데이터베이스의 변경 사항을 실시간으로 캡처하는 원리, Debezium의 WAL 기반 아키텍처, Flink CDC 3.6의 네이티브 커넥터, 아웃박스 패턴까지 데이터 통합의 핵심 기술을 학습합니다.

이 글이 도움이 되셨나요?

관련 주제 더 보기

#streaming#data-engineering

관련 글

데이터

8장: CDC(Change Data Capture)

CDC의 원리와 WAL 기반 변경 캡처, Debezium 아키텍처, PostgreSQL/MySQL CDC 실습, Flink CDC 3.6, 아웃박스 패턴, 이벤추얼 컨시스턴시까지 데이터 통합의 핵심을 학습합니다.

2026년 2월 24일·18분
데이터

6장: Apache Flink — 스트림 처리 엔진

Flink의 JobManager/TaskManager 아키텍처, DataStream API, 윈도우와 조인, 상태 관리, Chandy-Lamport 체크포인팅, Flink SQL까지 스트림 처리 엔진의 핵심을 학습합니다.

2026년 2월 20일·19분
데이터

9장: 스키마 레지스트리와 데이터 계약

스키마 진화의 필요성, Confluent Schema Registry, Avro/Protobuf/JSON Schema 비교, 호환성 규칙, 데이터 계약 개념까지 스키마 관리 전략을 체계적으로 학습합니다.

2026년 2월 26일·18분
이전 글6장: Apache Flink — 스트림 처리 엔진
다음 글8장: CDC(Change Data Capture)

댓글

목차

약 14분 남음
  • 학습 목표
  • Spark Structured Streaming이란
  • 마이크로배치 vs 연속 처리
    • Micro-batch 모드
    • Continuous Processing 모드
  • DataFrame API로 스트리밍
    • 기본 구조
    • 출력 모드
  • 윈도우와 워터마크
    • 윈도우 집계
    • 워터마크 설정
  • foreachBatch 패턴
    • foreachBatch에서의 멱등성
  • Kafka 소스/싱크 연동
    • Kafka 소스 설정
    • Kafka 싱크 설정
  • Flink vs Spark Structured Streaming 비교
    • 실행 모델
    • 적합한 시나리오
    • 생태계 비교
  • 정리
  • 다음 장 미리보기