LLM 애플리케이션의 프로덕션 환경에서 구조화된 로깅, 분산 트레이싱, 관찰 가능성을 구축하는 방법을 다룹니다.
관찰 가능성(Observability)은 시스템의 외부 출력을 통해 내부 상태를 이해할 수 있는 정도를 의미합니다. LLM 애플리케이션에서 관찰 가능성은 세 가지 축으로 구성됩니다.
관찰 가능성의 세 기둥:
로그 : "사용자 X의 요청이 3.2초 만에 처리됨, 입력 토큰 450, 출력 토큰 230"
메트릭 : "지난 1시간 평균 응답 시간 2.8초, P95 5.1초, 에러율 0.3%"
트레이스 : "요청 수신 --> 컨텍스트 검색 (0.8초) --> 프롬프트 조립 --> LLM 호출 (2.1초) --> 후처리 --> 응답"전통적인 소프트웨어에서도 관찰 가능성은 중요하지만, LLM 애플리케이션에서는 특히 더 중요합니다. 출력이 비결정론적이고, 비용이 입출력에 비례하며, 품질 저하가 명시적 에러 없이 발생하기 때문입니다.
모든 LLM 호출에 대해 일관된 구조로 로그를 남겨야 합니다.
from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import Optional
import json
import uuid
@dataclass
class LLMCallLog:
"""LLM API 호출 단위의 로그 구조입니다."""
# 식별 정보
trace_id: str # 요청 전체 추적 ID
span_id: str # 이 호출의 고유 ID
parent_span_id: Optional[str] # 부모 스팬 (에이전트 체인)
# 요청 정보
model: str
prompt_version: str
system_prompt_hash: str # 시스템 프롬프트의 해시
input_text: str
input_tokens: int
# 응답 정보
output_text: str
output_tokens: int
finish_reason: str # stop, length, content_filter 등
# 성능 정보
latency_ms: float
time_to_first_token_ms: Optional[float] = None
# 비용 정보
estimated_cost_usd: float = 0.0
# 메타데이터
timestamp: str = field(
default_factory=lambda: datetime.utcnow().isoformat()
)
user_id: Optional[str] = None
session_id: Optional[str] = None
environment: str = "production"
tags: dict = field(default_factory=dict)
# 품질 메트릭 (비동기로 채워질 수 있음)
quality_scores: dict = field(default_factory=dict)
def to_json(self) -> str:
return json.dumps(asdict(self), ensure_ascii=False)프로덕션 로그에는 사용자의 개인정보가 포함될 수 있으므로, 로깅 전에 마스킹(Masking) 처리를 적용해야 합니다.
import re
class PiiMasker:
"""개인 식별 정보를 마스킹합니다."""
PATTERNS = {
"email": (
r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
"[EMAIL]"
),
"phone_kr": (
r"01[016789]-?\d{3,4}-?\d{4}",
"[PHONE]"
),
"credit_card": (
r"\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}",
"[CARD]"
),
}
@classmethod
def mask(cls, text: str) -> str:
"""텍스트에서 PII 패턴을 마스킹합니다."""
masked = text
for pattern_name, (pattern, replacement) in cls.PATTERNS.items():
masked = re.sub(pattern, replacement, masked)
return masked
@classmethod
def mask_log(cls, log: LLMCallLog) -> LLMCallLog:
"""로그 객체의 텍스트 필드를 마스킹합니다."""
log.input_text = cls.mask(log.input_text)
log.output_text = cls.mask(log.output_text)
return logLLM 입출력 전체를 로깅하면 저장 비용이 급격히 증가합니다. 프로덕션에서는 전체 텍스트 대신 요약이나 해시만 저장하고, 필요 시 원본을 조회할 수 있는 별도 저장소를 두는 전략이 효율적입니다. GDPR이나 개인정보보호법 준수를 위해 보존 기간 정책도 설정해야 합니다.
RAG 시스템이나 에이전트 시스템은 하나의 사용자 요청이 여러 단계를 거칩니다. 트레이싱은 이 전체 흐름을 하나의 연결된 경로로 추적합니다.
import time
class Span:
"""트레이스의 개별 구간을 나타냅니다."""
def __init__(self, name: str, trace_id: str, parent_id: str = None):
self.span_id = str(uuid.uuid4())[:8]
self.trace_id = trace_id
self.parent_id = parent_id
self.name = name
self.start_time = None
self.end_time = None
self.attributes = {}
self.events = []
def __enter__(self):
self.start_time = time.perf_counter()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.end_time = time.perf_counter()
if exc_type:
self.set_attribute("error", True)
self.set_attribute("error_type", str(exc_type))
@property
def duration_ms(self) -> float:
if self.start_time and self.end_time:
return (self.end_time - self.start_time) * 1000
return 0
def set_attribute(self, key: str, value):
self.attributes[key] = value
def add_event(self, name: str, attributes: dict = None):
self.events.append({
"name": name,
"timestamp": time.perf_counter(),
"attributes": attributes or {},
})
class Tracer:
"""LLM 요청의 전체 경로를 추적합니다."""
def __init__(self):
self.traces = {}
def start_trace(self, name: str) -> Span:
trace_id = str(uuid.uuid4())[:12]
span = Span(name, trace_id)
self.traces[trace_id] = [span]
return span
def start_span(self, name: str, parent: Span) -> Span:
span = Span(name, parent.trace_id, parent.span_id)
if parent.trace_id in self.traces:
self.traces[parent.trace_id].append(span)
return spanasync def rag_pipeline_traced(query: str, tracer: Tracer) -> str:
"""트레이싱이 적용된 RAG 파이프라인입니다."""
root = tracer.start_trace("rag_pipeline")
with root:
root.set_attribute("query", query)
# 1. 쿼리 분석
with tracer.start_span("query_analysis", root) as span:
analyzed = analyze_query(query)
span.set_attribute("intent", analyzed["intent"])
span.set_attribute("entities", str(analyzed["entities"]))
# 2. 문서 검색
with tracer.start_span("retrieval", root) as span:
documents = await retrieve_documents(analyzed)
span.set_attribute("num_retrieved", len(documents))
span.set_attribute("top_score", documents[0]["score"] if documents else 0)
# 3. 컨텍스트 조립
with tracer.start_span("context_assembly", root) as span:
context = assemble_context(documents)
span.set_attribute("context_tokens", count_tokens(context))
# 4. LLM 호출
with tracer.start_span("llm_call", root) as span:
response = await call_llm(query, context)
span.set_attribute("model", response["model"])
span.set_attribute("input_tokens", response["input_tokens"])
span.set_attribute("output_tokens", response["output_tokens"])
# 5. 후처리
with tracer.start_span("post_processing", root) as span:
final = post_process(response["text"])
span.set_attribute("output_length", len(final))
root.set_attribute("total_tokens",
response["input_tokens"] + response["output_tokens"])
return finalOpenTelemetry(OTel)는 관찰 가능성의 사실상 표준입니다. LLM 특화 확장인 GenAI Semantic Conventions를 활용하면, 다양한 백엔드와 호환되는 표준화된 텔레메트리를 수집할 수 있습니다.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
def setup_tracing(service_name: str, otlp_endpoint: str):
"""OpenTelemetry 트레이싱을 초기화합니다."""
provider = TracerProvider(
resource=Resource.create({"service.name": service_name})
)
exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)
return trace.get_tracer(service_name)
# GenAI Semantic Conventions에 따른 속성
GENAI_ATTRIBUTES = {
"gen_ai.system": "openai", # 또는 "anthropic"
"gen_ai.request.model": "gpt-4o",
"gen_ai.request.max_tokens": 1024,
"gen_ai.request.temperature": 0.7,
"gen_ai.response.finish_reasons": ["stop"],
"gen_ai.usage.input_tokens": 450,
"gen_ai.usage.output_tokens": 230,
}Level 1 - 비즈니스 메트릭 (경영진 대시보드):
- 일일 활성 사용자 (DAU)
- 사용자 만족도 (NPS, 썸업 비율)
- 월간 API 비용
Level 2 - 서비스 메트릭 (엔지니어링 대시보드):
- 응답 지연 시간 (P50, P95, P99)
- 에러율
- 처리량 (RPS)
- 토큰 사용량 추이
Level 3 - 모델 메트릭 (ML 대시보드):
- 응답 품질 점수 분포
- 환각 감지율
- 안전성 위반율
- 프롬프트별 성능 비교from collections import defaultdict
import time
class MetricsCollector:
"""프로덕션 메트릭을 수집하고 집계합니다."""
def __init__(self):
self.counters = defaultdict(int)
self.histograms = defaultdict(list)
self.gauges = {}
def increment(self, name: str, value: int = 1, tags: dict = None):
"""카운터를 증가시킵니다."""
key = self._make_key(name, tags)
self.counters[key] += value
def record(self, name: str, value: float, tags: dict = None):
"""히스토그램에 값을 기록합니다."""
key = self._make_key(name, tags)
self.histograms[key].append(value)
def set_gauge(self, name: str, value: float, tags: dict = None):
"""게이지 값을 설정합니다."""
key = self._make_key(name, tags)
self.gauges[key] = value
def get_percentile(self, name: str, percentile: float, tags: dict = None) -> float:
"""히스토그램의 백분위 값을 반환합니다."""
import numpy as np
key = self._make_key(name, tags)
values = self.histograms.get(key, [])
if not values:
return 0.0
return float(np.percentile(values, percentile))
def _make_key(self, name: str, tags: dict = None) -> str:
if tags:
tag_str = ",".join(k + "=" + str(v) for k, v in sorted(tags.items()))
return name + "{" + tag_str + "}"
return name
# 사용 예시
metrics = MetricsCollector()
# LLM 호출 시 메트릭 기록
def record_llm_call(model: str, latency_ms: float, tokens: int, success: bool):
tags = {"model": model}
metrics.increment("llm_calls_total", tags=tags)
metrics.record("llm_latency_ms", latency_ms, tags=tags)
metrics.record("llm_tokens_used", tokens, tags=tags)
if not success:
metrics.increment("llm_errors_total", tags=tags)LangSmith는 LLM 애플리케이션 특화 관찰 가능성 플랫폼으로, 트레이싱, 평가, 프롬프트 관리를 통합적으로 제공합니다.
import os
# LangSmith 설정
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "my-llm-app"
from langsmith import traceable
@traceable(name="qa_pipeline")
def answer_question(question: str) -> str:
"""LangSmith로 자동 트레이싱되는 QA 파이프라인입니다."""
context = retrieve_context(question)
prompt = build_prompt(question, context)
response = call_llm(prompt)
return response
@traceable(name="retrieve_context")
def retrieve_context(question: str) -> str:
"""검색 단계도 개별 스팬으로 추적됩니다."""
results = vector_store.search(question, top_k=5)
return format_context(results)LangSmith는 무료 티어에서도 월 5,000건의 트레이스를 제공합니다. 프로젝트 초기 단계에서 관찰 가능성을 빠르게 구축하기에 좋은 출발점입니다. 트래픽이 증가하면 OpenTelemetry 기반의 자체 파이프라인을 구축하는 것을 고려하세요.
alert_rules = [
{
"name": "높은 에러율",
"metric": "llm_errors_total / llm_calls_total",
"condition": "greater_than",
"threshold": 0.05,
"window_minutes": 5,
"severity": "critical",
"action": "PagerDuty 호출 + Slack 알림",
},
{
"name": "응답 지연 증가",
"metric": "llm_latency_ms_p95",
"condition": "greater_than",
"threshold": 5000,
"window_minutes": 10,
"severity": "warning",
"action": "Slack 알림",
},
{
"name": "비용 급증",
"metric": "hourly_cost_usd",
"condition": "greater_than",
"threshold_multiplier": 3.0, # 평소 대비 3배
"comparison": "same_hour_last_week",
"severity": "warning",
"action": "Slack 알림 + 이메일",
},
{
"name": "품질 점수 하락",
"metric": "quality_score_mean",
"condition": "less_than",
"threshold": 3.5,
"window_minutes": 60,
"severity": "warning",
"action": "Slack 알림",
},
]class AlertManager:
"""알림 중복과 피로를 관리합니다."""
def __init__(self):
self.active_alerts = {}
self.cooldown_minutes = 30
def should_fire(self, alert_name: str) -> bool:
"""알림을 발생시켜야 하는지 판단합니다."""
now = time.time()
last_fired = self.active_alerts.get(alert_name)
if last_fired is None:
return True
elapsed_minutes = (now - last_fired) / 60
return elapsed_minutes >= self.cooldown_minutes
def fire(self, alert_name: str, details: dict):
"""알림을 발생시킵니다."""
if self.should_fire(alert_name):
self.active_alerts[alert_name] = time.time()
send_alert(alert_name, details)
def resolve(self, alert_name: str):
"""알림을 해소합니다."""
if alert_name in self.active_alerts:
del self.active_alerts[alert_name]
send_resolution(alert_name)LLM 애플리케이션의 관찰 가능성은 로그, 메트릭, 트레이스의 세 기둥으로 구성됩니다. 구조화된 로깅으로 개별 호출을 추적하고, 분산 트레이싱으로 전체 요청 흐름을 파악하며, 집계 메트릭으로 시스템 건강 상태를 모니터링합니다.
민감 정보 마스킹, 비용 효율적인 저장 전략, 알림 피로 방지 등 프로덕션 운영에서 고려해야 할 실질적인 문제도 함께 설계해야 합니다.
다음 장에서는 이 관찰 가능성 인프라를 기반으로 시간에 따른 품질 변화를 감지하는 드리프트 감지와 품질 모니터링을 다룹니다.
이 글이 도움이 되셨나요?