본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout

내비게이션

  • Tech
  • Projects
  • Books
  • About
  • Tags

카테고리

  • AI / ML
  • 웹 개발
  • 프로그래밍
  • 개발 도구

연결

  • GitHub
  • Email
  • RSS
© 2026 Kreath Archive. All rights reserved.Built with Next.js + MDX
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 9장: 프로덕션 AI 데이터 파이프라인
2026년 4월 5일·AI / ML·

9장: 프로덕션 AI 데이터 파이프라인

재시도, 서킷 브레이커, 관측 가능성, 비용 추적, 스키마 버전 관리 등 프로덕션 수준의 AI 파이프라인 운영 기법을 학습합니다.

15분1,417자8개 섹션
structured-outputaidata-engineeringllm
공유
structured-output9 / 10
12345678910
이전8장: 출력 검증과 폴백 전략다음10장: 실전 프로젝트 — Structured Output 파이프라인 구축

학습 목표

  • 재시도, 서킷 브레이커 패턴으로 파이프라인 안정성을 확보합니다
  • 로깅, 메트릭, 트레이싱으로 관측 가능성을 구축합니다
  • LLM API 비용을 추적하고 예산을 관리합니다
  • 스키마와 프롬프트의 버전 관리 전략을 수립합니다
  • CI/CD 파이프라인에 검증 단계를 통합합니다

안정성 패턴

지수 백오프 재시도

LLM API는 일시적 오류(Rate Limit, 타임아웃, 서버 오류)가 빈번합니다. 지수 백오프(Exponential Backoff)로 재시도하면 대부분의 일시적 오류를 극복할 수 있습니다.

retry_strategy.py
python
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log,
)
from openai import RateLimitError, APITimeoutError, APIConnectionError
import logging
 
logger = logging.getLogger(__name__)
 
 
@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=2, max=60),
    retry=retry_if_exception_type((
        RateLimitError,
        APITimeoutError,
        APIConnectionError,
    )),
    before_sleep=before_sleep_log(logger, logging.WARNING),
)
async def call_llm_with_retry(client, messages, response_model):
    """지수 백오프로 LLM을 호출합니다."""
    return await client.chat.completions.create(
        model="gpt-4o-2026-02",
        response_model=response_model,
        messages=messages,
    )

서킷 브레이커

**Circuit Breaker(서킷 브레이커)**는 연속 실패가 임계치를 넘으면 호출을 차단하여, 장애가 전파되는 것을 방지합니다. 일정 시간이 지나면 다시 시도합니다.

circuit_breaker.py
python
import time
from enum import Enum
from dataclasses import dataclass, field
 
 
class CircuitState(str, Enum):
    CLOSED = "closed"      # 정상 운영
    OPEN = "open"          # 호출 차단
    HALF_OPEN = "half_open" # 시험적 허용
 
 
@dataclass
class CircuitBreaker:
    """서킷 브레이커 구현"""
    failure_threshold: int = 5
    recovery_timeout: float = 60.0
    half_open_max_calls: int = 3
 
    _state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    _failure_count: int = field(default=0, init=False)
    _last_failure_time: float = field(default=0.0, init=False)
    _half_open_calls: int = field(default=0, init=False)
 
    @property
    def state(self) -> CircuitState:
        if self._state == CircuitState.OPEN:
            elapsed = time.time() - self._last_failure_time
            if elapsed >= self.recovery_timeout:
                self._state = CircuitState.HALF_OPEN
                self._half_open_calls = 0
        return self._state
 
    def can_execute(self) -> bool:
        """호출 가능 여부를 반환합니다."""
        state = self.state
        if state == CircuitState.CLOSED:
            return True
        elif state == CircuitState.HALF_OPEN:
            return self._half_open_calls < self.half_open_max_calls
        return False
 
    def record_success(self) -> None:
        """성공을 기록합니다."""
        if self._state == CircuitState.HALF_OPEN:
            self._half_open_calls += 1
            if self._half_open_calls >= self.half_open_max_calls:
                self._state = CircuitState.CLOSED
                self._failure_count = 0
        else:
            self._failure_count = 0
 
    def record_failure(self) -> None:
        """실패를 기록합니다."""
        self._failure_count += 1
        self._last_failure_time = time.time()
        if self._failure_count >= self.failure_threshold:
            self._state = CircuitState.OPEN
 
    def __str__(self) -> str:
        return (
            f"CircuitBreaker(state={self.state.value}, "
            f"failures={self._failure_count}/"
            f"{self.failure_threshold})"
        )

