본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout

내비게이션

  • Tech
  • Projects
  • Books
  • About
  • Tags

카테고리

  • AI / ML
  • 웹 개발
  • 프로그래밍
  • 개발 도구

연결

  • GitHub
  • Email
  • RSS
© 2026 Kreath Archive. All rights reserved.Built with Next.js + MDX
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 10장: 에러 처리, 폴백, 관측 가능성
2026년 2월 20일·AI / ML·

10장: 에러 처리, 폴백, 관측 가능성

재시도 전략, 서킷 브레이커, OpenTelemetry 통합, 비용 추적, 프로덕션 모니터링까지 프로덕션 안정성 패턴을 다룹니다.

15분1,179자11개 섹션
orchestrationai-frameworkaillm
공유
ai-orchestration10 / 11
1234567891011
이전9장: 스트리밍과 실시간 처리 패턴다음11장: 프레임워크 선택 기준과 마이그레이션 전략

이 장에서 배우는 것

  • LLM API의 일반적인 실패 유형과 대응 전략
  • 재시도(Retry), 폴백(Fallback), 타임아웃(Timeout) 구현
  • 서킷 브레이커(Circuit Breaker) 패턴
  • OpenTelemetry를 활용한 트레이싱과 메트릭
  • 토큰 비용 추적과 예산 관리
  • 프로덕션 모니터링 대시보드 구성

프로덕션에서의 실패

개발 환경에서는 LLM API가 거의 항상 작동합니다. 그러나 프로덕션에서 수천 건의 요청을 처리하면 반드시 실패가 발생합니다. 준비되지 않은 실패는 사용자 경험을 망가뜨리고, 디버깅에 막대한 시간을 소모하게 합니다.

일반적인 실패 유형

유형HTTP 코드원인빈도
속도 제한429분당 요청/토큰 한도 초과높음
서버 오류500, 502, 503공급자 측 일시적 장애중간
타임아웃-긴 응답 생성, 네트워크 지연중간
컨텍스트 초과400입력 토큰이 모델 한도 초과낮음
콘텐츠 필터400안전 필터에 의한 거부낮음
인증 실패401, 403API 키 만료, 권한 부족낮음

재시도 전략

지수 백오프 (Exponential Backoff)

속도 제한 에러에 가장 효과적인 재시도 전략입니다.

exponential_backoff.py
python
import asyncio
import random
from typing import TypeVar, Callable
 
T = TypeVar("T")
 
async def retry_with_exponential_backoff(
    func: Callable[..., T],
    *args,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
    retryable_errors: tuple = (Exception,),
    **kwargs,
) -> T:
    """지수 백오프 + 지터를 적용한 재시도"""
    last_error = None
 
    for attempt in range(max_retries):
        try:
            return await func(*args, **kwargs)
        except retryable_errors as e:
            last_error = e
 
            if attempt == max_retries - 1:
                raise
 
            delay = min(base_delay * (2 ** attempt), max_delay)
            if jitter:
                delay = delay * (0.5 + random.random())
 
            print(f"재시도 {attempt + 1}/{max_retries}, "
                  f"{delay:.1f}초 대기 (에러: {e})")
            await asyncio.sleep(delay)
 
    raise last_error

LangChain의 재시도

langchain_retry.py
python
from langchain_openai import ChatOpenAI
 
model = ChatOpenAI(model="gpt-4o")
 
# 재시도 설정
chain_with_retry = (prompt | model | parser).with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True,
    retry_if_exception_type=(
        ConnectionError,
        TimeoutError,
    ),
)
Warning

재시도는 멱등한(Idempotent) 작업에만 안전합니다. LLM 호출은 대부분 멱등하지만, 도구 호출(특히 DB 쓰기, 이메일 전송 등)을 포함하는 체인에서는 재시도 시 부수 효과가 중복될 수 있습니다. 이런 경우 멱등성 키를 사용하거나, 재시도 범위를 LLM 호출에만 한정하세요.


폴백 전략

재시도가 모두 실패한 경우 대체 경로로 전환하는 전략입니다.

다층 폴백

layered_fallback.py
python
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
 
# 1차: 주 모델
primary = ChatOpenAI(model="gpt-4o", request_timeout=10)
 
