토큰 사용량, 지연시간, 비용 추적, 드리프트 감지, 품질 모니터링, 알림 설계, 피드백 루프 등 AI 시스템의 관측 가능성 파이프라인을 다룹니다.
배포가 끝이 아닙니다. 오히려 배포 이후가 시작입니다. AI 시스템은 외부 환경 변화, 사용자 행동 변화, 모델 공급자의 업데이트 등에 의해 성능이 변할 수 있습니다. 어제까지 잘 작동하던 시스템이 오늘 갑자기 이상한 응답을 내놓을 수 있고, 비용이 예상의 3배로 치솟을 수도 있습니다. 모니터링 하네스는 이런 변화를 실시간으로 감지하고 대응하는 체계입니다.
전통적인 인프라 모니터링(CPU, 메모리, 디스크)에 더해, AI 시스템은 고유한 모니터링 축이 필요합니다.
| 축 | 측정 대상 | 대응 |
|---|---|---|
| 운영 | 지연시간, 에러율, 처리량 | 스케일링, 재시도, 폴백 |
| 비용 | 토큰 사용량, API 비용 | 예산 알림, 모델 변경, 캐싱 |
| 품질 | 응답 정확도, 유해성, 사용자 만족도 | 프롬프트 수정, 가드레일 조정 |
| 드리프트 | 입력/출력 분포 변화 | 재평가, 데이터 업데이트 |
AI 시스템의 운영 비용은 대부분 API 호출 비용입니다. 토큰 단위로 과금되므로, 정밀한 추적이 필수입니다.
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from collections import defaultdict
@dataclass
class TokenUsage:
input_tokens: int
output_tokens: int
model: str
timestamp: datetime
request_id: str
user_id: str | None = None
feature: str | None = None # 어떤 기능에서 사용했는지
# 모델별 토큰 단가 (USD per 1M tokens)
PRICING = {
"claude-opus-4-20250514": {"input": 15.0, "output": 75.0},
"claude-sonnet-4-20250514": {"input": 3.0, "output": 15.0},
"claude-haiku-3-20250414": {"input": 0.25, "output": 1.25},
"gpt-4o": {"input": 2.5, "output": 10.0},
"gpt-4o-mini": {"input": 0.15, "output": 0.6},
}
class CostTracker:
"""토큰 사용량 및 비용 추적"""
def __init__(self):
self.usages: list[TokenUsage] = []
self.budget_alerts: list[dict] = []
def record(self, usage: TokenUsage):
self.usages.append(usage)
self._check_budget(usage)
def calculate_cost(self, usage: TokenUsage) -> float:
"""단일 요청의 비용 계산"""
pricing = PRICING.get(usage.model, {"input": 0, "output": 0})
input_cost = usage.input_tokens * pricing["input"] / 1_000_000
output_cost = usage.output_tokens * pricing["output"] / 1_000_000
return input_cost + output_cost
def daily_report(self, date: datetime | None = None) -> dict:
"""일일 비용 리포트"""
target_date = (date or datetime.now()).date()
daily = [
u for u in self.usages
if u.timestamp.date() == target_date
]
by_model = defaultdict(lambda: {"tokens": 0, "cost": 0.0, "requests": 0})
by_feature = defaultdict(lambda: {"tokens": 0, "cost": 0.0})
for usage in daily:
cost = self.calculate_cost(usage)
total_tokens = usage.input_tokens + usage.output_tokens
by_model[usage.model]["tokens"] += total_tokens
by_model[usage.model]["cost"] += cost
by_model[usage.model]["requests"] += 1
if usage.feature:
by_feature[usage.feature]["tokens"] += total_tokens
by_feature[usage.feature]["cost"] += cost
return {
"date": str(target_date),
"total_requests": len(daily),
"total_cost": sum(self.calculate_cost(u) for u in daily),
"by_model": dict(by_model),
"by_feature": dict(by_feature),
}
def _check_budget(self, usage: TokenUsage):
"""예산 초과 확인"""
today_cost = sum(
self.calculate_cost(u) for u in self.usages
if u.timestamp.date() == datetime.now().date()
)
# 일일 예산 50달러 기준
daily_budget = 50.0
if today_cost > daily_budget * 0.8:
self.budget_alerts.append({
"type": "budget_warning",
"message": f"일일 예산의 80% 도달: ${today_cost:.2f}/${daily_budget}",
"timestamp": datetime.now(),
})토큰 비용은 예상보다 빠르게 증가할 수 있습니다. 특히 에이전트 시스템에서 도구 호출이 반복되면 컨텍스트 윈도우가 누적되어 입력 토큰이 급증합니다. 일일/주간/월간 예산 알림을 반드시 설정하세요.
AI 시스템의 지연시간은 모델 추론 시간, 네트워크 지연, 하네스 처리 시간의 합입니다. 각 구간별로 분리하여 추적해야 병목을 파악할 수 있습니다.
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
@dataclass
class LatencyBreakdown:
"""지연시간 구간별 분해"""
total_ms: float = 0
preprocessing_ms: float = 0
model_inference_ms: float = 0
postprocessing_ms: float = 0
tool_execution_ms: float = 0
guardrail_ms: float = 0
class LatencyTracer:
"""구간별 지연시간 추적"""
def __init__(self):
self.breakdown = LatencyBreakdown()
self._spans: dict[str, float] = {}
@contextmanager
def span(self, name: str):
"""구간 측정"""
start = time.monotonic()
try:
yield
finally:
elapsed = (time.monotonic() - start) * 1000
setattr(self.breakdown, f"{name}_ms", elapsed)
def report(self) -> dict:
self.breakdown.total_ms = (
self.breakdown.preprocessing_ms
+ self.breakdown.model_inference_ms
+ self.breakdown.postprocessing_ms
+ self.breakdown.tool_execution_ms
+ self.breakdown.guardrail_ms
)
return {
"total_ms": self.breakdown.total_ms,
"breakdown": {
"preprocessing": self.breakdown.preprocessing_ms,
"model_inference": self.breakdown.model_inference_ms,
"postprocessing": self.breakdown.postprocessing_ms,
"tool_execution": self.breakdown.tool_execution_ms,
"guardrail": self.breakdown.guardrail_ms,
},
"bottleneck": max(
[
("preprocessing", self.breakdown.preprocessing_ms),
("model_inference", self.breakdown.model_inference_ms),
("postprocessing", self.breakdown.postprocessing_ms),
("tool_execution", self.breakdown.tool_execution_ms),
("guardrail", self.breakdown.guardrail_ms),
],
key=lambda x: x[1],
)[0],
}
# 사용 예시
async def process_request(request: str) -> str:
tracer = LatencyTracer()
with tracer.span("preprocessing"):
processed = preprocess(request)
with tracer.span("guardrail"):
validated = check_guardrails(processed)
with tracer.span("model_inference"):
response = await call_model(validated)
with tracer.span("postprocessing"):
result = postprocess(response)
latency_report = tracer.report()
await send_metrics(latency_report)
return result평균 지연시간만으로는 사용자 경험을 정확히 파악할 수 없습니다. P50(중앙값), P95, P99 백분위수를 함께 모니터링해야 합니다.
import bisect
class PercentileMonitor:
"""백분위수 기반 지연시간 모니터링"""
def __init__(self, window_size: int = 1000):
self.values: list[float] = []
self.window_size = window_size
def record(self, value: float):
bisect.insort(self.values, value)
if len(self.values) > self.window_size:
self.values.pop(0)
def percentile(self, p: float) -> float:
if not self.values:
return 0
idx = int(len(self.values) * p / 100)
return self.values[min(idx, len(self.values) - 1)]
def summary(self) -> dict:
return {
"p50": self.percentile(50),
"p95": self.percentile(95),
"p99": self.percentile(99),
"min": self.values[0] if self.values else 0,
"max": self.values[-1] if self.values else 0,
"count": len(self.values),
}드리프트(Drift)는 시간이 지남에 따라 데이터나 모델의 특성이 변하는 현상입니다. AI 시스템에서는 두 가지 유형의 드리프트가 중요합니다.
from collections import Counter
import math
class DriftDetector:
"""입출력 드리프트 감지"""
def __init__(self, baseline_window: int = 1000):
self.baseline_window = baseline_window
self.baseline_distributions: dict[str, Counter] = {}
self.current_distributions: dict[str, Counter] = {}
def set_baseline(self, metric_name: str, values: list[str]):
"""기준 분포 설정"""
self.baseline_distributions[metric_name] = Counter(values)
def update_current(self, metric_name: str, value: str):
"""현재 분포 업데이트"""
if metric_name not in self.current_distributions:
self.current_distributions[metric_name] = Counter()
self.current_distributions[metric_name][value] += 1
def detect_drift(
self, metric_name: str, threshold: float = 0.1
) -> dict:
"""KL 다이버전스 기반 드리프트 감지"""
baseline = self.baseline_distributions.get(metric_name)
current = self.current_distributions.get(metric_name)
if not baseline or not current:
return {"drift_detected": False, "reason": "데이터 부족"}
kl_div = self._kl_divergence(baseline, current)
return {
"drift_detected": kl_div > threshold,
"kl_divergence": kl_div,
"threshold": threshold,
"baseline_size": sum(baseline.values()),
"current_size": sum(current.values()),
}
@staticmethod
def _kl_divergence(p: Counter, q: Counter) -> float:
"""KL 다이버전스 계산"""
all_keys = set(p.keys()) | set(q.keys())
p_total = sum(p.values())
q_total = sum(q.values())
kl = 0.0
for key in all_keys:
p_prob = (p.get(key, 0) + 1) / (p_total + len(all_keys))
q_prob = (q.get(key, 0) + 1) / (q_total + len(all_keys))
if p_prob > 0:
kl += p_prob * math.log(p_prob / q_prob)
return kl
# 사용 예시
detector = DriftDetector()
# 기준 분포 설정 (지난 주의 질문 카테고리)
detector.set_baseline("question_category", [
"product", "product", "billing", "support",
"product", "billing", "product", "support",
# ... 1000개의 기준 데이터
])
# 실시간 업데이트
detector.update_current("question_category", "product")
detector.update_current("question_category", "complaint") # 새로운 카테고리
# 드리프트 확인
drift_result = detector.detect_drift("question_category")
if drift_result["drift_detected"]:
alert("입력 드리프트 감지: 질문 패턴이 변경되었습니다")드리프트는 반드시 나쁜 것만은 아닙니다. 제품 출시 후 사용자 질문 패턴이 바뀌는 것은 자연스러운 현상입니다. 중요한 것은 드리프트를 감지하고, 그에 맞게 시스템을 적응시키는 것입니다.
운영 메트릭이 정상이어도 응답 품질이 저하될 수 있습니다. 품질 모니터링은 AI 시스템의 응답이 기대 수준을 유지하고 있는지를 지속적으로 확인합니다.
from dataclasses import dataclass
@dataclass
class QualityMetrics:
"""품질 메트릭"""
relevance_score: float # 질문과 답변의 관련성
completeness_score: float # 답변의 완전성
safety_score: float # 안전성 점수
user_rating: float | None # 사용자 평점 (선택적)
class QualityMonitor:
"""실시간 품질 모니터링"""
def __init__(self, min_quality_threshold: float = 0.7):
self.threshold = min_quality_threshold
self.scores: list[QualityMetrics] = []
self.alert_count = 0
async def evaluate(
self,
question: str,
answer: str,
evaluator_fn,
) -> QualityMetrics:
"""응답 품질 평가"""
metrics = await evaluator_fn(question, answer)
self.scores.append(metrics)
# 품질 저하 감지
avg_score = (
metrics.relevance_score
+ metrics.completeness_score
+ metrics.safety_score
) / 3
if avg_score < self.threshold:
self.alert_count += 1
await self._handle_quality_alert(
question, answer, metrics
)
return metrics
async def _handle_quality_alert(
self,
question: str,
answer: str,
metrics: QualityMetrics,
):
"""품질 저하 시 대응"""
# 연속 알림이 임계값을 초과하면 에스컬레이션
if self.alert_count >= 5:
await escalate_to_team(
"연속 품질 저하 감지",
f"최근 5건의 응답이 품질 기준 미달. "
f"마지막 점수: {metrics}",
)알림은 너무 적으면 문제를 놓치고, 너무 많으면 알림 피로(Alert Fatigue)를 유발합니다. 효과적인 알림 설계는 심각도와 긴급도에 따라 차등화됩니다.
from enum import Enum
class AlertSeverity(Enum):
CRITICAL = "critical" # 즉시 대응 필요
WARNING = "warning" # 주의 관찰 필요
INFO = "info" # 참고 사항
class AlertChannel(Enum):
PAGER = "pager" # PagerDuty 등
SLACK = "slack"
EMAIL = "email"
DASHBOARD = "dashboard"
# 알림 라우팅 규칙
ALERT_ROUTING = {
AlertSeverity.CRITICAL: [
AlertChannel.PAGER,
AlertChannel.SLACK,
],
AlertSeverity.WARNING: [
AlertChannel.SLACK,
],
AlertSeverity.INFO: [
AlertChannel.DASHBOARD,
],
}
# AI 시스템 특화 알림 규칙
AI_ALERT_RULES = [
{
"name": "에러율 급증",
"condition": "error_rate > 5%",
"severity": AlertSeverity.CRITICAL,
"cooldown_minutes": 15,
},
{
"name": "비용 예산 초과",
"condition": "daily_cost > budget * 0.9",
"severity": AlertSeverity.WARNING,
"cooldown_minutes": 60,
},
{
"name": "지연시간 P99 증가",
"condition": "p99_latency > 10000ms",
"severity": AlertSeverity.WARNING,
"cooldown_minutes": 30,
},
{
"name": "품질 점수 하락",
"condition": "avg_quality < 0.7 for 1h",
"severity": AlertSeverity.CRITICAL,
"cooldown_minutes": 30,
},
{
"name": "드리프트 감지",
"condition": "kl_divergence > 0.15",
"severity": AlertSeverity.INFO,
"cooldown_minutes": 360,
},
]모니터링의 궁극적인 목적은 시스템 개선입니다. 모니터링 데이터를 수집하고, 분석하고, 개선 행동으로 연결하는 피드백 루프(Feedback Loop)가 필요합니다.
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
class FeedbackType(Enum):
THUMBS_UP = "thumbs_up"
THUMBS_DOWN = "thumbs_down"
REPORT = "report"
CORRECTION = "correction"
@dataclass
class UserFeedback:
request_id: str
feedback_type: FeedbackType
comment: str | None = None
correction: str | None = None
timestamp: datetime = field(default_factory=datetime.now)
class FeedbackCollector:
"""사용자 피드백 수집 및 분석"""
def __init__(self):
self.feedbacks: list[UserFeedback] = []
def record(self, feedback: UserFeedback):
self.feedbacks.append(feedback)
def satisfaction_rate(self, days: int = 7) -> float:
"""최근 N일간 만족도"""
cutoff = datetime.now() - timedelta(days=days)
recent = [
f for f in self.feedbacks
if f.timestamp > cutoff
]
if not recent:
return 0.0
positive = sum(
1 for f in recent
if f.feedback_type == FeedbackType.THUMBS_UP
)
return positive / len(recent)
def common_complaints(self, top_n: int = 10) -> list[dict]:
"""빈번한 불만 패턴 분석"""
negative = [
f for f in self.feedbacks
if f.feedback_type in (
FeedbackType.THUMBS_DOWN,
FeedbackType.REPORT,
)
and f.comment
]
# 실제로는 텍스트 클러스터링으로 패턴 추출
return [
{"comment": f.comment, "request_id": f.request_id}
for f in negative[:top_n]
]사용자 피드백에서 "thumbs down"의 이유를 파악하는 것이 중요합니다. 단순 불만족이 아니라, 구체적인 수정 사항(correction)을 수집하면 프롬프트 개선의 직접적인 근거가 됩니다. 이 수정 사항들은 4장에서 다룬 골든 데이터셋에 추가하여 회귀를 방지할 수도 있습니다.
지금까지 다룬 모든 모니터링 요소를 하나의 파이프라인으로 통합하면 다음과 같습니다.
10장에서는 이 시리즈 전체를 관통하는 프로덕션 하네스 통합 전략을 다룹니다. 테스트, 평가, 가드레일, 오케스트레이션, 배포, 모니터링 하네스를 하나의 일관된 시스템으로 통합하는 방법, 하네스 성숙도 모델, CI/CD 통합, 그리고 CLAUDE.md와 AGENTS.md를 활용한 팀 협업 전략까지 살펴봅니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
전체 하네스 계층 통합, 하네스 성숙도 모델, CI/CD 파이프라인 통합, CLAUDE.md와 AGENTS.md 설계, 팀 협업 전략까지 하네스 엔지니어링의 완결편입니다.
카나리 배포, 섀도우 테스팅, A/B 테스트, 블루-그린 배포, 롤백 전략 등 AI 시스템을 프로덕션에 안전하게 배포하는 전략을 다룹니다.
에이전트 라이프사이클 관리, 도구 오케스트레이션, 서브에이전트 관리, 상태 관리, 에러 복구 등 복잡한 AI 워크플로우를 조율하는 방법을 다룹니다.