서킷 브레이커 적용

circuit_breaker_usage.py
python
import asyncio
 
# 프로바이더별 서킷 브레이커
breakers = {
    "openai": CircuitBreaker(failure_threshold=5, recovery_timeout=60),
    "anthropic": CircuitBreaker(failure_threshold=5, recovery_timeout=60),
}
 
 
async def extract_with_circuit_breaker(
    content: str,
    response_model,
    providers: list[dict]
):
    """서킷 브레이커가 적용된 폴백 추출"""
    for provider in providers:
        name = provider["name"]
        breaker = breakers[name]
 
        if not breaker.can_execute():
            continue
 
        try:
            result = await provider["extract_fn"](content, response_model)
            breaker.record_success()
            return result
        except Exception:
            breaker.record_failure()
 
    raise RuntimeError("모든 프로바이더의 서킷 브레이커가 열려 있습니다.")

관측 가능성 (Observability)

프로덕션 파이프라인에서는 무엇이 일어나고 있는지 실시간으로 파악해야 합니다.

구조화된 로깅

structured_logging.py
python
import structlog
import time
from functools import wraps
 
logger = structlog.get_logger()
 
 
def log_extraction(func):
    """추출 함수에 구조화된 로깅을 추가하는 데코레이터"""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.time()
        doc_id = kwargs.get("doc_id", "unknown")
 
        logger.info(
            "extraction_started",
            doc_id=doc_id,
            model=kwargs.get("model", "unknown"),
        )
 
        try:
            result = await func(*args, **kwargs)
            duration = time.time() - start
 
            logger.info(
                "extraction_completed",
                doc_id=doc_id,
                duration_ms=round(duration * 1000),
                success=True,
            )
            return result
 
        except Exception as e:
            duration = time.time() - start
            logger.error(
                "extraction_failed",
                doc_id=doc_id,
                duration_ms=round(duration * 1000),
                error_type=type(e).__name__,
                error_message=str(e),
            )
            raise
 
    return wrapper

메트릭 수집

Prometheus 호환 메트릭을 수집하여 모니터링합니다.

metrics.py
python
from dataclasses import dataclass, field
from collections import defaultdict
import time
import threading
 
 
@dataclass
class PipelineMetrics:
    """파이프라인 메트릭 수집기"""
    _lock: threading.Lock = field(default_factory=threading.Lock, init=False)
    _counters: dict[str, int] = field(
        default_factory=lambda: defaultdict(int), init=False
    )
    _histograms: dict[str, list[float]] = field(
        default_factory=lambda: defaultdict(list), init=False
    )
    _gauges: dict[str, float] = field(
        default_factory=lambda: defaultdict(float), init=False
    )
 
    def increment(self, name: str, value: int = 1) -> None:
        with self._lock:
            self._counters[name] += value
 
    def observe(self, name: str, value: float) -> None:
        with self._lock:
            self._histograms[name].append(value)
 
    def set_gauge(self, name: str, value: float) -> None:
        with self._lock:
            self._gauges[name] = value
 
    def summary(self) -> dict:
        with self._lock:
            result = {
                "counters": dict(self._counters),
                "gauges": dict(self._gauges),
            }
            for name, values in self._histograms.items():
                if values:
                    result[f"histogram_{name}"] = {
                        "count": len(values),
                        "mean": sum(values) / len(values),
                        "min": min(values),
                        "max": max(values),
                        "p95": sorted(values)[int(len(values) * 0.95)],
                    }
            return result
 
 
# 전역 메트릭 인스턴스
metrics = PipelineMetrics()
 
# 사용 예시
metrics.increment("extractions_total")
metrics.increment("extractions_success")
metrics.observe("extraction_duration_seconds", 2.35)
metrics.observe("tokens_used", 1500)
metrics.set_gauge("active_extractions", 5)
Tip

