LLM 애플리케이션의 입력 분포 변화, 모델 성능 저하, 데이터 드리프트를 감지하고 대응하는 방법을 다룹니다.
드리프트(Drift)는 시간이 지남에 따라 시스템의 입력 분포, 출력 품질, 또는 운영 환경이 변화하는 현상입니다. LLM 애플리케이션에서 드리프트는 성능 저하의 주요 원인이며, 명시적 에러 없이 조용히 발생하기 때문에 더 위험합니다.
1. 입력 드리프트 (Input Drift)
사용자 질문의 주제, 언어, 복잡도 분포가 변화
예: 출시 초기에는 기본 질문 위주 --> 시간이 지나면 고급 질문 증가
2. 모델 드리프트 (Model Drift)
LLM 제공업체의 모델 업데이트로 동일 입력에 대한 출력 변화
예: GPT-4의 내부 가중치 업데이트로 특정 작업 성능 변화
3. 컨텍스트 드리프트 (Context Drift)
RAG 시스템의 지식 베이스 내용 변화
예: 문서 업데이트 후 검색 결과 품질 변화
4. 성능 드리프트 (Performance Drift)
시스템의 응답 지연, 에러율 등 운영 지표의 점진적 변화
예: 트래픽 증가에 따른 응답 시간 점진적 증가LLM 제공업체(OpenAI, Anthropic, Google 등)는 동일한 모델 이름을 유지하면서도 내부적으로 모델을 업데이트할 수 있습니다. 이를 "소리 없는 업데이트(Silent Update)"라고 하며, 2023년 이후 GPT-4에서 여러 차례 보고된 바 있습니다. 이러한 변화는 외부에서 감지하기 어려우므로, 자체적인 모니터링이 필수입니다.
사용자 입력의 주제 분포를 추적하여 변화를 감지합니다.
from collections import Counter
import numpy as np
class TopicDistributionMonitor:
"""입력의 토픽 분포 변화를 감지합니다."""
def __init__(self, baseline_distribution: dict):
self.baseline = baseline_distribution
self.current_window = []
self.window_size = 1000
def add_input(self, topic: str):
"""새로운 입력의 토픽을 기록합니다."""
self.current_window.append(topic)
if len(self.current_window) > self.window_size:
self.current_window.pop(0)
def compute_drift(self) -> dict:
"""현재 분포와 기준 분포 간의 차이를 측정합니다."""
if len(self.current_window) < self.window_size // 2:
return {"status": "insufficient_data"}
current_counts = Counter(self.current_window)
total = sum(current_counts.values())
current_dist = {
k: v / total for k, v in current_counts.items()
}
# Jensen-Shannon Divergence 계산
jsd = self._jensen_shannon_divergence(
self.baseline, current_dist
)
# 새로운 토픽 감지
new_topics = set(current_dist.keys()) - set(self.baseline.keys())
return {
"jsd": round(jsd, 4),
"drift_detected": jsd > 0.1,
"new_topics": list(new_topics),
"top_changes": self._top_changes(self.baseline, current_dist),
}
def _jensen_shannon_divergence(self, p: dict, q: dict) -> float:
"""Jensen-Shannon Divergence를 계산합니다."""
all_keys = set(p.keys()) | set(q.keys())
p_arr = np.array([p.get(k, 0.0001) for k in all_keys])
q_arr = np.array([q.get(k, 0.0001) for k in all_keys])
# 정규화
p_arr = p_arr / p_arr.sum()
q_arr = q_arr / q_arr.sum()
m = 0.5 * (p_arr + q_arr)
jsd = 0.5 * np.sum(p_arr * np.log(p_arr / m)) + \
0.5 * np.sum(q_arr * np.log(q_arr / m))
return float(jsd)
def _top_changes(self, baseline: dict, current: dict, top_n: int = 5) -> list:
"""가장 큰 변화를 보인 토픽을 반환합니다."""
changes = []
all_topics = set(baseline.keys()) | set(current.keys())
for topic in all_topics:
base_pct = baseline.get(topic, 0)
curr_pct = current.get(topic, 0)
changes.append({
"topic": topic,
"baseline_pct": round(base_pct, 4),
"current_pct": round(curr_pct, 4),
"change": round(curr_pct - base_pct, 4),
})
changes.sort(key=lambda x: abs(x["change"]), reverse=True)
return changes[:top_n]class InputLengthMonitor:
"""입력 길이의 분포 변화를 감지합니다."""
def __init__(self, baseline_stats: dict):
self.baseline_mean = baseline_stats["mean"]
self.baseline_std = baseline_stats["std"]
self.baseline_p95 = baseline_stats["p95"]
self.window = []
self.window_size = 500
def add(self, input_length: int):
self.window.append(input_length)
if len(self.window) > self.window_size:
self.window.pop(0)
def check_drift(self) -> dict:
"""입력 길이 분포의 변화를 확인합니다."""
if len(self.window) < 100:
return {"status": "insufficient_data"}
current_mean = np.mean(self.window)
current_std = np.std(self.window)
current_p95 = np.percentile(self.window, 95)
# Z-score 기반 변화 감지
z_score = abs(current_mean - self.baseline_mean) / (
self.baseline_std / np.sqrt(len(self.window))
)
return {
"current_mean": round(float(current_mean), 1),
"baseline_mean": round(self.baseline_mean, 1),
"z_score": round(float(z_score), 2),
"drift_detected": z_score > 3.0,
"p95_change": round(
float(current_p95 - self.baseline_p95), 1
),
}class QualityDriftDetector:
"""출력 품질의 점진적 변화를 감지합니다."""
def __init__(self, baseline_score: float, sensitivity: float = 2.0):
self.baseline = baseline_score
self.sensitivity = sensitivity
self.scores = []
self.timestamps = []
self.cusum_pos = 0.0
self.cusum_neg = 0.0
def add_score(self, score: float, timestamp: float = None):
"""새로운 품질 점수를 추가합니다."""
self.scores.append(score)
self.timestamps.append(timestamp or time.time())
def detect_cusum(self, target_shift: float = 0.1) -> dict:
"""CUSUM(Cumulative Sum) 알고리즘으로 변화점을 감지합니다.
CUSUM은 작은 변화가 지속적으로 누적되는 패턴을 감지하는 데
효과적입니다. 이는 LLM 품질이 점진적으로 저하되는 상황에 적합합니다.
"""
if len(self.scores) < 20:
return {"status": "insufficient_data"}
threshold = self.sensitivity * np.std(self.scores)
drift_points = []
cusum_pos = 0.0
cusum_neg = 0.0
for i, score in enumerate(self.scores):
deviation = score - self.baseline
cusum_pos = max(0, cusum_pos + deviation - target_shift)
cusum_neg = max(0, cusum_neg - deviation - target_shift)
if cusum_pos > threshold:
drift_points.append({
"index": i,
"direction": "positive",
"cusum_value": round(cusum_pos, 4),
})
cusum_pos = 0.0
if cusum_neg > threshold:
drift_points.append({
"index": i,
"direction": "negative",
"cusum_value": round(cusum_neg, 4),
})
cusum_neg = 0.0
return {
"drift_detected": len(drift_points) > 0,
"drift_points": drift_points,
"current_mean": round(float(np.mean(self.scores[-50:])), 4),
"baseline": self.baseline,
}
def detect_rolling_mean(self, window: int = 50) -> dict:
"""이동 평균 기반으로 품질 변화를 감지합니다."""
if len(self.scores) < window:
return {"status": "insufficient_data"}
recent = self.scores[-window:]
rolling_mean = np.mean(recent)
rolling_std = np.std(recent)
# 기준 대비 변화율
change_pct = (rolling_mean - self.baseline) / self.baseline * 100
return {
"rolling_mean": round(float(rolling_mean), 4),
"rolling_std": round(float(rolling_std), 4),
"baseline": self.baseline,
"change_pct": round(float(change_pct), 2),
"degradation_detected": change_pct < -5.0,
}품질 점수의 분포에서 벗어나는 이상 응답을 개별적으로 감지합니다.
class AnomalyDetector:
"""개별 응답 수준의 이상 감지를 수행합니다."""
def __init__(self, history: list, z_threshold: float = 3.0):
self.mean = np.mean(history)
self.std = np.std(history)
self.z_threshold = z_threshold
def is_anomaly(self, score: float) -> dict:
"""개별 점수가 이상치인지 판단합니다."""
if self.std == 0:
return {"is_anomaly": False, "z_score": 0}
z = (score - self.mean) / self.std
return {
"is_anomaly": abs(z) > self.z_threshold,
"z_score": round(float(z), 2),
"direction": "low" if z < 0 else "high",
"score": score,
"expected_range": [
round(self.mean - self.z_threshold * self.std, 3),
round(self.mean + self.z_threshold * self.std, 3),
]
}정기적으로 고정된 테스트 입력에 대한 모델 응답을 생성하고, 이전 응답과의 변화를 추적합니다.
class ModelDriftChecker:
"""LLM 모델 자체의 동작 변화를 감지합니다."""
def __init__(self, reference_inputs: list, model: str):
self.reference_inputs = reference_inputs
self.model = model
self.baseline_responses = {}
def collect_baseline(self):
"""기준 응답을 수집합니다."""
for inp in self.reference_inputs:
responses = []
for _ in range(5): # 변동성 측정을 위해 5회 실행
resp = call_llm(self.model, inp, temperature=0)
responses.append(resp)
self.baseline_responses[inp] = responses
def check_drift(self) -> dict:
"""현재 모델 응답과 기준 응답을 비교합니다."""
results = []
for inp in self.reference_inputs:
current_responses = []
for _ in range(5):
resp = call_llm(self.model, inp, temperature=0)
current_responses.append(resp)
baseline = self.baseline_responses.get(inp, [])
if not baseline:
continue
# 기준과 현재의 의미적 유사도 계산
similarities = []
for curr in current_responses:
for base in baseline:
sim = semantic_similarity(curr, base)
similarities.append(sim)
avg_similarity = np.mean(similarities)
results.append({
"input": inp[:80] + "...",
"avg_similarity": round(float(avg_similarity), 4),
"drift_detected": avg_similarity < 0.85,
})
drifted = [r for r in results if r["drift_detected"]]
return {
"total_checks": len(results),
"drifted_count": len(drifted),
"drift_rate": round(len(drifted) / len(results), 3) if results else 0,
"details": results,
}모델 드리프트 체크를 매일 자동으로 실행하는 크론 작업(Cron Job)을 설정하면, 제공업체의 모델 변경을 조기에 감지할 수 있습니다. 20-50개의 대표적인 입력으로 구성된 "카나리아 세트(Canary Set)"를 유지하세요.
class CostDriftMonitor:
"""비용 관련 지표의 이상 변화를 감지합니다."""
def __init__(self):
self.daily_costs = []
self.hourly_token_usage = []
def check_cost_anomaly(self, current_daily_cost: float) -> dict:
"""일일 비용의 이상 여부를 확인합니다."""
if len(self.daily_costs) < 7:
self.daily_costs.append(current_daily_cost)
return {"status": "collecting_baseline"}
# 최근 7일 기준
recent = self.daily_costs[-7:]
mean_cost = np.mean(recent)
std_cost = np.std(recent)
if std_cost == 0:
std_cost = mean_cost * 0.1 # 최소 변동성
z_score = (current_daily_cost - mean_cost) / std_cost
self.daily_costs.append(current_daily_cost)
return {
"current_cost": round(current_daily_cost, 2),
"expected_cost": round(float(mean_cost), 2),
"z_score": round(float(z_score), 2),
"anomaly": z_score > 2.5,
"pct_over_expected": round(
(current_daily_cost - mean_cost) / mean_cost * 100, 1
) if mean_cost > 0 else 0,
}
def detect_token_inflation(
self,
recent_avg_tokens: float,
baseline_avg_tokens: float,
threshold_pct: float = 20.0
) -> dict:
"""평균 토큰 사용량의 인플레이션을 감지합니다."""
change_pct = (
(recent_avg_tokens - baseline_avg_tokens) / baseline_avg_tokens * 100
)
return {
"recent_avg": round(recent_avg_tokens, 1),
"baseline_avg": round(baseline_avg_tokens, 1),
"change_pct": round(change_pct, 1),
"inflation_detected": change_pct > threshold_pct,
}class MonitoringOrchestrator:
"""모든 드리프트 감지기를 통합 관리합니다."""
def __init__(self):
self.monitors = {}
self.alert_manager = AlertManager()
def register(self, name: str, monitor):
"""모니터를 등록합니다."""
self.monitors[name] = monitor
def run_all_checks(self) -> dict:
"""등록된 모든 모니터를 실행합니다."""
results = {}
alerts = []
for name, monitor in self.monitors.items():
try:
result = monitor.check()
results[name] = result
if result.get("drift_detected") or result.get("anomaly"):
alerts.append({
"monitor": name,
"severity": self._determine_severity(result),
"details": result,
})
except Exception as e:
results[name] = {"error": str(e)}
# 알림 발송
for alert in alerts:
self.alert_manager.fire(
alert["monitor"],
alert
)
return {
"timestamp": datetime.utcnow().isoformat(),
"results": results,
"alerts_fired": len(alerts),
"overall_healthy": len(alerts) == 0,
}
def _determine_severity(self, result: dict) -> str:
"""결과의 심각도를 판단합니다."""
if result.get("degradation_detected"):
return "critical"
if result.get("drift_detected"):
return "warning"
return "info"드리프트가 감지되었을 때의 대응 전략을 사전에 정의합니다.
드리프트 유형별 자동 대응:
입력 드리프트 (경미):
--> 알림 전송
--> 평가 데이터셋에 새로운 패턴 추가 검토
입력 드리프트 (심각):
--> 알림 전송
--> 긴급 평가 실행
--> 필요시 시스템 프롬프트 업데이트
모델 드리프트:
--> 알림 전송
--> 오프라인 평가 자동 트리거
--> 기준치 미달 시 이전 모델 버전으로 롤백 검토
품질 드리프트 (점진적):
--> 알림 전송
--> 일일 리포트에 추세 포함
--> 프롬프트 최적화 작업 생성
비용 드리프트:
--> 알림 전송
--> 토큰 사용량 상세 분석
--> 비용 상한 도달 시 자동 스로틀링드리프트는 LLM 애플리케이션에서 성능 저하의 주요 원인이며, 입력, 모델, 컨텍스트, 성능 등 여러 차원에서 발생합니다. Jensen-Shannon Divergence, CUSUM, 이동 평균 등의 통계적 방법으로 변화를 감지하고, 이상 탐지로 개별 문제를 식별할 수 있습니다.
효과적인 드리프트 모니터링은 자동 감지와 사전 정의된 대응 전략의 조합입니다. 감지만으로는 충분하지 않으며, 감지된 문제에 대한 구체적인 대응 절차가 준비되어 있어야 합니다.
다음 장에서는 지금까지 다룬 평가와 모니터링 체계를 소프트웨어 개발 파이프라인에 통합하는 CI/CD 평가 파이프라인을 다룹니다.
이 글이 도움이 되셨나요?
LLM 평가를 CI/CD 파이프라인에 통합하여, 프롬프트 변경과 모델 교체 시 자동으로 품질을 검증하는 체계를 구축합니다.
LLM 애플리케이션의 프로덕션 환경에서 구조화된 로깅, 분산 트레이싱, 관찰 가능성을 구축하는 방법을 다룹니다.
지금까지 다룬 평가 메트릭, LLM-as-Judge, 모니터링, CI/CD를 통합하여 프로덕션 수준의 종합 평가 시스템을 구축합니다.