재시도, 서킷 브레이커, 관측 가능성, 비용 추적, 스키마 버전 관리 등 프로덕션 수준의 AI 파이프라인 운영 기법을 학습합니다.
LLM API는 일시적 오류(Rate Limit, 타임아웃, 서버 오류)가 빈번합니다. 지수 백오프(Exponential Backoff)로 재시도하면 대부분의 일시적 오류를 극복할 수 있습니다.
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
before_sleep_log,
)
from openai import RateLimitError, APITimeoutError, APIConnectionError
import logging
logger = logging.getLogger(__name__)
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=2, max=60),
retry=retry_if_exception_type((
RateLimitError,
APITimeoutError,
APIConnectionError,
)),
before_sleep=before_sleep_log(logger, logging.WARNING),
)
async def call_llm_with_retry(client, messages, response_model):
"""지수 백오프로 LLM을 호출합니다."""
return await client.chat.completions.create(
model="gpt-4o-2026-02",
response_model=response_model,
messages=messages,
)**Circuit Breaker(서킷 브레이커)**는 연속 실패가 임계치를 넘으면 호출을 차단하여, 장애가 전파되는 것을 방지합니다. 일정 시간이 지나면 다시 시도합니다.
import time
from enum import Enum
from dataclasses import dataclass, field
class CircuitState(str, Enum):
CLOSED = "closed" # 정상 운영
OPEN = "open" # 호출 차단
HALF_OPEN = "half_open" # 시험적 허용
@dataclass
class CircuitBreaker:
"""서킷 브레이커 구현"""
failure_threshold: int = 5
recovery_timeout: float = 60.0
half_open_max_calls: int = 3
_state: CircuitState = field(default=CircuitState.CLOSED, init=False)
_failure_count: int = field(default=0, init=False)
_last_failure_time: float = field(default=0.0, init=False)
_half_open_calls: int = field(default=0, init=False)
@property
def state(self) -> CircuitState:
if self._state == CircuitState.OPEN:
elapsed = time.time() - self._last_failure_time
if elapsed >= self.recovery_timeout:
self._state = CircuitState.HALF_OPEN
self._half_open_calls = 0
return self._state
def can_execute(self) -> bool:
"""호출 가능 여부를 반환합니다."""
state = self.state
if state == CircuitState.CLOSED:
return True
elif state == CircuitState.HALF_OPEN:
return self._half_open_calls < self.half_open_max_calls
return False
def record_success(self) -> None:
"""성공을 기록합니다."""
if self._state == CircuitState.HALF_OPEN:
self._half_open_calls += 1
if self._half_open_calls >= self.half_open_max_calls:
self._state = CircuitState.CLOSED
self._failure_count = 0
else:
self._failure_count = 0
def record_failure(self) -> None:
"""실패를 기록합니다."""
self._failure_count += 1
self._last_failure_time = time.time()
if self._failure_count >= self.failure_threshold:
self._state = CircuitState.OPEN
def __str__(self) -> str:
return (
f"CircuitBreaker(state={self.state.value}, "
f"failures={self._failure_count}/"
f"{self.failure_threshold})"
)import asyncio
# 프로바이더별 서킷 브레이커
breakers = {
"openai": CircuitBreaker(failure_threshold=5, recovery_timeout=60),
"anthropic": CircuitBreaker(failure_threshold=5, recovery_timeout=60),
}
async def extract_with_circuit_breaker(
content: str,
response_model,
providers: list[dict]
):
"""서킷 브레이커가 적용된 폴백 추출"""
for provider in providers:
name = provider["name"]
breaker = breakers[name]
if not breaker.can_execute():
continue
try:
result = await provider["extract_fn"](content, response_model)
breaker.record_success()
return result
except Exception:
breaker.record_failure()
raise RuntimeError("모든 프로바이더의 서킷 브레이커가 열려 있습니다.")프로덕션 파이프라인에서는 무엇이 일어나고 있는지 실시간으로 파악해야 합니다.
import structlog
import time
from functools import wraps
logger = structlog.get_logger()
def log_extraction(func):
"""추출 함수에 구조화된 로깅을 추가하는 데코레이터"""
@wraps(func)
async def wrapper(*args, **kwargs):
start = time.time()
doc_id = kwargs.get("doc_id", "unknown")
logger.info(
"extraction_started",
doc_id=doc_id,
model=kwargs.get("model", "unknown"),
)
try:
result = await func(*args, **kwargs)
duration = time.time() - start
logger.info(
"extraction_completed",
doc_id=doc_id,
duration_ms=round(duration * 1000),
success=True,
)
return result
except Exception as e:
duration = time.time() - start
logger.error(
"extraction_failed",
doc_id=doc_id,
duration_ms=round(duration * 1000),
error_type=type(e).__name__,
error_message=str(e),
)
raise
return wrapperPrometheus 호환 메트릭을 수집하여 모니터링합니다.
from dataclasses import dataclass, field
from collections import defaultdict
import time
import threading
@dataclass
class PipelineMetrics:
"""파이프라인 메트릭 수집기"""
_lock: threading.Lock = field(default_factory=threading.Lock, init=False)
_counters: dict[str, int] = field(
default_factory=lambda: defaultdict(int), init=False
)
_histograms: dict[str, list[float]] = field(
default_factory=lambda: defaultdict(list), init=False
)
_gauges: dict[str, float] = field(
default_factory=lambda: defaultdict(float), init=False
)
def increment(self, name: str, value: int = 1) -> None:
with self._lock:
self._counters[name] += value
def observe(self, name: str, value: float) -> None:
with self._lock:
self._histograms[name].append(value)
def set_gauge(self, name: str, value: float) -> None:
with self._lock:
self._gauges[name] = value
def summary(self) -> dict:
with self._lock:
result = {
"counters": dict(self._counters),
"gauges": dict(self._gauges),
}
for name, values in self._histograms.items():
if values:
result[f"histogram_{name}"] = {
"count": len(values),
"mean": sum(values) / len(values),
"min": min(values),
"max": max(values),
"p95": sorted(values)[int(len(values) * 0.95)],
}
return result
# 전역 메트릭 인스턴스
metrics = PipelineMetrics()
# 사용 예시
metrics.increment("extractions_total")
metrics.increment("extractions_success")
metrics.observe("extraction_duration_seconds", 2.35)
metrics.observe("tokens_used", 1500)
metrics.set_gauge("active_extractions", 5)Grafana와 Prometheus를 사용하면 메트릭을 실시간 대시보드로 시각화할 수 있습니다. 추출 성공률, 평균 응답 시간, 일일 비용 추이 등을 한눈에 파악할 수 있어 운영 효율이 크게 높아집니다.
LLM API 비용은 프로덕션에서 가장 중요한 운영 지표 중 하나입니다.
from dataclasses import dataclass
from datetime import datetime
# 2026년 기준 토큰당 가격 (USD)
MODEL_PRICING = {
"gpt-4o-2026-02": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
"gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
"claude-sonnet-4-20250514": {"input": 3.00 / 1_000_000, "output": 15.00 / 1_000_000},
}
@dataclass
class UsageRecord:
"""API 사용 기록"""
timestamp: datetime
model: str
input_tokens: int
output_tokens: int
cost_usd: float
pipeline: str
doc_id: str
class CostTracker:
"""LLM API 비용 추적기"""
def __init__(self, daily_budget_usd: float = 100.0):
self.daily_budget = daily_budget_usd
self.records: list[UsageRecord] = []
def record_usage(
self,
model: str,
input_tokens: int,
output_tokens: int,
pipeline: str,
doc_id: str
) -> UsageRecord:
"""API 사용량을 기록합니다."""
pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
cost = (
input_tokens * pricing["input"]
+ output_tokens * pricing["output"]
)
record = UsageRecord(
timestamp=datetime.now(),
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
cost_usd=cost,
pipeline=pipeline,
doc_id=doc_id,
)
self.records.append(record)
return record
def get_daily_cost(self) -> float:
"""오늘의 총 비용을 계산합니다."""
today = datetime.now().date()
return sum(
r.cost_usd for r in self.records
if r.timestamp.date() == today
)
def is_over_budget(self) -> bool:
"""일일 예산을 초과했는지 확인합니다."""
return self.get_daily_cost() >= self.daily_budget
def get_cost_by_model(self) -> dict[str, float]:
"""모델별 비용을 계산합니다."""
costs: dict[str, float] = {}
for r in self.records:
costs[r.model] = costs.get(r.model, 0) + r.cost_usd
return costs
def get_cost_by_pipeline(self) -> dict[str, float]:
"""파이프라인별 비용을 계산합니다."""
costs: dict[str, float] = {}
for r in self.records:
costs[r.pipeline] = costs.get(r.pipeline, 0) + r.cost_usd
return costsclass BudgetExceededError(Exception):
"""예산 초과 예외"""
pass
async def extract_with_budget_guard(
cost_tracker: CostTracker,
extract_fn,
content: str,
doc_id: str
):
"""예산 확인 후 추출을 실행합니다."""
if cost_tracker.is_over_budget():
raise BudgetExceededError(
f"일일 예산(${cost_tracker.daily_budget:.2f}) 초과. "
f"현재 비용: ${cost_tracker.get_daily_cost():.2f}"
)
result = await extract_fn(content)
# 사용량 기록 (응답에서 토큰 수 추출)
cost_tracker.record_usage(
model="gpt-4o-2026-02",
input_tokens=result._raw_response.usage.prompt_tokens,
output_tokens=result._raw_response.usage.completion_tokens,
pipeline="invoice_extraction",
doc_id=doc_id,
)
return resultPydantic 모델의 스키마가 변경되면 기존 데이터와의 호환성 문제가 발생합니다. 스키마에 버전을 부여하여 관리합니다.
from pydantic import BaseModel, Field
from typing import Literal
class InvoiceV1(BaseModel):
"""송장 스키마 v1"""
schema_version: Literal["1.0"] = "1.0"
invoice_number: str
vendor_name: str
total_amount: float
date: str
class InvoiceV2(BaseModel):
"""송장 스키마 v2 - 세금 정보 추가"""
schema_version: Literal["2.0"] = "2.0"
invoice_number: str
vendor_name: str
subtotal: float
tax_amount: float
total_amount: float
date: str
currency: str = "KRW"
def migrate_v1_to_v2(v1: InvoiceV1) -> InvoiceV2:
"""v1 데이터를 v2 스키마로 마이그레이션합니다."""
return InvoiceV2(
invoice_number=v1.invoice_number,
vendor_name=v1.vendor_name,
subtotal=v1.total_amount,
tax_amount=0.0, # v1에는 세금 정보가 없음
total_amount=v1.total_amount,
date=v1.date,
currency="KRW", # 기본값
)프롬프트도 코드와 마찬가지로 버전 관리가 필요합니다.
from dataclasses import dataclass
from datetime import datetime
@dataclass
class PromptVersion:
"""프롬프트 버전"""
version: str
template: str
created_at: datetime
description: str
is_active: bool = False
class PromptRegistry:
"""프롬프트 레지스트리"""
def __init__(self):
self._prompts: dict[str, list[PromptVersion]] = {}
def register(
self,
name: str,
version: str,
template: str,
description: str
) -> None:
"""프롬프트 버전을 등록합니다."""
if name not in self._prompts:
self._prompts[name] = []
pv = PromptVersion(
version=version,
template=template,
created_at=datetime.now(),
description=description,
)
self._prompts[name].append(pv)
def activate(self, name: str, version: str) -> None:
"""특정 버전을 활성화합니다."""
for pv in self._prompts.get(name, []):
pv.is_active = (pv.version == version)
def get_active(self, name: str) -> PromptVersion | None:
"""활성 버전을 반환합니다."""
for pv in self._prompts.get(name, []):
if pv.is_active:
return pv
return None
# 사용 예시
registry = PromptRegistry()
registry.register(
name="invoice_extraction",
version="1.0",
template="다음 텍스트에서 송장 정보를 추출하세요: {content}",
description="초기 버전"
)
registry.register(
name="invoice_extraction",
version="1.1",
template=(
"다음 텍스트는 송장 문서입니다. "
"모든 금액은 숫자로만 표시하고 통화 기호는 제외하세요. "
"날짜는 YYYY-MM-DD 형식으로 표준화하세요.\n\n"
"문서: {content}"
),
description="금액/날짜 형식 명시"
)
registry.activate("invoice_extraction", "1.1")프롬프트 변경은 출력 품질에 직접적인 영향을 미칩니다. 새 프롬프트를 배포하기 전에 반드시 테스트 데이터셋으로 품질을 검증하고, 기존 버전과 비교 평가(A/B 테스트)를 수행하세요.
CI 파이프라인에 스키마 테스트를 포함하여, 스키마 변경이 기존 데이터와 호환되는지 자동 검증합니다.
import pytest
from pydantic import ValidationError
# 테스트 데이터: 실제 LLM 출력 샘플
GOLDEN_SAMPLES = [
{
"invoice_number": "INV-2026-001",
"vendor_name": "테스트 주식회사",
"subtotal": 100000,
"tax_amount": 10000,
"total_amount": 110000,
"date": "2026-01-15",
"currency": "KRW",
},
{
"invoice_number": "INV-2026-002",
"vendor_name": "Sample Corp",
"subtotal": 500.00,
"tax_amount": 50.00,
"total_amount": 550.00,
"date": "2026-03-20",
"currency": "USD",
},
]
class TestSchemaCompatibility:
"""스키마 호환성 테스트"""
def test_golden_samples_pass(self):
"""기존 샘플 데이터가 새 스키마를 통과하는지 확인"""
for sample in GOLDEN_SAMPLES:
result = InvoiceV2(**sample)
assert result.invoice_number == sample["invoice_number"]
def test_required_fields(self):
"""필수 필드 누락 시 오류 발생 확인"""
with pytest.raises(ValidationError):
InvoiceV2(
invoice_number="TEST",
# vendor_name 누락
subtotal=100,
tax_amount=10,
total_amount=110,
date="2026-01-01",
)
def test_amount_validation(self):
"""금액 정합성 검증이 동작하는지 확인"""
with pytest.raises(ValidationError):
InvoiceV2(
invoice_number="TEST",
vendor_name="Test",
subtotal=100,
tax_amount=10,
total_amount=999, # 소계 + 세액과 불일치
date="2026-01-01",
)name: Pipeline Tests
on:
push:
paths:
- "pipeline/**"
- "schemas/**"
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install dependencies
run: pip install -r requirements.txt
- name: Run schema tests
run: pytest tests/test_schema.py -v
- name: Run validation tests
run: pytest tests/test_validation.py -v
- name: Schema diff check
run: python scripts/check_schema_diff.py이번 장에서는 프로덕션 AI 데이터 파이프라인의 운영에 필요한 핵심 기법을 학습했습니다.
핵심 내용을 정리하면 다음과 같습니다.
10장에서는 지금까지 학습한 모든 기술을 통합하여 실전 프로젝트를 구축합니다. PDF 송장에서 구조화된 JSON 데이터를 추출하는 엔드투엔드 파이프라인을 FastAPI, Pydantic, 검증 루프, 배치 처리로 완성합니다.
이 글이 도움이 되셨나요?
PDF 송장에서 구조화된 JSON 데이터를 추출하는 엔드투엔드 파이프라인을 FastAPI, Pydantic, 검증 루프, 배치 처리로 구축합니다.
스키마 검증, 의미적 검증, 자동 재시도, 멀티 프로바이더 폴백, 부분 출력 복구 등 프로덕션 수준의 검증 전략을 학습합니다.
전통 ETL과 LLM-enhanced ETL을 비교하고, Transform 단계에 LLM을 적용하여 분류, 요약, 정규화, 감성분석을 수행하는 방법을 학습합니다.