Grafana와 Prometheus를 사용하면 메트릭을 실시간 대시보드로 시각화할 수 있습니다. 추출 성공률, 평균 응답 시간, 일일 비용 추이 등을 한눈에 파악할 수 있어 운영 효율이 크게 높아집니다.


비용 추적

LLM API 비용은 프로덕션에서 가장 중요한 운영 지표 중 하나입니다.

cost_tracker.py
python
from dataclasses import dataclass
from datetime import datetime
 
 
# 2026년 기준 토큰당 가격 (USD)
MODEL_PRICING = {
    "gpt-4o-2026-02": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
    "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
    "claude-sonnet-4-20250514": {"input": 3.00 / 1_000_000, "output": 15.00 / 1_000_000},
}
 
 
@dataclass
class UsageRecord:
    """API 사용 기록"""
    timestamp: datetime
    model: str
    input_tokens: int
    output_tokens: int
    cost_usd: float
    pipeline: str
    doc_id: str
 
 
class CostTracker:
    """LLM API 비용 추적기"""
 
    def __init__(self, daily_budget_usd: float = 100.0):
        self.daily_budget = daily_budget_usd
        self.records: list[UsageRecord] = []
 
    def record_usage(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        pipeline: str,
        doc_id: str
    ) -> UsageRecord:
        """API 사용량을 기록합니다."""
        pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
        cost = (
            input_tokens * pricing["input"]
            + output_tokens * pricing["output"]
        )
 
        record = UsageRecord(
            timestamp=datetime.now(),
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost_usd=cost,
            pipeline=pipeline,
            doc_id=doc_id,
        )
        self.records.append(record)
        return record
 
    def get_daily_cost(self) -> float:
        """오늘의 총 비용을 계산합니다."""
        today = datetime.now().date()
        return sum(
            r.cost_usd for r in self.records
            if r.timestamp.date() == today
        )
 
    def is_over_budget(self) -> bool:
        """일일 예산을 초과했는지 확인합니다."""
        return self.get_daily_cost() >= self.daily_budget
 
    def get_cost_by_model(self) -> dict[str, float]:
        """모델별 비용을 계산합니다."""
        costs: dict[str, float] = {}
        for r in self.records:
            costs[r.model] = costs.get(r.model, 0) + r.cost_usd
        return costs
 
    def get_cost_by_pipeline(self) -> dict[str, float]:
        """파이프라인별 비용을 계산합니다."""
        costs: dict[str, float] = {}
        for r in self.records:
            costs[r.pipeline] = costs.get(r.pipeline, 0) + r.cost_usd
        return costs

예산 초과 방지

budget_guard.py
python
class BudgetExceededError(Exception):
    """예산 초과 예외"""
    pass
 
 
async def extract_with_budget_guard(
    cost_tracker: CostTracker,
    extract_fn,
    content: str,
    doc_id: str
):
    """예산 확인 후 추출을 실행합니다."""
    if cost_tracker.is_over_budget():
        raise BudgetExceededError(
            f"일일 예산(${cost_tracker.daily_budget:.2f}) 초과. "
            f"현재 비용: ${cost_tracker.get_daily_cost():.2f}"
        )
 
    result = await extract_fn(content)
 
    # 사용량 기록 (응답에서 토큰 수 추출)
    cost_tracker.record_usage(
        model="gpt-4o-2026-02",
        input_tokens=result._raw_response.usage.prompt_tokens,
        output_tokens=result._raw_response.usage.completion_tokens,
        pipeline="invoice_extraction",
        doc_id=doc_id,
    )
 
    return result

버전 관리

스키마 버전 관리

Pydantic 모델의 스키마가 변경되면 기존 데이터와의 호환성 문제가 발생합니다. 스키마에 버전을 부여하여 관리합니다.

schema_versioning.py
python
from pydantic import BaseModel, Field
from typing import Literal
 
 
class InvoiceV1(BaseModel):
    """송장 스키마 v1"""
    schema_version: Literal["1.0"] = "1.0"
    invoice_number: str
    vendor_name: str
    total_amount: float
    date: str
 
 