# 2차: 같은 공급자의 저비용 모델
secondary = ChatOpenAI(model="gpt-4o-mini", request_timeout=15)
 
# 3차: 다른 공급자
tertiary = ChatAnthropic(
    model="claude-3-5-haiku-20241022",
    timeout=20,
)
 
# 폴백 체인
robust_chain = (
    prompt
    | primary.with_fallbacks([secondary, tertiary])
    | parser
)

전략 폴백

모델 폴백 외에도, 처리 전략 자체를 변경하는 폴백이 있습니다.

strategy_fallback.py
python
async def query_with_strategy_fallback(query: str) -> str:
    """전략 수준의 폴백"""
    strategies = [
        ("RAG + GPT-4o", rag_gpt4_chain),
        ("RAG + GPT-4o-mini", rag_gpt4mini_chain),
        ("직접 응답 (RAG 없이)", direct_chain),
        ("사전 정의된 응답", cached_response),
    ]
 
    for strategy_name, chain in strategies:
        try:
            result = await chain.ainvoke({"query": query})
            print(f"성공 전략: {strategy_name}")
            return result
        except Exception as e:
            print(f"전략 실패 [{strategy_name}]: {e}")
            continue
 
    return "죄송합니다, 현재 서비스를 이용할 수 없습니다. 잠시 후 다시 시도해주세요."

타임아웃 관리

timeout_management.py
python
import asyncio
 
async def call_with_timeout(chain, input_data, timeout: float = 30.0):
    """타임아웃이 적용된 체인 호출"""
    try:
        async with asyncio.timeout(timeout):
            return await chain.ainvoke(input_data)
    except asyncio.TimeoutError:
        # 타임아웃 시 폴백
        return await fallback_chain.ainvoke(input_data)
 
# LangChain 모델 수준 타임아웃
model = ChatOpenAI(
    model="gpt-4o",
    request_timeout=10,  # HTTP 요청 타임아웃
    max_retries=2,       # API 클라이언트 수준 재시도
)

서킷 브레이커

연속적인 실패가 발생하면 일시적으로 호출을 중단하여 시스템을 보호하는 패턴입니다.

circuit_breaker.py
python
import time
from enum import Enum
 
class CircuitState(Enum):
    CLOSED = "closed"      # 정상 상태
    OPEN = "open"          # 차단 상태
    HALF_OPEN = "half_open"  # 시험 상태
 
class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        half_open_max_calls: int = 3,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
 
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0
        self.half_open_calls = 0
 
    async def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
            else:
                raise CircuitOpenError(
                    f"서킷 브레이커 OPEN 상태. "
                    f"{self.recovery_timeout}초 후 재시도됩니다."
                )
 
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise
 
    def _on_success(self):
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_calls += 1
            if self.half_open_calls >= self.half_open_max_calls:
                self.state = CircuitState.CLOSED
 
    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
 
# 사용
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)
 
try:
    result = await breaker.call(model.ainvoke, prompt)
except CircuitOpenError:
    result = await fallback_model.ainvoke(prompt)
Info

서킷 브레이커는 특정 모델이나 공급자에 장애가 발생했을 때 불필요한 재시도를 방지합니다. 예를 들어, OpenAI에 장애가 발생하면 서킷 브레이커가 열리고, 즉시 Anthropic으로 폴백하여 사용자 영향을 최소화합니다.


OpenTelemetry 통합

트레이싱 설정

otel_tracing.py
python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
    OTLPSpanExporter,
)
from opentelemetry.sdk.resources import Resource
 
# 리소스 정의
resource = Resource.create({
    "service.name": "ai-orchestration-service",
    "service.version": "1.0.0",
    "deployment.environment": "production",
})
 
# 트레이서 설정
provider = TracerProvider(resource=resource)
exporter = OTLPSpanExporter(endpoint="http://otel-collector:4317")
provider.add_span_processor(BatchSpanExporter(exporter))
trace.set_tracer_provider(provider)
 
tracer = trace.get_tracer("ai.orchestration")

커스텀 스팬

custom_spans.py
python
from opentelemetry import trace
 
tracer = trace.get_tracer("ai.orchestration")
 
