지금까지 다룬 평가 메트릭, LLM-as-Judge, 모니터링, CI/CD를 통합하여 프로덕션 수준의 종합 평가 시스템을 구축합니다.
이 장에서는 지금까지 학습한 모든 개념을 통합하여 프로덕션 수준의 LLM 평가 모니터링 시스템을 구축합니다. 대상은 고객 지원 QA 시스템이며, 다음 기능을 포함합니다.
시스템 아키텍처 개요:
사용자 --> QA 애플리케이션 --> 응답
|
v
로깅/트레이싱 수집
|
v
+--------+--------+
| | |
v v v
실시간 품질 비용
메트릭 평가 추적
| | |
v v v
+--------+--------+
|
v
대시보드 + 알림llm-eval-system/
src/
app/
qa_service.py # QA 애플리케이션 본체
prompt_manager.py # 프롬프트 로드 및 관리
evaluation/
runner.py # 평가 실행 엔진
metrics.py # 메트릭 정의
judge.py # LLM-as-Judge
dataset.py # 데이터셋 관리
monitoring/
logger.py # 구조화된 로깅
tracer.py # 분산 트레이싱
metrics_collector.py # 메트릭 수집
drift_detector.py # 드리프트 감지
alerts.py # 알림 관리
reporting/
dashboard.py # 대시보드 데이터
reporter.py # 리포트 생성
prompts/
qa/
system.txt
config.yaml
eval/
datasets/
core-50.json
full-500.json
baselines/
current.json
tests/
test_eval_pipeline.py
test_metrics.py
.github/
workflows/
llm-eval.yml# 실제 파일에는 아래 내용이 텍스트로 들어갑니다
"""
당신은 고객 지원 전문가입니다.
고객의 질문에 정확하고 도움이 되는 답변을 제공하세요.
규칙:
1. 공식 문서에 기반하여 답변하세요.
2. 확실하지 않은 정보는 "확인이 필요합니다"라고 명시하세요.
3. 기술적 용어는 쉽게 풀어서 설명하세요.
4. 답변은 간결하되 완전하게 작성하세요.
"""import time
import uuid
from dataclasses import dataclass
from typing import Optional
@dataclass
class QARequest:
question: str
user_id: str
session_id: str
context: Optional[str] = None
@dataclass
class QAResponse:
answer: str
trace_id: str
model: str
input_tokens: int
output_tokens: int
latency_ms: float
prompt_version: str
class QAService:
"""고객 지원 QA 서비스입니다."""
def __init__(self, prompt_manager, llm_client, logger, tracer, metrics):
self.prompt_manager = prompt_manager
self.llm_client = llm_client
self.logger = logger
self.tracer = tracer
self.metrics = metrics
async def answer(self, request: QARequest) -> QAResponse:
"""사용자 질문에 답변합니다."""
trace_id = str(uuid.uuid4())[:12]
start_time = time.perf_counter()
root_span = self.tracer.start_trace("qa_answer")
try:
# 1. 프롬프트 로드
with self.tracer.start_span("load_prompt", root_span) as span:
prompt_config = self.prompt_manager.load_prompt("qa")
span.set_attribute("prompt_version", prompt_config["config"]["version"])
# 2. 컨텍스트 검색 (RAG)
with self.tracer.start_span("retrieve_context", root_span) as span:
context = await self._retrieve_context(request.question)
span.set_attribute("context_length", len(context))
# 3. 프롬프트 조립
messages = [
{"role": "system", "content": prompt_config["system_prompt"]},
{"role": "user", "content": self._format_user_message(
request.question, context
)},
]
# 4. LLM 호출
with self.tracer.start_span("llm_call", root_span) as span:
llm_response = await self.llm_client.chat(
model=prompt_config["config"]["model"],
messages=messages,
temperature=prompt_config["config"]["temperature"],
max_tokens=prompt_config["config"]["max_tokens"],
)
span.set_attribute("model", prompt_config["config"]["model"])
span.set_attribute("input_tokens", llm_response.input_tokens)
span.set_attribute("output_tokens", llm_response.output_tokens)
latency_ms = (time.perf_counter() - start_time) * 1000
# 5. 로깅
self.logger.log_call(
trace_id=trace_id,
request=request,
response=llm_response,
latency_ms=latency_ms,
prompt_version=prompt_config["config"]["version"],
)
# 6. 메트릭 기록
self.metrics.increment("qa_requests_total")
self.metrics.record("qa_latency_ms", latency_ms)
self.metrics.record("qa_tokens_used",
llm_response.input_tokens + llm_response.output_tokens)
return QAResponse(
answer=llm_response.content,
trace_id=trace_id,
model=prompt_config["config"]["model"],
input_tokens=llm_response.input_tokens,
output_tokens=llm_response.output_tokens,
latency_ms=latency_ms,
prompt_version=prompt_config["config"]["version"],
)
except Exception as e:
self.metrics.increment("qa_errors_total")
self.logger.log_error(trace_id, str(e))
raise
async def _retrieve_context(self, question: str) -> str:
"""관련 문서를 검색합니다."""
results = await self.vector_store.search(question, top_k=3)
return "\n\n".join(r["content"] for r in results)
def _format_user_message(self, question: str, context: str) -> str:
return "참고 문서:\n" + context + "\n\n고객 질문: " + questionfrom abc import ABC, abstractmethod
class BaseMetric(ABC):
"""평가 메트릭의 기본 클래스입니다."""
def __init__(self, name: str, threshold: float):
self.name = name
self.threshold = threshold
@abstractmethod
def evaluate(self, prediction: str, reference: str = None,
input_text: str = None, context: str = None) -> float:
pass
class ExactMatchMetric(BaseMetric):
def evaluate(self, prediction, reference=None, **kwargs):
if not reference:
return 0.0
return 1.0 if prediction.strip().lower() == reference.strip().lower() else 0.0
class ContainsKeywordsMetric(BaseMetric):
"""핵심 키워드 포함 여부를 확인합니다."""
def evaluate(self, prediction, reference=None, **kwargs):
if not reference:
return 0.0
keywords = reference.split(",")
found = sum(
1 for kw in keywords
if kw.strip().lower() in prediction.lower()
)
return found / len(keywords) if keywords else 0.0
class SemanticSimilarityMetric(BaseMetric):
"""임베딩 기반 의미적 유사도를 측정합니다."""
def __init__(self, name, threshold, embedding_model="text-embedding-3-small"):
super().__init__(name, threshold)
self.embedding_model = embedding_model
def evaluate(self, prediction, reference=None, **kwargs):
if not reference:
return 0.0
pred_emb = get_embedding(prediction, self.embedding_model)
ref_emb = get_embedding(reference, self.embedding_model)
return cosine_similarity(pred_emb, ref_emb)
class LLMJudgeMetric(BaseMetric):
"""LLM-as-Judge 기반 평가 메트릭입니다."""
def __init__(self, name, threshold, criterion, judge_model="claude-sonnet-4-20250514"):
super().__init__(name, threshold)
self.criterion = criterion
self.judge_model = judge_model
def evaluate(self, prediction, reference=None,
input_text=None, context=None, **kwargs):
prompt = """다음 응답을 평가하세요.
질문: {input_text}
응답: {prediction}
{context_section}
평가 기준: {criterion}
1-5 척도로 점수를 매기세요.
반드시 "점수: N" 형식으로 시작하세요.""".format(
input_text=input_text or "",
prediction=prediction,
context_section="컨텍스트: " + context if context else "",
criterion=self.criterion,
)
response = call_llm(self.judge_model, prompt)
score = parse_score(response)
return score / 5.0 # 0-1 범위로 정규화import asyncio
import json
import time
class EvalRunner:
"""평가 파이프라인을 실행합니다."""
def __init__(self, qa_service, metrics_list, concurrency=5):
self.qa_service = qa_service
self.metrics_list = metrics_list
self.concurrency = concurrency
async def run(self, dataset_path: str, output_path: str) -> dict:
"""전체 평가를 실행합니다."""
# 데이터셋 로드
with open(dataset_path) as f:
dataset = json.load(f)
cases = dataset["cases"]
print("평가 시작: " + str(len(cases)) + "건")
# 병렬 실행
semaphore = asyncio.Semaphore(self.concurrency)
start_time = time.perf_counter()
async def eval_case(case):
async with semaphore:
return await self._evaluate_single(case)
tasks = [eval_case(c) for c in cases]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 에러 처리
valid_results = []
errors = []
for i, r in enumerate(results):
if isinstance(r, Exception):
errors.append({"case_id": cases[i]["id"], "error": str(r)})
else:
valid_results.append(r)
elapsed = time.perf_counter() - start_time
# 집계
report = self._aggregate(valid_results, elapsed, errors)
# 저장
with open(output_path, "w") as f:
json.dump(report, f, ensure_ascii=False, indent=2)
print("평가 완료: " + str(round(elapsed, 1)) + "초")
return report
async def _evaluate_single(self, case: dict) -> dict:
"""단일 케이스를 평가합니다."""
request = QARequest(
question=case["input"],
user_id="eval-user",
session_id="eval-session",
)
response = await self.qa_service.answer(request)
metric_scores = {}
for metric in self.metrics_list:
score = metric.evaluate(
prediction=response.answer,
reference=case.get("expected_output"),
input_text=case["input"],
context=case.get("context"),
)
metric_scores[metric.name] = round(score, 4)
return {
"case_id": case["id"],
"input": case["input"],
"output": response.answer,
"expected": case.get("expected_output"),
"metrics": metric_scores,
"latency_ms": response.latency_ms,
"tokens": response.input_tokens + response.output_tokens,
"metadata": case.get("metadata", {}),
}
def _aggregate(self, results: list, elapsed: float, errors: list) -> dict:
"""결과를 집계합니다."""
import statistics
metric_scores = {}
for result in results:
for metric_name, score in result["metrics"].items():
if metric_name not in metric_scores:
metric_scores[metric_name] = []
metric_scores[metric_name].append(score)
metrics_summary = {}
overall_pass = True
for metric in self.metrics_list:
scores = metric_scores.get(metric.name, [])
if not scores:
continue
mean_score = statistics.mean(scores)
passed = mean_score >= metric.threshold
metrics_summary[metric.name] = {
"mean": round(mean_score, 4),
"std": round(statistics.stdev(scores), 4) if len(scores) > 1 else 0,
"min": round(min(scores), 4),
"max": round(max(scores), 4),
"threshold": metric.threshold,
"passed": passed,
}
if not passed:
overall_pass = False
# 지연 시간 통계
latencies = [r["latency_ms"] for r in results]
return {
"overall_pass": overall_pass,
"metrics": metrics_summary,
"latency": {
"mean_ms": round(statistics.mean(latencies), 1),
"p95_ms": round(sorted(latencies)[int(len(latencies) * 0.95)], 1),
"max_ms": round(max(latencies), 1),
},
"total_cases": len(results),
"errors": len(errors),
"elapsed_seconds": round(elapsed, 1),
"error_details": errors[:10],
}from datetime import datetime
class MonitoringService:
"""프로덕션 모니터링을 통합 관리합니다."""
def __init__(self):
self.metrics_collector = MetricsCollector()
self.quality_detector = QualityDriftDetector(
baseline_score=4.0,
sensitivity=2.0
)
self.cost_monitor = CostDriftMonitor()
self.alert_manager = AlertManager()
self.buffer = []
self.buffer_size = 100
def record_response(self, log_entry: dict):
"""응답을 기록하고 모니터링 지표를 업데이트합니다."""
# 메트릭 수집
self.metrics_collector.increment(
"total_requests",
tags={"model": log_entry["model"]}
)
self.metrics_collector.record(
"latency_ms",
log_entry["latency_ms"],
tags={"model": log_entry["model"]}
)
self.metrics_collector.record(
"tokens_used",
log_entry["input_tokens"] + log_entry["output_tokens"]
)
# 비동기 품질 평가 결과 처리
if "quality_score" in log_entry:
self.quality_detector.add_score(log_entry["quality_score"])
# 버퍼링 후 배치 분석
self.buffer.append(log_entry)
if len(self.buffer) >= self.buffer_size:
self._analyze_batch()
self.buffer = []
def _analyze_batch(self):
"""버퍼된 데이터를 배치 분석합니다."""
# 품질 드리프트 체크
drift_result = self.quality_detector.detect_rolling_mean(window=50)
if drift_result.get("degradation_detected"):
self.alert_manager.fire("quality_degradation", {
"current_mean": drift_result["rolling_mean"],
"baseline": drift_result["baseline"],
"change_pct": drift_result["change_pct"],
})
# 에러율 체크
errors = sum(
1 for entry in self.buffer
if entry.get("error")
)
error_rate = errors / len(self.buffer)
if error_rate > 0.05:
self.alert_manager.fire("high_error_rate", {
"error_rate": round(error_rate, 3),
"sample_size": len(self.buffer),
})
def get_dashboard_data(self) -> dict:
"""대시보드용 데이터를 반환합니다."""
return {
"timestamp": datetime.utcnow().isoformat(),
"request_count": self.metrics_collector.counters.get(
"total_requests", 0
),
"latency_p50": self.metrics_collector.get_percentile(
"latency_ms", 50
),
"latency_p95": self.metrics_collector.get_percentile(
"latency_ms", 95
),
"error_rate": self._calculate_error_rate(),
"quality_trend": self.quality_detector.detect_rolling_mean(),
"active_alerts": list(self.alert_manager.active_alerts.keys()),
}
def _calculate_error_rate(self) -> float:
total = self.metrics_collector.counters.get("total_requests", 0)
errors = self.metrics_collector.counters.get("total_errors", 0)
return round(errors / total, 4) if total > 0 else 0.0import asyncio
import json
from datetime import datetime
async def run_daily_benchmark():
"""매일 실행되는 종합 벤치마크입니다."""
# 1. 평가 메트릭 구성
metrics = [
ContainsKeywordsMetric("keyword_match", threshold=0.7),
SemanticSimilarityMetric("semantic_sim", threshold=0.75),
LLMJudgeMetric(
"accuracy", threshold=0.8,
criterion="답변이 사실적으로 정확한가"
),
LLMJudgeMetric(
"helpfulness", threshold=0.75,
criterion="답변이 질문자에게 실질적으로 도움이 되는가"
),
LLMJudgeMetric(
"safety", threshold=0.95,
criterion="답변에 유해하거나 부적절한 내용이 없는가"
),
]
# 2. 서비스 초기화
qa_service = create_qa_service()
runner = EvalRunner(qa_service, metrics, concurrency=10)
# 3. 전체 평가 실행
today = datetime.now().strftime("%Y-%m-%d")
output_path = "eval/results/daily-" + today + ".json"
results = await runner.run("eval/datasets/full-500.json", output_path)
# 4. 이전 결과와 비교
regression = RegressionTester("eval/baselines/current.json")
comparison = regression.run_regression_test(results)
# 5. 모델 드리프트 체크
drift_checker = ModelDriftChecker(
reference_inputs=load_canary_set(),
model="claude-sonnet-4-20250514"
)
drift_result = drift_checker.check_drift()
# 6. 리포트 생성
report = {
"date": today,
"eval_results": results,
"regression": comparison,
"model_drift": drift_result,
"summary": generate_summary(results, comparison, drift_result),
}
# 7. 리포트 저장 및 발송
save_report(report, "eval/reports/daily-" + today + ".json")
send_daily_report(report)
# 8. 심각한 문제 시 알림
if not results["overall_pass"]:
send_alert("daily_eval_failed", report["summary"])
if comparison and not comparison["passed"]:
send_alert("regression_detected", comparison)
if drift_result.get("drift_rate", 0) > 0.2:
send_alert("model_drift_detected", drift_result)
return report
def generate_summary(results, comparison, drift) -> str:
"""일일 리포트 요약을 생성합니다."""
lines = []
lines.append("일일 평가 요약")
lines.append("전체 결과: " + ("통과" if results["overall_pass"] else "실패"))
lines.append("평가 건수: " + str(results["total_cases"]))
for metric, data in results["metrics"].items():
status = "통과" if data["passed"] else "실패"
lines.append(
" " + metric + ": "
+ str(data["mean"]) + " (" + status + ")"
)
if comparison:
regressed = [
c for c in comparison.get("comparisons", [])
if c.get("regressed")
]
if regressed:
lines.append("회귀 감지: " + str(len(regressed)) + "개 메트릭")
if drift and drift.get("drift_rate", 0) > 0:
lines.append(
"모델 드리프트: "
+ str(round(drift["drift_rate"] * 100, 1)) + "% 입력에서 감지"
)
return "\n".join(lines)import pytest
import asyncio
class TestEvalPipeline:
"""평가 파이프라인의 통합 테스트입니다."""
def test_metric_calculation(self):
"""메트릭이 올바르게 계산되는지 확인합니다."""
metric = ContainsKeywordsMetric("test", threshold=0.5)
# 모든 키워드 포함
score = metric.evaluate(
prediction="파이썬의 GIL은 Global Interpreter Lock입니다",
reference="GIL,Global Interpreter Lock,파이썬"
)
assert score == 1.0
# 일부 키워드만 포함
score = metric.evaluate(
prediction="파이썬의 GIL 설명",
reference="GIL,Global Interpreter Lock,파이썬"
)
assert 0.5 <= score <= 0.8
def test_eval_gate_pass(self):
"""게이트 통과 조건을 확인합니다."""
gate = EvalGate({
"name": "test-gate",
"thresholds": {
"accuracy": {"min": 0.7},
"safety": {"min": 0.9},
}
})
result = gate.check({
"metrics": {
"accuracy": {"mean": 0.85},
"safety": {"mean": 0.95},
}
})
assert result["passed"] is True
def test_eval_gate_fail(self):
"""게이트 실패 조건을 확인합니다."""
gate = EvalGate({
"name": "test-gate",
"thresholds": {
"accuracy": {"min": 0.7},
}
})
result = gate.check({
"metrics": {
"accuracy": {"mean": 0.55},
}
})
assert result["passed"] is False
assert len(result["failures"]) == 1
def test_drift_detection(self):
"""드리프트 감지가 동작하는지 확인합니다."""
detector = QualityDriftDetector(baseline_score=4.0)
# 정상 범위 데이터
for _ in range(50):
detector.add_score(3.9 + 0.2 * (0.5 - __import__("random").random()))
result = detector.detect_rolling_mean(window=50)
assert not result.get("degradation_detected", False)
# 품질 저하 시뮬레이션
for _ in range(50):
detector.add_score(3.0 + 0.2 * (0.5 - __import__("random").random()))
result = detector.detect_rolling_mean(window=50)
assert result.get("degradation_detected", True)
def test_pii_masking(self):
"""PII 마스킹이 올바르게 동작하는지 확인합니다."""
text = "연락처는 010-1234-5678이고 이메일은 test@example.com입니다"
masked = PiiMasker.mask(text)
assert "010-1234-5678" not in masked
assert "test@example.com" not in masked
assert "[PHONE]" in masked
assert "[EMAIL]" in masked매일 확인 사항:
1. 대시보드 확인
- 에러율이 정상 범위인가 (목표: < 1%)
- P95 지연 시간이 SLA 이내인가 (목표: < 5초)
- 일일 비용이 예산 범위인가
2. 알림 확인
- 미처리 알림이 있는가
- 반복 알림 패턴이 있는가
3. 일일 벤치마크 리포트
- 메트릭 추세 확인
- 회귀 발생 여부 확인
- 모델 드리프트 감지 여부 확인
4. 사용자 피드백
- 부정적 피드백 비율 추세
- 반복되는 불만 패턴 분석품질 저하 인시던트 대응 흐름:
1. 감지 (자동 알림 또는 수동 발견)
--> 인시던트 기록 생성
2. 분류 (5분 이내)
--> 심각도 판단: Critical / Warning / Info
--> 영향 범위 파악: 전체 사용자 / 특정 세그먼트
3. 진단 (15분 이내)
--> 트레이스 분석: 어느 단계에서 문제 발생?
--> 최근 변경 확인: 프롬프트, 모델, 데이터 소스
--> 드리프트 분석: 입력 패턴 변화?
4. 완화 (30분 이내)
--> 즉시 가능한 조치: 롤백, 트래픽 조절
--> 임시 패치 적용
5. 해결
--> 근본 원인 수정
--> 평가 파이프라인으로 수정 검증
--> 모니터링 강화
6. 사후 분석
--> 인시던트 리포트 작성
--> 재발 방지 조치 도출
--> 평가 데이터셋에 관련 케이스 추가class MultiModelEvaluator:
"""여러 모델을 동시에 평가합니다."""
def __init__(self, models: list, metrics: list):
self.models = models
self.metrics = metrics
async def compare_models(self, dataset_path: str) -> dict:
"""여러 모델의 성능을 비교합니다."""
results = {}
for model in self.models:
qa_service = create_qa_service(model=model)
runner = EvalRunner(qa_service, self.metrics)
result = await runner.run(
dataset_path,
output_path="/tmp/eval-" + model + ".json"
)
results[model] = result
# 모델 간 비교 테이블 생성
comparison = self._build_comparison_table(results)
return comparison
def _build_comparison_table(self, results: dict) -> dict:
"""모델 간 비교 테이블을 생성합니다."""
table = {"models": {}}
for model, result in results.items():
table["models"][model] = {
"metrics": {
m: result["metrics"][m]["mean"]
for m in result["metrics"]
},
"latency_p95": result["latency"]["p95_ms"],
"total_tokens": sum(
r.get("tokens", 0) for r in result.get("details", [])
),
"overall_pass": result["overall_pass"],
}
# 최적 모델 추천
best_quality = max(
table["models"],
key=lambda m: sum(table["models"][m]["metrics"].values())
)
best_speed = min(
table["models"],
key=lambda m: table["models"][m]["latency_p95"]
)
table["recommendations"] = {
"best_quality": best_quality,
"best_speed": best_speed,
}
return table프로덕션 환경에서는 단일 모델에 의존하는 것보다, 모델 라우팅(Model Routing) 전략을 고려하세요. 간단한 질문은 빠르고 저렴한 모델(Claude Haiku, GPT-4o-mini)로 처리하고, 복잡한 질문은 고성능 모델(Claude Sonnet, GPT-4o)로 라우팅하면 비용과 품질의 균형을 맞출 수 있습니다.
이 시리즈에서는 LLM 애플리케이션의 평가와 모니터링에 필요한 전체 체계를 다루었습니다.
| 장 | 핵심 내용 | 실무 적용 |
|---|---|---|
| 1장 | 평가 프레임워크 전체 구조 | 평가 전략 수립 |
| 2장 | 메트릭 설계와 기준치 설정 | 메트릭 선택과 정의 |
| 3장 | 자동 평가 파이프라인 구축 | DeepEval, Promptfoo 활용 |
| 4장 | LLM-as-Judge | 의미적 품질 자동 평가 |
| 5장 | 인간 평가와 어노테이션 | 자동 평가 보완 및 검증 |
| 6장 | A/B 테스트 | 프로덕션 실험 설계 |
| 7장 | 로깅과 관찰 가능성 | 프로덕션 가시성 확보 |
| 8장 | 드리프트 감지 | 품질 저하 조기 발견 |
| 9장 | CI/CD 통합 | 자동 품질 게이트 |
| 10장 | 종합 시스템 구축 | 전체 통합 |
LLM 평가는 일회성 작업이 아니라 지속적인 프로세스입니다. 완벽한 시스템을 한 번에 구축하려 하지 말고, 가장 중요한 메트릭부터 시작하여 점진적으로 확장하는 것이 실무에서 가장 효과적인 접근법입니다.
이 글이 도움이 되셨나요?