재시도 전략, 서킷 브레이커, OpenTelemetry 통합, 비용 추적, 프로덕션 모니터링까지 프로덕션 안정성 패턴을 다룹니다.
개발 환경에서는 LLM API가 거의 항상 작동합니다. 그러나 프로덕션에서 수천 건의 요청을 처리하면 반드시 실패가 발생합니다. 준비되지 않은 실패는 사용자 경험을 망가뜨리고, 디버깅에 막대한 시간을 소모하게 합니다.
| 유형 | HTTP 코드 | 원인 | 빈도 |
|---|---|---|---|
| 속도 제한 | 429 | 분당 요청/토큰 한도 초과 | 높음 |
| 서버 오류 | 500, 502, 503 | 공급자 측 일시적 장애 | 중간 |
| 타임아웃 | - | 긴 응답 생성, 네트워크 지연 | 중간 |
| 컨텍스트 초과 | 400 | 입력 토큰이 모델 한도 초과 | 낮음 |
| 콘텐츠 필터 | 400 | 안전 필터에 의한 거부 | 낮음 |
| 인증 실패 | 401, 403 | API 키 만료, 권한 부족 | 낮음 |
속도 제한 에러에 가장 효과적인 재시도 전략입니다.
import asyncio
import random
from typing import TypeVar, Callable
T = TypeVar("T")
async def retry_with_exponential_backoff(
func: Callable[..., T],
*args,
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0,
jitter: bool = True,
retryable_errors: tuple = (Exception,),
**kwargs,
) -> T:
"""지수 백오프 + 지터를 적용한 재시도"""
last_error = None
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except retryable_errors as e:
last_error = e
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
if jitter:
delay = delay * (0.5 + random.random())
print(f"재시도 {attempt + 1}/{max_retries}, "
f"{delay:.1f}초 대기 (에러: {e})")
await asyncio.sleep(delay)
raise last_errorfrom langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o")
# 재시도 설정
chain_with_retry = (prompt | model | parser).with_retry(
stop_after_attempt=3,
wait_exponential_jitter=True,
retry_if_exception_type=(
ConnectionError,
TimeoutError,
),
)재시도는 멱등한(Idempotent) 작업에만 안전합니다. LLM 호출은 대부분 멱등하지만, 도구 호출(특히 DB 쓰기, 이메일 전송 등)을 포함하는 체인에서는 재시도 시 부수 효과가 중복될 수 있습니다. 이런 경우 멱등성 키를 사용하거나, 재시도 범위를 LLM 호출에만 한정하세요.
재시도가 모두 실패한 경우 대체 경로로 전환하는 전략입니다.
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
# 1차: 주 모델
primary = ChatOpenAI(model="gpt-4o", request_timeout=10)
# 2차: 같은 공급자의 저비용 모델
secondary = ChatOpenAI(model="gpt-4o-mini", request_timeout=15)
# 3차: 다른 공급자
tertiary = ChatAnthropic(
model="claude-3-5-haiku-20241022",
timeout=20,
)
# 폴백 체인
robust_chain = (
prompt
| primary.with_fallbacks([secondary, tertiary])
| parser
)모델 폴백 외에도, 처리 전략 자체를 변경하는 폴백이 있습니다.
async def query_with_strategy_fallback(query: str) -> str:
"""전략 수준의 폴백"""
strategies = [
("RAG + GPT-4o", rag_gpt4_chain),
("RAG + GPT-4o-mini", rag_gpt4mini_chain),
("직접 응답 (RAG 없이)", direct_chain),
("사전 정의된 응답", cached_response),
]
for strategy_name, chain in strategies:
try:
result = await chain.ainvoke({"query": query})
print(f"성공 전략: {strategy_name}")
return result
except Exception as e:
print(f"전략 실패 [{strategy_name}]: {e}")
continue
return "죄송합니다, 현재 서비스를 이용할 수 없습니다. 잠시 후 다시 시도해주세요."import asyncio
async def call_with_timeout(chain, input_data, timeout: float = 30.0):
"""타임아웃이 적용된 체인 호출"""
try:
async with asyncio.timeout(timeout):
return await chain.ainvoke(input_data)
except asyncio.TimeoutError:
# 타임아웃 시 폴백
return await fallback_chain.ainvoke(input_data)
# LangChain 모델 수준 타임아웃
model = ChatOpenAI(
model="gpt-4o",
request_timeout=10, # HTTP 요청 타임아웃
max_retries=2, # API 클라이언트 수준 재시도
)연속적인 실패가 발생하면 일시적으로 호출을 중단하여 시스템을 보호하는 패턴입니다.
import time
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed" # 정상 상태
OPEN = "open" # 차단 상태
HALF_OPEN = "half_open" # 시험 상태
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 60.0,
half_open_max_calls: int = 3,
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max_calls = half_open_max_calls
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = 0
self.half_open_calls = 0
async def call(self, func, *args, **kwargs):
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.half_open_calls = 0
else:
raise CircuitOpenError(
f"서킷 브레이커 OPEN 상태. "
f"{self.recovery_timeout}초 후 재시도됩니다."
)
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.half_open_calls += 1
if self.half_open_calls >= self.half_open_max_calls:
self.state = CircuitState.CLOSED
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
# 사용
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)
try:
result = await breaker.call(model.ainvoke, prompt)
except CircuitOpenError:
result = await fallback_model.ainvoke(prompt)서킷 브레이커는 특정 모델이나 공급자에 장애가 발생했을 때 불필요한 재시도를 방지합니다. 예를 들어, OpenAI에 장애가 발생하면 서킷 브레이커가 열리고, 즉시 Anthropic으로 폴백하여 사용자 영향을 최소화합니다.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)
from opentelemetry.sdk.resources import Resource
# 리소스 정의
resource = Resource.create({
"service.name": "ai-orchestration-service",
"service.version": "1.0.0",
"deployment.environment": "production",
})
# 트레이서 설정
provider = TracerProvider(resource=resource)
exporter = OTLPSpanExporter(endpoint="http://otel-collector:4317")
provider.add_span_processor(BatchSpanExporter(exporter))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("ai.orchestration")from opentelemetry import trace
tracer = trace.get_tracer("ai.orchestration")
async def rag_query(query: str) -> str:
with tracer.start_as_current_span("rag_query") as span:
span.set_attribute("query", query)
# 검색 단계
with tracer.start_as_current_span("retrieval") as retrieval_span:
docs = await retriever.aretrieve(query)
retrieval_span.set_attribute("num_docs", len(docs))
retrieval_span.set_attribute(
"avg_score",
sum(d.score for d in docs) / len(docs) if docs else 0,
)
# 생성 단계
with tracer.start_as_current_span("generation") as gen_span:
response = await model.ainvoke(prompt.format(
context=docs, question=query
))
gen_span.set_attribute(
"tokens.input", response.usage_metadata["input_tokens"]
)
gen_span.set_attribute(
"tokens.output", response.usage_metadata["output_tokens"]
)
span.set_attribute("response_length", len(str(response)))
return str(response)from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
OTLPMetricExporter,
)
# 메트릭 설정
metric_reader = PeriodicExportingMetricReader(
OTLPMetricExporter(endpoint="http://otel-collector:4317"),
export_interval_millis=10000,
)
meter_provider = MeterProvider(metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)
meter = metrics.get_meter("ai.orchestration")
# 메트릭 정의
request_counter = meter.create_counter(
"ai.requests.total",
description="총 AI 요청 수",
)
token_counter = meter.create_counter(
"ai.tokens.total",
description="총 토큰 사용량",
)
latency_histogram = meter.create_histogram(
"ai.request.duration",
description="요청 처리 시간 (ms)",
unit="ms",
)
error_counter = meter.create_counter(
"ai.errors.total",
description="에러 발생 수",
)
# 메트릭 기록
async def tracked_invoke(chain, input_data, model_name: str):
start = time.perf_counter()
try:
result = await chain.ainvoke(input_data)
request_counter.add(1, {"model": model_name, "status": "success"})
if hasattr(result, "usage_metadata"):
token_counter.add(
result.usage_metadata["total_tokens"],
{"model": model_name},
)
return result
except Exception as e:
error_counter.add(1, {"model": model_name, "error_type": type(e).__name__})
raise
finally:
duration = (time.perf_counter() - start) * 1000
latency_histogram.record(duration, {"model": model_name})from dataclasses import dataclass, field
from datetime import datetime
# 2026년 기준 대략적 가격 (실제 가격은 공급자 확인)
MODEL_PRICING = {
"gpt-4o": {"input": 2.50, "output": 10.00}, # per 1M tokens
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
"claude-3-5-haiku": {"input": 0.25, "output": 1.25},
}
@dataclass
class CostTracker:
daily_budget: float = 100.0 # USD
daily_spent: float = 0.0
records: list = field(default_factory=list)
def record_usage(
self,
model: str,
input_tokens: int,
output_tokens: int,
) -> float:
pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
cost = (
(input_tokens / 1_000_000) * pricing["input"]
+ (output_tokens / 1_000_000) * pricing["output"]
)
self.daily_spent += cost
self.records.append({
"timestamp": datetime.now().isoformat(),
"model": model,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"cost_usd": cost,
})
return cost
def check_budget(self) -> bool:
"""예산 초과 여부 확인"""
return self.daily_spent < self.daily_budget
def get_summary(self) -> dict:
return {
"daily_budget": self.daily_budget,
"daily_spent": round(self.daily_spent, 4),
"remaining": round(self.daily_budget - self.daily_spent, 4),
"usage_percent": round(
(self.daily_spent / self.daily_budget) * 100, 1
),
"total_requests": len(self.records),
}비용 추적은 단순 로깅을 넘어서 예산 초과 방지 기능과 결합해야 합니다. 일일 예산의 80%에 도달하면 저비용 모델로 자동 전환하고, 100%에 도달하면 서비스를 일시 중단하는 전략이 효과적입니다.
프로덕션 LLM 애플리케이션에서 모니터링해야 할 핵심 지표들입니다.
| 카테고리 | 지표 | 임계값 예시 |
|---|---|---|
| 가용성 | 요청 성공률 | > 99.5% |
| 지연 시간 | P50 / P95 / P99 응답 시간 | P95 < 5초 |
| 비용 | 일일/월간 토큰 비용 | 예산 대비 80% 경고 |
| 품질 | 사용자 피드백 점수 | > 4.0/5.0 |
| 에러 | 에러율, 에러 유형 분포 | 에러율 < 1% |
| 리소스 | 메모리 사용량, CPU 사용률 | 메모리 < 80% |
class AlertManager:
def __init__(self, thresholds: dict):
self.thresholds = thresholds
async def check_and_alert(self, metrics: dict):
alerts = []
if metrics["error_rate"] > self.thresholds["error_rate"]:
alerts.append({
"severity": "critical",
"message": f"에러율 {metrics['error_rate']:.1%}이 "
f"임계값 {self.thresholds['error_rate']:.1%}을 초과했습니다.",
})
if metrics["p95_latency"] > self.thresholds["p95_latency"]:
alerts.append({
"severity": "warning",
"message": f"P95 지연 시간 {metrics['p95_latency']:.1f}ms이 "
f"임계값을 초과했습니다.",
})
if metrics["daily_cost"] > self.thresholds["daily_budget"] * 0.8:
alerts.append({
"severity": "warning",
"message": f"일일 비용이 예산의 "
f"{(metrics['daily_cost']/self.thresholds['daily_budget'])*100:.0f}%에 도달했습니다.",
})
for alert in alerts:
await self.send_alert(alert)마지막 11장에서는 이 시리즈의 모든 내용을 종합합니다. 5대 프레임워크의 비교표, 프로젝트 요구사항에 따른 의사결정 트리, 하이브리드 아키텍처 패턴, 프레임워크 간 마이그레이션 가이드, 그리고 프레임워크 독립적 설계 원칙을 다루겠습니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
5대 프레임워크 종합 비교, 의사결정 트리, 하이브리드 아키텍처, 마이그레이션 가이드, 프레임워크 독립적 설계 원칙을 다룹니다.
SSE/WebSocket, 토큰/이벤트 스트리밍, 구조화된 출력 스트리밍을 각 프레임워크별로 비교하고 프론트엔드 통합을 다룹니다.
대화 메모리, 장기 메모리, 벡터 메모리, 구조화된 상태를 각 프레임워크별로 비교하고 프로덕션 메모리 전략을 정리합니다.