async def rag_query(query: str) -> str:
    with tracer.start_as_current_span("rag_query") as span:
        span.set_attribute("query", query)
 
        # 검색 단계
        with tracer.start_as_current_span("retrieval") as retrieval_span:
            docs = await retriever.aretrieve(query)
            retrieval_span.set_attribute("num_docs", len(docs))
            retrieval_span.set_attribute(
                "avg_score",
                sum(d.score for d in docs) / len(docs) if docs else 0,
            )
 
        # 생성 단계
        with tracer.start_as_current_span("generation") as gen_span:
            response = await model.ainvoke(prompt.format(
                context=docs, question=query
            ))
            gen_span.set_attribute(
                "tokens.input", response.usage_metadata["input_tokens"]
            )
            gen_span.set_attribute(
                "tokens.output", response.usage_metadata["output_tokens"]
            )
 
        span.set_attribute("response_length", len(str(response)))
        return str(response)

메트릭 수집

metrics.py
python
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
    OTLPMetricExporter,
)
 
# 메트릭 설정
metric_reader = PeriodicExportingMetricReader(
    OTLPMetricExporter(endpoint="http://otel-collector:4317"),
    export_interval_millis=10000,
)
meter_provider = MeterProvider(metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)
 
meter = metrics.get_meter("ai.orchestration")
 
# 메트릭 정의
request_counter = meter.create_counter(
    "ai.requests.total",
    description="총 AI 요청 수",
)
token_counter = meter.create_counter(
    "ai.tokens.total",
    description="총 토큰 사용량",
)
latency_histogram = meter.create_histogram(
    "ai.request.duration",
    description="요청 처리 시간 (ms)",
    unit="ms",
)
error_counter = meter.create_counter(
    "ai.errors.total",
    description="에러 발생 수",
)
 
# 메트릭 기록
async def tracked_invoke(chain, input_data, model_name: str):
    start = time.perf_counter()
    try:
        result = await chain.ainvoke(input_data)
        request_counter.add(1, {"model": model_name, "status": "success"})
        if hasattr(result, "usage_metadata"):
            token_counter.add(
                result.usage_metadata["total_tokens"],
                {"model": model_name},
            )
        return result
    except Exception as e:
        error_counter.add(1, {"model": model_name, "error_type": type(e).__name__})
        raise
    finally:
        duration = (time.perf_counter() - start) * 1000
        latency_histogram.record(duration, {"model": model_name})

비용 추적

cost_tracker.py
python
from dataclasses import dataclass, field
from datetime import datetime
 
# 2026년 기준 대략적 가격 (실제 가격은 공급자 확인)
MODEL_PRICING = {
    "gpt-4o": {"input": 2.50, "output": 10.00},       # per 1M tokens
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
    "claude-3-5-haiku": {"input": 0.25, "output": 1.25},
}
 
