Spark Structured Streaming의 마이크로배치와 연속 처리 모드, DataFrame API 기반 스트리밍, 윈도우와 워터마크, Kafka 소스/싱크 연동, Flink와의 비교를 학습합니다.
Spark Structured Streaming은 Apache Spark의 스트림 처리 엔진입니다. Spark의 DataFrame/Dataset API를 스트리밍에 확장한 것으로, 배치 처리와 동일한 API로 스트림 처리를 작성할 수 있습니다.
핵심 아이디어는 **"스트리밍 데이터를 무한히 추가되는 테이블로 취급"**하는 것입니다. 새 데이터가 도착하면 입력 테이블에 행이 추가되고, 쿼리가 증분적으로 실행되어 결과 테이블이 갱신됩니다.
입력 테이블 (끊임없이 행 추가)
┌─────────┬────────┬────────┐
│ time │ user │ amount │
├─────────┼────────┼────────┤
│ 09:00 │ Kim │ 1000 │ ← 기존 데이터
│ 09:01 │ Lee │ 2000 │
│ 09:02 │ Park │ 1500 │ ← 새 데이터 (이번 트리거)
│ 09:03 │ Kim │ 3000 │
└─────────┴────────┴────────┘
↓ 증분 쿼리 실행
결과 테이블
┌────────┬──────────────┐
│ user │ total_amount │
├────────┼──────────────┤
│ Kim │ 4000 │ ← 갱신됨
│ Lee │ 2000 │
│ Park │ 1500 │ ← 새로 추가
└────────┴──────────────┘기본 실행 모드입니다. 일정 간격(기본 트리거 간격)으로 새로 도착한 데이터를 모아 소규모 배치로 처리합니다.
시간: ──|──100ms──|──100ms──|──100ms──|──
배치: [B1 ] [B2 ] [B3 ]
이벤트: ** * * * ** * * *Spark 2.3에서 도입된 실험적 모드입니다. 이벤트를 도착 즉시 처리하여 밀리초 수준의 지연을 달성합니다.
query = (df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("topic", "output")
.trigger(continuous="1 second") # 연속 처리, 1초 체크포인트 간격
.start())Continuous Processing 모드는 2026년 현재에도 실험적 상태입니다. 프로덕션에서 밀리초 수준의 지연이 필요하다면 Flink를 검토하는 것이 더 안정적입니다. Spark의 마이크로배치 모드는 초 단위 지연이 허용되는 대부분의 사용 사례에서 충분합니다.
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, sum as spark_sum
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
spark = SparkSession.builder \
.appName("OrderAnalytics") \
.getOrCreate()
# 스키마 정의
order_schema = StructType([
StructField("order_id", StringType()),
StructField("user_id", StringType()),
StructField("amount", DoubleType()),
StructField("category", StringType()),
StructField("order_time", TimestampType())
])
# Kafka 소스에서 읽기
raw_df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "orders")
.option("startingOffsets", "latest")
.load())
# JSON 파싱
orders_df = (raw_df
.select(from_json(col("value").cast("string"), order_schema).alias("data"))
.select("data.*"))
# 처리: 카테고리별 5분 매출 집계
revenue_df = (orders_df
.withWatermark("order_time", "5 minutes")
.groupBy(
window(col("order_time"), "5 minutes"),
col("category")
)
.agg(spark_sum("amount").alias("total_revenue")))
# 결과 출력
query = (revenue_df.writeStream
.outputMode("update")
.format("console")
.trigger(processingTime="10 seconds")
.start())
query.awaitTermination()Structured Streaming은 세 가지 출력 모드를 지원합니다.
| 모드 | 설명 | 적합한 경우 |
|---|---|---|
append | 새로 확정된 행만 출력 | 윈도우 종료 후 결과, 단순 필터링 |
update | 변경된 행만 출력 | 집계 결과의 증분 갱신 |
complete | 전체 결과 테이블을 매번 출력 | 전체 결과가 필요한 소규모 집계 |
from pyspark.sql.functions import window, count, avg
# 텀블링 윈도우: 10분 단위 집계
tumbling = (orders_df
.withWatermark("order_time", "10 minutes")
.groupBy(
window(col("order_time"), "10 minutes"),
col("category")
)
.agg(
count("*").alias("order_count"),
avg("amount").alias("avg_amount")
))
# 슬라이딩 윈도우: 10분 윈도우, 5분 슬라이드
sliding = (orders_df
.withWatermark("order_time", "10 minutes")
.groupBy(
window(col("order_time"), "10 minutes", "5 minutes"),
col("category")
)
.agg(count("*").alias("order_count")))Spark의 워터마크는 withWatermark 메서드로 설정합니다. 지정된 지연 시간보다 더 늦게 도착한 이벤트는 무시됩니다.
# 최대 10분까지 늦은 이벤트 허용
df_with_watermark = orders_df.withWatermark("order_time", "10 minutes")워터마크가 설정되면 Spark는 다음과 같이 동작합니다.
최대 이벤트 시간 - 지연 허용치를 워터마크로 설정합니다.워터마크 없이 윈도우 집계를 실행하면 상태가 무한히 증가합니다. 프로덕션에서는 반드시 워터마크를 설정하여 오래된 상태를 정리해야 합니다.
foreachBatch는 각 마이크로배치의 결과 DataFrame에 임의의 처리를 적용하는 강력한 패턴입니다. 기존 배치 코드를 스트리밍에 재사용하거나, 여러 싱크에 동시에 쓸 때 유용합니다.
def process_batch(batch_df, batch_id):
# 배치 DataFrame에 대해 임의의 처리 가능
# 1. Elasticsearch에 쓰기
(batch_df.write
.format("org.elasticsearch.spark.sql")
.option("es.resource", "orders/_doc")
.mode("append")
.save())
# 2. PostgreSQL에 upsert
(batch_df.write
.format("jdbc")
.option("url", "jdbc:postgresql://postgres:5432/analytics")
.option("dbtable", "order_summary")
.option("user", "writer")
.mode("append")
.save())
# 3. 캐시 갱신
update_redis_cache(batch_df)
query = (revenue_df.writeStream
.foreachBatch(process_batch)
.trigger(processingTime="30 seconds")
.option("checkpointLocation", "/checkpoints/revenue")
.start())장애 복구 시 동일 배치가 재실행될 수 있으므로, foreachBatch 내의 처리는 멱등적이어야 합니다.
def idempotent_process(batch_df, batch_id):
# batch_id를 활용하여 중복 처리 방지
if is_already_processed(batch_id):
return
# 처리 수행
write_to_sink(batch_df)
# 처리 완료 기록
mark_as_processed(batch_id)kafka_df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "orders,payments") # 여러 토픽 구독
.option("startingOffsets", "earliest") # 처음부터 읽기
.option("maxOffsetsPerTrigger", "10000") # 트리거당 최대 오프셋
.option("kafka.security.protocol", "SASL_SSL") # 보안 설정
.load())
# Kafka 레코드 구조
# key: binary, value: binary, topic: string,
# partition: int, offset: long, timestamp: timestamp# 처리 결과를 다른 Kafka 토픽에 쓰기
query = (processed_df
.selectExpr("CAST(order_id AS STRING) AS key",
"to_json(struct(*)) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("topic", "orders-processed")
.option("checkpointLocation", "/checkpoints/orders-processed")
.trigger(processingTime="10 seconds")
.start())maxOffsetsPerTrigger 설정은 각 마이크로배치에서 읽는 최대 데이터량을 제한합니다. 초기 실행 시 대량의 미처리 데이터가 있을 때 OOM(Out of Memory)을 방지하는 데 유용합니다.
두 엔진의 특성을 비교하여 적합한 활용 시나리오를 판단해 보겠습니다.
| 항목 | Flink | Spark Structured Streaming |
|---|---|---|
| 기본 모델 | 레코드 단위 처리 | 마이크로배치 |
| 최소 지연 | ~수 ms | ~100ms (마이크로배치) |
| 상태 관리 | 내장 상태 백엔드 (RocksDB) | Delta Lake / HDFS 체크포인트 |
| Exactly-once | Chandy-Lamport 분산 스냅샷 | Write-Ahead Log |
| CDC 지원 | Flink CDC (네이티브) | 외부 도구 의존 (Debezium 등) |
Flink가 유리한 경우:
Spark가 유리한 경우:
두 엔진을 반드시 하나만 선택할 필요는 없습니다. 초저지연 파이프라인에 Flink를, 데이터 레이크 ETL과 ML에 Spark를 병행하는 조직도 많습니다. 기술 선택은 지연 요구사항, 팀 역량, 기존 인프라를 종합적으로 고려해야 합니다.
이번 장에서는 Spark Structured Streaming의 핵심을 학습했습니다.
8장에서는 CDC(Change Data Capture)를 다룹니다. 데이터베이스의 변경 사항을 실시간으로 캡처하는 원리, Debezium의 WAL 기반 아키텍처, Flink CDC 3.6의 네이티브 커넥터, 아웃박스 패턴까지 데이터 통합의 핵심 기술을 학습합니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
CDC의 원리와 WAL 기반 변경 캡처, Debezium 아키텍처, PostgreSQL/MySQL CDC 실습, Flink CDC 3.6, 아웃박스 패턴, 이벤추얼 컨시스턴시까지 데이터 통합의 핵심을 학습합니다.
Flink의 JobManager/TaskManager 아키텍처, DataStream API, 윈도우와 조인, 상태 관리, Chandy-Lamport 체크포인팅, Flink SQL까지 스트림 처리 엔진의 핵심을 학습합니다.
스키마 진화의 필요성, Confluent Schema Registry, Avro/Protobuf/JSON Schema 비교, 호환성 규칙, 데이터 계약 개념까지 스키마 관리 전략을 체계적으로 학습합니다.