class InvoiceV2(BaseModel):
    """송장 스키마 v2 - 세금 정보 추가"""
    schema_version: Literal["2.0"] = "2.0"
    invoice_number: str
    vendor_name: str
    subtotal: float
    tax_amount: float
    total_amount: float
    date: str
    currency: str = "KRW"
 
 
def migrate_v1_to_v2(v1: InvoiceV1) -> InvoiceV2:
    """v1 데이터를 v2 스키마로 마이그레이션합니다."""
    return InvoiceV2(
        invoice_number=v1.invoice_number,
        vendor_name=v1.vendor_name,
        subtotal=v1.total_amount,
        tax_amount=0.0,  # v1에는 세금 정보가 없음
        total_amount=v1.total_amount,
        date=v1.date,
        currency="KRW",  # 기본값
    )

프롬프트 버전 관리

프롬프트도 코드와 마찬가지로 버전 관리가 필요합니다.

prompt_versioning.py
python
from dataclasses import dataclass
from datetime import datetime
 
 
@dataclass
class PromptVersion:
    """프롬프트 버전"""
    version: str
    template: str
    created_at: datetime
    description: str
    is_active: bool = False
 
 
class PromptRegistry:
    """프롬프트 레지스트리"""
 
    def __init__(self):
        self._prompts: dict[str, list[PromptVersion]] = {}
 
    def register(
        self,
        name: str,
        version: str,
        template: str,
        description: str
    ) -> None:
        """프롬프트 버전을 등록합니다."""
        if name not in self._prompts:
            self._prompts[name] = []
 
        pv = PromptVersion(
            version=version,
            template=template,
            created_at=datetime.now(),
            description=description,
        )
        self._prompts[name].append(pv)
 
    def activate(self, name: str, version: str) -> None:
        """특정 버전을 활성화합니다."""
        for pv in self._prompts.get(name, []):
            pv.is_active = (pv.version == version)
 
    def get_active(self, name: str) -> PromptVersion | None:
        """활성 버전을 반환합니다."""
        for pv in self._prompts.get(name, []):
            if pv.is_active:
                return pv
        return None
 
 
# 사용 예시
registry = PromptRegistry()
registry.register(
    name="invoice_extraction",
    version="1.0",
    template="다음 텍스트에서 송장 정보를 추출하세요: {content}",
    description="초기 버전"
)
registry.register(
    name="invoice_extraction",
    version="1.1",
    template=(
        "다음 텍스트는 송장 문서입니다. "
        "모든 금액은 숫자로만 표시하고 통화 기호는 제외하세요. "
        "날짜는 YYYY-MM-DD 형식으로 표준화하세요.\n\n"
        "문서: {content}"
    ),
    description="금액/날짜 형식 명시"
)
registry.activate("invoice_extraction", "1.1")
Info

프롬프트 변경은 출력 품질에 직접적인 영향을 미칩니다. 새 프롬프트를 배포하기 전에 반드시 테스트 데이터셋으로 품질을 검증하고, 기존 버전과 비교 평가(A/B 테스트)를 수행하세요.


CI/CD 통합

스키마 변경 테스트

CI 파이프라인에 스키마 테스트를 포함하여, 스키마 변경이 기존 데이터와 호환되는지 자동 검증합니다.

test_schema.py
python
import pytest
from pydantic import ValidationError
 
 
# 테스트 데이터: 실제 LLM 출력 샘플
GOLDEN_SAMPLES = [
    {
        "invoice_number": "INV-2026-001",
        "vendor_name": "테스트 주식회사",
        "subtotal": 100000,
        "tax_amount": 10000,
        "total_amount": 110000,
        "date": "2026-01-15",
        "currency": "KRW",
    },
    {
        "invoice_number": "INV-2026-002",
        "vendor_name": "Sample Corp",
        "subtotal": 500.00,
        "tax_amount": 50.00,
        "total_amount": 550.00,
        "date": "2026-03-20",
        "currency": "USD",
    },
]
 
 
class TestSchemaCompatibility:
    """스키마 호환성 테스트"""
 
    def test_golden_samples_pass(self):
        """기존 샘플 데이터가 새 스키마를 통과하는지 확인"""
        for sample in GOLDEN_SAMPLES:
            result = InvoiceV2(**sample)
            assert result.invoice_number == sample["invoice_number"]
 
    def test_required_fields(self):
        """필수 필드 누락 시 오류 발생 확인"""
        with pytest.raises(ValidationError):
            InvoiceV2(
                invoice_number="TEST",
                # vendor_name 누락
                subtotal=100,
                tax_amount=10,
                total_amount=110,
                date="2026-01-01",
            )
 
    def test_amount_validation(self):
        """금액 정합성 검증이 동작하는지 확인"""
        with pytest.raises(ValidationError):
            InvoiceV2(
                invoice_number="TEST",
                vendor_name="Test",
                subtotal=100,
                tax_amount=10,
                total_amount=999,  # 소계 + 세액과 불일치
                date="2026-01-01",
            )