@dataclass
class CostTracker:
    daily_budget: float = 100.0  # USD
    daily_spent: float = 0.0
    records: list = field(default_factory=list)
 
    def record_usage(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
    ) -> float:
        pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
        cost = (
            (input_tokens / 1_000_000) * pricing["input"]
            + (output_tokens / 1_000_000) * pricing["output"]
        )
        self.daily_spent += cost
        self.records.append({
            "timestamp": datetime.now().isoformat(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost_usd": cost,
        })
        return cost
 
    def check_budget(self) -> bool:
        """예산 초과 여부 확인"""
        return self.daily_spent < self.daily_budget
 
    def get_summary(self) -> dict:
        return {
            "daily_budget": self.daily_budget,
            "daily_spent": round(self.daily_spent, 4),
            "remaining": round(self.daily_budget - self.daily_spent, 4),
            "usage_percent": round(
                (self.daily_spent / self.daily_budget) * 100, 1
            ),
            "total_requests": len(self.records),
        }
Tip

비용 추적은 단순 로깅을 넘어서 예산 초과 방지 기능과 결합해야 합니다. 일일 예산의 80%에 도달하면 저비용 모델로 자동 전환하고, 100%에 도달하면 서비스를 일시 중단하는 전략이 효과적입니다.


프로덕션 모니터링 체크리스트

프로덕션 LLM 애플리케이션에서 모니터링해야 할 핵심 지표들입니다.

카테고리지표임계값 예시
가용성요청 성공률> 99.5%
지연 시간P50 / P95 / P99 응답 시간P95 < 5초
비용일일/월간 토큰 비용예산 대비 80% 경고
품질사용자 피드백 점수> 4.0/5.0
에러에러율, 에러 유형 분포에러율 < 1%
리소스메모리 사용량, CPU 사용률메모리 < 80%

알림 설정

alerting.py
python
class AlertManager:
    def __init__(self, thresholds: dict):
        self.thresholds = thresholds
 
    async def check_and_alert(self, metrics: dict):
        alerts = []
 
        if metrics["error_rate"] > self.thresholds["error_rate"]:
            alerts.append({
                "severity": "critical",
                "message": f"에러율 {metrics['error_rate']:.1%}이 "
                          f"임계값 {self.thresholds['error_rate']:.1%}을 초과했습니다.",
            })
 
        if metrics["p95_latency"] > self.thresholds["p95_latency"]:
            alerts.append({
                "severity": "warning",
                "message": f"P95 지연 시간 {metrics['p95_latency']:.1f}ms이 "
                          f"임계값을 초과했습니다.",
            })
 
        if metrics["daily_cost"] > self.thresholds["daily_budget"] * 0.8:
            alerts.append({
                "severity": "warning",
                "message": f"일일 비용이 예산의 "
                          f"{(metrics['daily_cost']/self.thresholds['daily_budget'])*100:.0f}%에 도달했습니다.",
            })
 
        for alert in alerts:
            await self.send_alert(alert)

핵심 요약

  • LLM API는 프로덕션에서 반드시 실패하며, 속도 제한(429)이 가장 흔한 에러입니다.
  • 지수 백오프 + 지터를 적용한 재시도가 속도 제한에 가장 효과적입니다.
  • 다층 폴백(같은 공급자 저비용 모델 -> 다른 공급자 -> 전략 변경)으로 가용성을 확보합니다.
  • 서킷 브레이커는 연속 실패 시 불필요한 재시도를 방지하고 시스템을 보호합니다.
  • OpenTelemetry로 트레이싱과 메트릭을 수집하여 성능을 관측합니다.
  • 비용 추적과 예산 관리는 프로덕션 LLM 서비스의 필수 요소입니다.

다음 장 예고

마지막 11장에서는 이 시리즈의 모든 내용을 종합합니다. 5대 프레임워크의 비교표, 프로젝트 요구사항에 따른 의사결정 트리, 하이브리드 아키텍처 패턴, 프레임워크 간 마이그레이션 가이드, 그리고 프레임워크 독립적 설계 원칙을 다루겠습니다.

이 글이 도움이 되셨나요?

관련 주제 더 보기

#orchestration#ai-framework#ai#llm

관련 글

AI / ML

11장: 프레임워크 선택 기준과 마이그레이션 전략

5대 프레임워크 종합 비교, 의사결정 트리, 하이브리드 아키텍처, 마이그레이션 가이드, 프레임워크 독립적 설계 원칙을 다룹니다.

2026년 2월 22일·20분
AI / ML

9장: 스트리밍과 실시간 처리 패턴

SSE/WebSocket, 토큰/이벤트 스트리밍, 구조화된 출력 스트리밍을 각 프레임워크별로 비교하고 프론트엔드 통합을 다룹니다.

2026년 2월 18일·13분
AI / ML

8장: 메모리 관리와 상태 유지

대화 메모리, 장기 메모리, 벡터 메모리, 구조화된 상태를 각 프레임워크별로 비교하고 프로덕션 메모리 전략을 정리합니다.

2026년 2월 16일·16분
이전 글9장: 스트리밍과 실시간 처리 패턴
다음 글11장: 프레임워크 선택 기준과 마이그레이션 전략

댓글

목차

약 15분 남음
  • 이 장에서 배우는 것
  • 프로덕션에서의 실패
    • 일반적인 실패 유형
  • 재시도 전략
    • 지수 백오프 (Exponential Backoff)
    • LangChain의 재시도
  • 폴백 전략
    • 다층 폴백
    • 전략 폴백
  • 타임아웃 관리
  • 서킷 브레이커
  • OpenTelemetry 통합
    • 트레이싱 설정
    • 커스텀 스팬
    • 메트릭 수집
  • 비용 추적
  • 프로덕션 모니터링 체크리스트
    • 알림 설정
  • 핵심 요약
  • 다음 장 예고