LLM 게이트웨이를 활용한 멀티 프로바이더 라우팅, 모델 폴백, 인증/인가, 캐싱, 관측 가능성 등 프로덕션 API 인프라를 학습합니다.
API 게이트웨이는 클라이언트와 백엔드 서비스 사이에 위치하여 횡단 관심사를 중앙에서 처리합니다. AI 서비스에서는 여기에 "LLM 특화 기능"이 추가됩니다.
LiteLLM은 Python 기반의 LLM 프록시로, 100개 이상의 LLM 프로바이더를 OpenAI 호환 인터페이스로 통합합니다.
model_list:
- model_name: "chat-default"
litellm_params:
model: "anthropic/claude-4"
api_key: "sk-ant-..."
model_info:
max_tokens: 8192
input_cost_per_token: 0.000003
output_cost_per_token: 0.000015
- model_name: "chat-default"
litellm_params:
model: "openai/gpt-4o"
api_key: "sk-..."
model_info:
max_tokens: 4096
input_cost_per_token: 0.000005
output_cost_per_token: 0.000015
- model_name: "chat-fast"
litellm_params:
model: "anthropic/claude-4-haiku"
api_key: "sk-ant-..."
- model_name: "embedding"
litellm_params:
model: "openai/text-embedding-3-large"
api_key: "sk-..."
- model_name: "coding"
litellm_params:
model: "anthropic/claude-4"
api_key: "sk-ant-..."
router_settings:
routing_strategy: "latency-based-routing"
num_retries: 3
retry_after: 5
allowed_fails: 2
cooldown_time: 60
litellm_settings:
drop_params: true
set_verbose: false
cache: true
cache_params:
type: "redis"
host: "redis"
port: 6379import litellm
# OpenAI 호환 인터페이스로 어떤 모델이든 호출
response = await litellm.acompletion(
model="anthropic/claude-4",
messages=[{"role": "user", "content": "Hello"}],
max_tokens=1024,
)
# 프록시 서버를 통한 호출 (어떤 SDK로든)
import openai
client = openai.OpenAI(
api_key="sk-internal-key",
base_url="http://litellm-proxy:4000",
)
response = client.chat.completions.create(
model="chat-default", # 라우팅 규칙에 따라 프로바이더 결정
messages=[{"role": "user", "content": "Hello"}],
)Bifrost는 Go로 작성된 고성능 LLM 게이트웨이로, 1ms 미만의 오버헤드를 자랑합니다. 20개 이상의 프로바이더를 지원하며, 대규모 트래픽 환경에 적합합니다.
server:
port: 8080
read_timeout: 120s
write_timeout: 120s
providers:
anthropic:
api_key: "${ANTHROPIC_API_KEY}"
models:
- claude-4
- claude-4-haiku
openai:
api_key: "${OPENAI_API_KEY}"
models:
- gpt-4o
- gpt-4o-mini
routing:
default_strategy: "round-robin"
rules:
- match:
model: "chat-*"
backends:
- provider: anthropic
model: claude-4
weight: 70
- provider: openai
model: gpt-4o
weight: 30
- match:
model: "fast-*"
backends:
- provider: anthropic
model: claude-4-haiku
fallback:
enabled: true
max_retries: 2
retry_on:
- 429 # Rate limit
- 500 # Server error
- 503 # Service unavailable| 특성 | LiteLLM | Bifrost |
|---|---|---|
| 언어 | Python | Go |
| 오버헤드 | 10-50ms | 1ms 미만 |
| 프로바이더 수 | 100+ | 20+ |
| 배포 방식 | Docker, pip | Docker, 바이너리 |
| 캐싱 | Redis 내장 | 외부 연동 |
| UI 대시보드 | 내장 | 별도 구축 필요 |
| 적합한 규모 | 중소규모, 프로토타입 | 대규모, 고성능 요구 |
초기 단계에서는 LiteLLM의 풍부한 프로바이더 지원과 내장 기능(캐싱, 대시보드, 예산 관리)이 유용합니다. 트래픽이 증가하여 게이트웨이 오버헤드가 문제가 되면 Bifrost로 전환하거나, 핵심 경로에만 Bifrost를 배치하는 하이브리드 구성을 고려하세요.
from enum import Enum
from dataclasses import dataclass
import random
class RoutingStrategy(str, Enum):
round_robin = "round-robin"
weighted = "weighted"
latency_based = "latency-based"
cost_based = "cost-based"
capability_based = "capability-based"
@dataclass
class ProviderBackend:
provider: str
model: str
weight: int = 1
avg_latency_ms: float = 0
cost_per_token: float = 0
capabilities: set[str] = None
healthy: bool = True
class SmartRouter:
"""지능형 멀티 프로바이더 라우터"""
def __init__(self, backends: list[ProviderBackend]):
self.backends = backends
def route(
self,
strategy: RoutingStrategy,
request: dict,
) -> ProviderBackend:
healthy = [b for b in self.backends if b.healthy]
if not healthy:
raise NoHealthyBackendError("사용 가능한 백엔드가 없습니다")
if strategy == RoutingStrategy.weighted:
return self._weighted_route(healthy)
elif strategy == RoutingStrategy.latency_based:
return self._latency_route(healthy)
elif strategy == RoutingStrategy.cost_based:
return self._cost_route(healthy, request)
elif strategy == RoutingStrategy.capability_based:
return self._capability_route(healthy, request)
else:
return self._round_robin(healthy)
def _weighted_route(
self, backends: list[ProviderBackend]
) -> ProviderBackend:
weights = [b.weight for b in backends]
return random.choices(backends, weights=weights, k=1)[0]
def _latency_route(
self, backends: list[ProviderBackend]
) -> ProviderBackend:
return min(backends, key=lambda b: b.avg_latency_ms)
def _cost_route(
self,
backends: list[ProviderBackend],
request: dict,
) -> ProviderBackend:
estimated_tokens = estimate_tokens(request)
return min(
backends,
key=lambda b: b.cost_per_token * estimated_tokens,
)
def _capability_route(
self,
backends: list[ProviderBackend],
request: dict,
) -> ProviderBackend:
required = set()
if has_images(request):
required.add("vision")
if has_tools(request):
required.add("tool_calling")
if request.get("response_format"):
required.add("structured_output")
capable = [
b for b in backends
if required.issubset(b.capabilities or set())
]
if not capable:
raise NoCapableBackendError(
f"필요한 기능을 지원하는 백엔드가 없습니다: {required}"
)
return self._latency_route(capable)class FallbackHandler:
"""프로바이더 장애 시 자동 폴백"""
def __init__(
self,
primary: ProviderBackend,
fallbacks: list[ProviderBackend],
max_retries: int = 2,
):
self.chain = [primary] + fallbacks
self.max_retries = max_retries
async def execute(self, request: dict) -> dict:
last_error = None
for backend in self.chain:
if not backend.healthy:
continue
for attempt in range(self.max_retries + 1):
try:
response = await call_provider(
backend, request
)
return response
except RateLimitError:
# 다음 프로바이더로 즉시 폴백
logger.warning(
f"{backend.provider} 레이트 리밋, "
f"폴백 시도"
)
break
except ProviderError as e:
last_error = e
if attempt < self.max_retries:
await asyncio.sleep(
0.5 * (2 ** attempt)
)
else:
# 백엔드를 비정상으로 표시
backend.healthy = False
schedule_health_check(backend)
raise AllProvidersFailedError(
f"모든 프로바이더 실패: {last_error}"
)from fastapi import Security, HTTPException
from fastapi.security import APIKeyHeader
api_key_header = APIKeyHeader(name="Authorization", auto_error=False)
async def verify_api_key(
authorization: str | None = Security(api_key_header),
) -> APIKeyInfo:
if not authorization:
raise HTTPException(status_code=401, detail="API 키가 필요합니다")
# "Bearer sk-..." 형식에서 토큰 추출
if not authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="잘못된 인증 형식입니다")
api_key = authorization[7:]
# API 키 검증 (해시 비교)
key_info = await key_store.verify(api_key)
if not key_info:
raise HTTPException(status_code=401, detail="유효하지 않은 API 키입니다")
if key_info.revoked:
raise HTTPException(status_code=401, detail="폐기된 API 키입니다")
if key_info.expires_at and key_info.expires_at < datetime.now():
raise HTTPException(status_code=401, detail="만료된 API 키입니다")
return key_infoimport jwt
from datetime import datetime, timedelta
class JWTAuth:
def __init__(self, secret: str, algorithm: str = "HS256"):
self.secret = secret
self.algorithm = algorithm
def create_token(
self,
user_id: str,
org_id: str,
scopes: list[str],
expires_in: timedelta = timedelta(hours=1),
) -> str:
payload = {
"sub": user_id,
"org": org_id,
"scopes": scopes,
"iat": datetime.utcnow(),
"exp": datetime.utcnow() + expires_in,
}
return jwt.encode(payload, self.secret, algorithm=self.algorithm)
def verify_token(self, token: str) -> dict:
try:
payload = jwt.decode(
token, self.secret, algorithms=[self.algorithm]
)
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="토큰이 만료되었습니다")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="유효하지 않은 토큰입니다")
# 스코프 기반 인가
def require_scopes(*required_scopes: str):
async def dependency(token_data: dict = Depends(verify_jwt)):
user_scopes = set(token_data.get("scopes", []))
if not set(required_scopes).issubset(user_scopes):
raise HTTPException(
status_code=403,
detail=f"필요한 권한: {required_scopes}",
)
return token_data
return dependency
# 엔드포인트에 스코프 적용
@app.post("/v1/chat/completions")
async def create_completion(
request: CompletionRequest,
user: dict = Depends(require_scopes("chat:write")),
):
pass
@app.get("/v1/usage")
async def get_usage(
user: dict = Depends(require_scopes("usage:read")),
):
passAI API에서 캐싱은 비용 절감과 응답 속도 향상에 큰 효과가 있습니다. 다만, 비결정적 출력의 특성상 시맨틱 캐싱이 일반적인 정확한 매칭보다 효과적입니다.
import hashlib
import json
class AIResponseCache:
"""AI 응답 캐싱"""
def __init__(self, redis_client, ttl: int = 3600):
self.redis = redis_client
self.ttl = ttl
def _cache_key(self, request: dict) -> str:
"""캐시 키 생성 — 결정적 파라미터만 사용"""
cacheable = {
"model": request["model"],
"messages": request["messages"],
"temperature": request.get("temperature", 1.0),
"max_tokens": request.get("max_tokens"),
"seed": request.get("seed"),
"tools": request.get("tools"),
"response_format": request.get("response_format"),
}
# temperature=0이고 seed가 고정이면 결정적
content = json.dumps(cacheable, sort_keys=True, ensure_ascii=False)
return f"ai_cache:{hashlib.sha256(content.encode()).hexdigest()}"
def is_cacheable(self, request: dict) -> bool:
"""캐싱 가능한 요청인지 판별"""
# temperature=0 또는 seed가 지정된 경우만 캐싱
temp = request.get("temperature", 1.0)
seed = request.get("seed")
stream = request.get("stream", False)
return (temp == 0 or seed is not None) and not stream
async def get(self, request: dict) -> dict | None:
if not self.is_cacheable(request):
return None
key = self._cache_key(request)
cached = await self.redis.get(key)
if cached:
return json.loads(cached)
return None
async def set(self, request: dict, response: dict) -> None:
if not self.is_cacheable(request):
return
key = self._cache_key(request)
await self.redis.setex(
key,
self.ttl,
json.dumps(response, ensure_ascii=False),
)AI 응답 캐싱은 temperature=0이고 seed가 고정된 경우에만 안전합니다. 높은 temperature로 생성된 응답을 캐싱하면 다양성이 사라지고 사용자 경험이 저하됩니다. 스트리밍 응답은 캐싱 대상에서 제외하는 것이 일반적입니다.
프로덕션 AI API의 안정적 운영에는 로깅, 메트릭, 트레이싱의 세 기둥이 필요합니다.
import structlog
logger = structlog.get_logger()
async def log_completion(
request_id: str,
model: str,
user_id: str,
prompt_tokens: int,
completion_tokens: int,
latency_ms: float,
status: str,
error: str | None = None,
):
logger.info(
"completion",
request_id=request_id,
model=model,
user_id=user_id,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=prompt_tokens + completion_tokens,
latency_ms=round(latency_ms, 1),
status=status,
error=error,
cost_usd=calculate_cost(model, prompt_tokens, completion_tokens),
)from prometheus_client import (
Counter, Histogram, Gauge, Summary,
)
# 요청 카운터
request_total = Counter(
"ai_api_requests_total",
"Total AI API requests",
["model", "status", "provider"],
)
# 토큰 사용량
tokens_total = Counter(
"ai_api_tokens_total",
"Total tokens processed",
["model", "type"], # type: prompt/completion
)
# 지연시간 분포
latency_histogram = Histogram(
"ai_api_latency_seconds",
"Request latency",
["model", "stream"],
buckets=[0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60],
)
# TTFT (Time To First Token)
ttft_histogram = Histogram(
"ai_api_ttft_seconds",
"Time to first token for streaming",
["model"],
buckets=[0.05, 0.1, 0.25, 0.5, 1, 2, 5],
)
# 비용
cost_counter = Counter(
"ai_api_cost_usd_total",
"Total cost in USD",
["model", "provider", "user_tier"],
)
# 캐시 적중률
cache_hits = Counter(
"ai_api_cache_hits_total",
"Cache hit count",
["model"],
)
cache_misses = Counter(
"ai_api_cache_misses_total",
"Cache miss count",
["model"],
)관측 가능성 대시보드에서 추적해야 할 핵심 지표입니다.
| 카테고리 | 지표 | 경고 임계값 |
|---|---|---|
| 가용성 | 요청 성공률 | 99.9% 미만 |
| 성능 | P99 지연시간 | 10초 초과 |
| 성능 | TTFT (스트리밍) | 2초 초과 |
| 비용 | 시간당 비용 | 예산의 120% |
| 프로바이더 | 프로바이더별 에러율 | 5% 초과 |
| 레이트 리밋 | 429 응답 비율 | 10% 초과 |
| 캐시 | 캐시 적중률 | 30% 미만 |
게이트웨이는 모든 요청 경로에 위치하므로, 추가되는 지연시간을 최소화해야 합니다.
| 구성 | 추가 지연시간 | 적합한 상황 |
|---|---|---|
| Bifrost (Go) | 0.5-1ms | 대규모, 고성능 요구 |
| 자체 구현 (Rust/Go) | 1-5ms | 커스텀 로직 필요 |
| LiteLLM (Python) | 10-50ms | 중소규모, 빠른 구축 |
| 호스팅 서비스 | 10-50ms | 관리 부담 최소화 |
LLM 추론 자체가 500ms-5000ms 소요되므로, 게이트웨이의 10-50ms 오버헤드는 전체 응답 시간의 1-2%에 불과합니다. 따라서 초기에는 기능이 풍부한 솔루션을 선택하고, 규모가 커진 후 최적화하는 전략이 합리적입니다.
이 장에서는 프로덕션 AI API의 핵심 인프라인 API 게이트웨이를 다루었습니다. LiteLLM과 Bifrost를 비교하고, 멀티 프로바이더 라우팅, 모델 폴백, 인증/인가, 캐싱, 관측 가능성을 설계했습니다.
게이트웨이는 AI API 운영의 제어 평면(Control Plane)입니다. 프로바이더 장애 대응, 비용 최적화, 보안, 모니터링을 중앙에서 관리함으로써 애플리케이션 레벨의 복잡도를 줄이고, 운영 안정성을 높입니다.
11장에서는 시리즈의 마지막으로 실전 프로젝트를 진행합니다. 지금까지 학습한 모든 개념을 통합하여 REST 공개 API + gRPC 내부 통신 아키텍처를 설계하고, OpenAPI 스펙, FastAPI 구현, 스트리밍 엔드포인트, 인증, 레이트 리미팅, SDK 생성까지 전체 파이프라인을 구축합니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
REST 공개 API와 gRPC 내부 통신을 결합한 AI 서비스 API를 설계하고, OpenAPI 스펙, FastAPI 구현, 스트리밍, 인증, SDK 생성까지 전체를 구축합니다.
OpenAPI 스펙에서 타입 안전 SDK를 자동 생성하고, API 문서화, 인터랙티브 플레이그라운드로 개발자 경험을 최적화하는 방법을 학습합니다.
토큰 기반 레이트 리미팅, 토큰 버킷과 슬라이딩 윈도우 알고리즘, 사용자별 한도 설정, 비용 캡, Redis 기반 구현을 학습합니다.