CI 워크플로우

.github/workflows/pipeline-test.yml
yaml
name: Pipeline Tests
 
on:
  push:
    paths:
      - "pipeline/**"
      - "schemas/**"
 
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
 
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
 
      - name: Install dependencies
        run: pip install -r requirements.txt
 
      - name: Run schema tests
        run: pytest tests/test_schema.py -v
 
      - name: Run validation tests
        run: pytest tests/test_validation.py -v
 
      - name: Schema diff check
        run: python scripts/check_schema_diff.py

정리

이번 장에서는 프로덕션 AI 데이터 파이프라인의 운영에 필요한 핵심 기법을 학습했습니다.

핵심 내용을 정리하면 다음과 같습니다.

  • 지수 백오프 재시도와 서킷 브레이커로 일시적 장애에 대응하고 장애 전파를 방지합니다
  • 구조화된 로깅, 메트릭 수집, 대시보드로 파이프라인 상태를 실시간 모니터링합니다
  • 비용 추적기로 모델별, 파이프라인별 비용을 관리하고 예산 초과를 방지합니다
  • 스키마와 프롬프트에 버전을 부여하고, 변경 시 호환성 테스트를 수행합니다
  • CI/CD에 스키마 테스트를 통합하여 배포 전 자동 검증을 수행합니다

다음 장 미리보기

10장에서는 지금까지 학습한 모든 기술을 통합하여 실전 프로젝트를 구축합니다. PDF 송장에서 구조화된 JSON 데이터를 추출하는 엔드투엔드 파이프라인을 FastAPI, Pydantic, 검증 루프, 배치 처리로 완성합니다.

이 글이 도움이 되셨나요?

관련 주제 더 보기

#structured-output#ai#data-engineering#llm

관련 글

AI / ML

10장: 실전 프로젝트 — Structured Output 파이프라인 구축

PDF 송장에서 구조화된 JSON 데이터를 추출하는 엔드투엔드 파이프라인을 FastAPI, Pydantic, 검증 루프, 배치 처리로 구축합니다.

2026년 4월 5일·17분
AI / ML

8장: 출력 검증과 폴백 전략

스키마 검증, 의미적 검증, 자동 재시도, 멀티 프로바이더 폴백, 부분 출력 복구 등 프로덕션 수준의 검증 전략을 학습합니다.

2026년 4월 3일·18분
AI / ML

7장: ETL 파이프라인에 LLM 통합

전통 ETL과 LLM-enhanced ETL을 비교하고, Transform 단계에 LLM을 적용하여 분류, 요약, 정규화, 감성분석을 수행하는 방법을 학습합니다.

2026년 4월 1일·14분
이전 글8장: 출력 검증과 폴백 전략
다음 글10장: 실전 프로젝트 — Structured Output 파이프라인 구축

댓글

목차

약 15분 남음
  • 학습 목표
  • 안정성 패턴
    • 지수 백오프 재시도
    • 서킷 브레이커
    • 서킷 브레이커 적용
  • 관측 가능성 (Observability)
    • 구조화된 로깅
    • 메트릭 수집
  • 비용 추적
    • 예산 초과 방지
  • 버전 관리
    • 스키마 버전 관리
    • 프롬프트 버전 관리
  • CI/CD 통합
    • 스키마 변경 테스트
    • CI 워크플로우
  • 정리
  • 다음 장 미리보기