본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout
© 2026 Kreath
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 9장: 관측 가능성과 디버깅
2026년 4월 6일·AI / ML·

9장: 관측 가능성과 디버깅

멀티에이전트 시스템의 분산 추적, 구조화된 로깅, 메트릭 수집, 에이전트 행동 시각화, 그리고 프로덕션 디버깅 전략을 다룹니다.

13분1,457자5개 섹션
ai-에이전트관측-가능성디버깅분산-추적
공유
agent-orchestration9 / 11
1234567891011
이전8장: 에이전트 플릿 관리와 스케일링다음10장: 보안과 거버넌스

에이전트 시스템에서 관측 가능성이 어려운 이유

전통적인 소프트웨어의 관측 가능성은 "입력 X에 대해 출력 Y를 반환했는가"를 확인하는 것으로 충분합니다. 하지만 에이전트 시스템은 근본적으로 다릅니다.

비결정적 실행 경로: 같은 입력에 대해 에이전트가 다른 도구를 호출하고, 다른 에이전트에게 위임할 수 있습니다. 실행 경로 자체가 비결정적이므로, "정상적인 경로"를 정의하기 어렵습니다.

자연어 중간 상태: 에이전트 간의 통신이 자연어로 이루어지므로, 구조화된 로그에서 추출할 정보를 파싱하기 어렵습니다.

다층 의사결정: 감독자의 라우팅 결정, 워커의 도구 선택, 핸드오프 판단 등 여러 층의 의사결정이 중첩됩니다. 문제의 근본 원인이 어느 층에 있는지 추적해야 합니다.

분산 추적(Distributed Tracing)

OpenTelemetry 기반의 분산 추적은 멀티에이전트 시스템의 실행 흐름을 시각화하는 가장 효과적인 방법입니다.

에이전트 추적 계측

python
from opentelemetry import trace
from opentelemetry.trace import StatusCode
from functools import wraps
 
tracer = trace.get_tracer("agent-orchestration")
 
class TracedAgent:
    """OpenTelemetry 추적이 통합된 에이전트 래퍼"""
 
    def __init__(self, agent: Agent):
        self.agent = agent
 
    async def run(self, task: str, context: dict = None):
        with tracer.start_as_current_span(
            f"agent.{self.agent.name}",
            attributes={
                "agent.name": self.agent.name,
                "agent.model": self.agent.model,
                "agent.task_length": len(task),
                "agent.tools_count": len(self.agent.tools),
            }
        ) as span:
            try:
                # LLM 호출 추적
                result = await self._traced_llm_call(
                    task, context, span
                )
                span.set_status(StatusCode.OK)
                span.set_attribute(
                    "agent.output_length",
                    len(str(result))
                )
                return result
            except Exception as e:
                span.set_status(StatusCode.ERROR, str(e))
                span.record_exception(e)
                raise
 
    async def _traced_llm_call(
        self, task: str, context: dict, parent_span
    ):
        with tracer.start_as_current_span(
            "llm.inference",
            attributes={
                "llm.model": self.agent.model,
                "llm.input_tokens": estimate_tokens(task),
            }
        ) as span:
            result = await self.agent.run(task)
 
            # 토큰 사용량 기록
            if hasattr(result, "usage"):
                span.set_attribute(
                    "llm.input_tokens", result.usage.input_tokens
                )
                span.set_attribute(
                    "llm.output_tokens", result.usage.output_tokens
                )
 
            return result
 
def trace_tool_call(func):
    """도구 호출 추적 데코레이터"""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        with tracer.start_as_current_span(
            f"tool.{func.__name__}",
            attributes={
                "tool.name": func.__name__,
                "tool.args": str(kwargs)[:500],
            }
        ) as span:
            try:
                result = await func(*args, **kwargs)
                span.set_attribute(
                    "tool.result_size",
                    len(str(result))
                )
                return result
            except Exception as e:
                span.set_status(StatusCode.ERROR, str(e))
                raise
    return wrapper

워크플로우 전체 추적

python
class TracedWorkflow:
    """워크플로우 수준의 분산 추적"""
 
    def __init__(self, workflow_id: str, agents: list[TracedAgent]):
        self.workflow_id = workflow_id
        self.agents = agents
 
    async def execute(self, initial_task: str) -> dict:
        with tracer.start_as_current_span(
            "workflow.execute",
            attributes={
                "workflow.id": self.workflow_id,
                "workflow.agent_count": len(self.agents),
                "workflow.task": initial_task[:200],
            }
        ) as root_span:
            state = {"task": initial_task}
 
            for agent in self.agents:
                # 핸드오프 추적
                with tracer.start_as_current_span(
                    "workflow.step",
                    attributes={
                        "step.agent": agent.agent.name,
                        "step.input_keys": str(list(state.keys())),
                    }
                ):
                    result = await agent.run(
                        state["task"], state
                    )
                    state.update(result)
 
            root_span.set_attribute(
                "workflow.total_steps", len(self.agents)
            )
            return state

추적 결과를 Jaeger나 Grafana Tempo에서 시각화하면 다음과 같은 트레이스를 확인할 수 있습니다.

workflow.execute (1250ms)
├── workflow.step: supervisor (350ms)
│   ├── llm.inference (280ms)
│   └── tool.classify_intent (45ms)
├── workflow.step: researcher (520ms)
│   ├── llm.inference (150ms)
│   ├── tool.web_search (250ms)
│   └── llm.inference (100ms)
└── workflow.step: writer (380ms)
    ├── llm.inference (340ms)
    └── tool.format_output (25ms)

구조화된 로깅

에이전트 로그 스키마

자연어 로그 대신 구조화된 JSON 로그를 사용하면 검색과 분석이 용이합니다.

python
import structlog
from datetime import datetime
 
logger = structlog.get_logger()
 
class AgentLogger:
    def __init__(self, agent_name: str, workflow_id: str):
        self.log = logger.bind(
            agent=agent_name,
            workflow_id=workflow_id
        )
 
    def decision(
        self,
        decision_type: str,
        choice: str,
        alternatives: list[str],
        reasoning: str
    ):
        """에이전트 의사결정 로그"""
        self.log.info(
            "agent.decision",
            decision_type=decision_type,
            choice=choice,
            alternatives=alternatives,
            reasoning=reasoning[:500],
            timestamp=datetime.now().isoformat()
        )
 
    def tool_call(
        self,
        tool_name: str,
        args: dict,
        result_summary: str,
        duration_ms: int
    ):
        """도구 호출 로그"""
        self.log.info(
            "agent.tool_call",
            tool=tool_name,
            args={
                k: str(v)[:200] for k, v in args.items()
            },
            result_summary=result_summary[:300],
            duration_ms=duration_ms
        )
 
    def handoff(
        self, target_agent: str, reason: str, context_summary: str
    ):
        """핸드오프 로그"""
        self.log.info(
            "agent.handoff",
            target=target_agent,
            reason=reason,
            context_summary=context_summary[:300]
        )
 
    def error(self, error_type: str, message: str, recoverable: bool):
        """오류 로그"""
        self.log.error(
            "agent.error",
            error_type=error_type,
            message=message,
            recoverable=recoverable
        )
 
# 사용 예시
agent_log = AgentLogger("supervisor", "wf-001")
agent_log.decision(
    decision_type="routing",
    choice="billing-agent",
    alternatives=["billing-agent", "shipping-agent", "technical-agent"],
    reasoning="사용자가 환불을 요청했으므로 결제 전문 에이전트로 라우팅"
)

의사결정 추적(Decision Trace)

에이전트의 의사결정 과정을 추적하는 것은 디버깅의 핵심입니다.

python
@dataclass
class DecisionRecord:
    """에이전트의 단일 의사결정 기록"""
    agent_id: str
    timestamp: datetime
    input_summary: str
    decision_type: str  # routing, tool_selection, handoff, response
    options_considered: list[dict]
    chosen_option: str
    confidence: float
    reasoning: str
 
class DecisionTracker:
    def __init__(self, storage):
        self.storage = storage
        self.current_decisions: list[DecisionRecord] = []
 
    async def record(self, decision: DecisionRecord):
        self.current_decisions.append(decision)
        await self.storage.append(
            f"decisions/{decision.agent_id}", decision
        )
 
    async def get_decision_chain(
        self, workflow_id: str
    ) -> list[DecisionRecord]:
        """워크플로우의 전체 의사결정 체인 조회"""
        return await self.storage.query(
            workflow_id=workflow_id,
            sort_by="timestamp"
        )
 
    async def find_anomalies(
        self, workflow_id: str
    ) -> list[dict]:
        """비정상적 의사결정 패턴 탐지"""
        chain = await self.get_decision_chain(workflow_id)
        anomalies = []
 
        for i, decision in enumerate(chain):
            # 낮은 확신도 결정
            if decision.confidence < 0.3:
                anomalies.append({
                    "type": "low_confidence",
                    "decision": decision,
                    "message": (
                        f"{decision.agent_id}의 "
                        f"{decision.decision_type} 결정 "
                        f"확신도가 {decision.confidence:.2f}로 낮음"
                    )
                })
 
            # 반복적 핸드오프 (루프 의심)
            if (
                decision.decision_type == "handoff"
                and i >= 2
                and chain[i-2].decision_type == "handoff"
                and chain[i-2].chosen_option == decision.agent_id
            ):
                anomalies.append({
                    "type": "handoff_loop",
                    "decision": decision,
                    "message": (
                        f"핸드오프 루프 감지: "
                        f"{chain[i-2].agent_id} → "
                        f"{chain[i-1].agent_id} → "
                        f"{decision.agent_id}"
                    )
                })
 
        return anomalies

메트릭 수집

핵심 에이전트 메트릭

python
from prometheus_client import (
    Counter, Histogram, Gauge, Summary
)
 
# 호출 메트릭
agent_calls_total = Counter(
    "agent_calls_total",
    "Total agent invocations",
    ["agent_name", "status"]  # success, error
)
 
# 지연시간 메트릭
agent_latency = Histogram(
    "agent_latency_seconds",
    "Agent execution latency",
    ["agent_name"],
    buckets=[0.5, 1, 2, 5, 10, 30, 60]
)
 
# 토큰 사용량
token_usage = Counter(
    "agent_token_usage_total",
    "Total tokens consumed",
    ["agent_name", "model", "direction"]  # input, output
)
 
# 활성 에이전트 수
active_agents = Gauge(
    "active_agents",
    "Currently active agent instances",
    ["agent_type"]
)
 
# 핸드오프 메트릭
handoffs_total = Counter(
    "agent_handoffs_total",
    "Total handoff count",
    ["source_agent", "target_agent"]
)
 
# 도구 호출 메트릭
tool_calls = Histogram(
    "agent_tool_call_seconds",
    "Tool call duration",
    ["agent_name", "tool_name"],
    buckets=[0.1, 0.5, 1, 2, 5, 10]
)
 
class MetricsCollector:
    @staticmethod
    def record_agent_call(
        agent_name: str,
        status: str,
        duration: float,
        input_tokens: int,
        output_tokens: int,
        model: str
    ):
        agent_calls_total.labels(
            agent_name=agent_name, status=status
        ).inc()
 
        agent_latency.labels(
            agent_name=agent_name
        ).observe(duration)
 
        token_usage.labels(
            agent_name=agent_name, model=model,
            direction="input"
        ).inc(input_tokens)
 
        token_usage.labels(
            agent_name=agent_name, model=model,
            direction="output"
        ).inc(output_tokens)

SLO(Service Level Objectives)

에이전트 시스템의 SLO를 정의하고 모니터링합니다.

python
@dataclass
class AgentSLO:
    name: str
    target: float
    window: str  # "1h", "24h", "7d"
    metric: str
    threshold: float
 
# SLO 정의 예시
slos = [
    AgentSLO(
        name="에이전트 가용성",
        target=0.995,  # 99.5%
        window="24h",
        metric="success_rate",
        threshold=0.995
    ),
    AgentSLO(
        name="응답 지연시간 P95",
        target=0.95,  # 95%의 요청이 5초 이내
        window="1h",
        metric="latency_p95",
        threshold=5.0
    ),
    AgentSLO(
        name="핸드오프 정확도",
        target=0.90,  # 90%
        window="24h",
        metric="routing_accuracy",
        threshold=0.90
    )
]
 
class SLOMonitor:
    def __init__(self, metrics_provider, alerter):
        self.metrics = metrics_provider
        self.alerter = alerter
 
    async def check_slos(self, slos: list[AgentSLO]):
        for slo in slos:
            current = await self.metrics.query(
                slo.metric, window=slo.window
            )
            if current < slo.threshold:
                error_budget_remaining = (
                    (current - (1 - slo.target))
                    / slo.target
                )
                await self.alerter.send({
                    "slo": slo.name,
                    "current": current,
                    "target": slo.target,
                    "error_budget_remaining": (
                        f"{error_budget_remaining:.1%}"
                    ),
                    "severity": (
                        "critical"
                        if error_budget_remaining < 0
                        else "warning"
                    )
                })

프로덕션 디버깅 전략

리플레이 디버깅(Replay Debugging)

이벤트 소싱과 결합하면 문제가 발생한 워크플로우를 정확히 재현할 수 있습니다.

python
class WorkflowReplayer:
    """워크플로우 실행을 재현하여 문제 분석"""
 
    def __init__(self, event_store: EventStore):
        self.events = event_store
 
    async def replay(
        self,
        workflow_id: str,
        stop_before: str | None = None
    ) -> dict:
        """워크플로우를 처음부터 재현"""
        events = await self.events.get_events(workflow_id)
        state = {}
        timeline = []
 
        for event in events:
            if (
                stop_before
                and event.agent_id == stop_before
                and event.event_type == EventType.AGENT_INVOKED
            ):
                break
 
            entry = {
                "timestamp": event.timestamp.isoformat(),
                "type": event.event_type.value,
                "agent": event.agent_id,
                "data_keys": list(event.data.keys()),
            }
 
            if event.event_type == EventType.AGENT_COMPLETED:
                state.update(event.data.get("result", {}))
                entry["state_after"] = list(state.keys())
            elif event.event_type == EventType.AGENT_FAILED:
                entry["error"] = event.data.get("error", "unknown")
 
            timeline.append(entry)
 
        return {
            "workflow_id": workflow_id,
            "total_events": len(timeline),
            "final_state": state,
            "timeline": timeline
        }
 
    async def compare_runs(
        self, workflow_id_a: str, workflow_id_b: str
    ) -> dict:
        """두 실행의 차이점 분석"""
        timeline_a = await self.replay(workflow_id_a)
        timeline_b = await self.replay(workflow_id_b)
 
        differences = []
        max_len = max(
            len(timeline_a["timeline"]),
            len(timeline_b["timeline"])
        )
 
        for i in range(max_len):
            a = (
                timeline_a["timeline"][i]
                if i < len(timeline_a["timeline"]) else None
            )
            b = (
                timeline_b["timeline"][i]
                if i < len(timeline_b["timeline"]) else None
            )
 
            if a and b and a["type"] != b["type"]:
                differences.append({
                    "step": i,
                    "run_a": a,
                    "run_b": b,
                    "divergence": "다른 이벤트 타입"
                })
            elif a and b and a.get("agent") != b.get("agent"):
                differences.append({
                    "step": i,
                    "run_a": a,
                    "run_b": b,
                    "divergence": "다른 에이전트 선택"
                })
 
        return {
            "divergence_count": len(differences),
            "first_divergence": (
                differences[0] if differences else None
            ),
            "all_differences": differences
        }
Tip

프로덕션 디버깅에서 가장 유용한 질문은 "이 워크플로우와 정상적으로 완료된 유사한 워크플로우의 차이점은 무엇인가?"입니다. compare_runs 기능을 활용하면 분기 지점을 빠르게 찾을 수 있습니다.

에이전트 행동 시각화

복잡한 멀티에이전트 상호작용을 시각화하면 패턴과 이상을 직관적으로 파악할 수 있습니다.

python
class WorkflowVisualizer:
    """워크플로우를 Mermaid 다이어그램으로 시각화"""
 
    async def to_sequence_diagram(
        self, workflow_id: str, events: list[AgentEvent]
    ) -> str:
        lines = ["sequenceDiagram"]
        participants = set()
 
        for event in events:
            if event.agent_id:
                participants.add(event.agent_id)
 
        for p in sorted(participants):
            lines.append(f"    participant {p}")
 
        for event in events:
            if event.event_type == EventType.HANDOFF_INITIATED:
                source = event.agent_id
                target = event.data.get("target")
                reason = event.data.get("reason", "")[:50]
                lines.append(
                    f"    {source}->>+{target}: 핸드오프: {reason}"
                )
            elif event.event_type == EventType.TOOL_CALLED:
                agent = event.agent_id
                tool = event.data.get("tool_name", "unknown")
                lines.append(
                    f"    {agent}->>+{tool}: 도구 호출"
                )
            elif event.event_type == EventType.TOOL_RETURNED:
                agent = event.agent_id
                tool = event.data.get("tool_name", "unknown")
                lines.append(
                    f"    {tool}-->>-{agent}: 결과 반환"
                )
            elif event.event_type == EventType.AGENT_FAILED:
                agent = event.agent_id
                error = event.data.get("error", "")[:30]
                lines.append(
                    f"    Note over {agent}: 실패: {error}"
                )
 
        return "\n".join(lines)

다음 장에서는 멀티에이전트 시스템의 보안과 거버넌스를 다룹니다. 에이전트 인증, 권한 관리, 데이터 보호, 규제 준수를 살펴봅니다.

이 글이 도움이 되셨나요?

관련 글

AI / ML

8장: 에이전트 플릿 관리와 스케일링

수십에서 수백 개의 에이전트를 운영하는 플릿 관리 전략, 자동 스케일링, 로드 밸런싱, 비용 최적화를 다룹니다.

2026년 4월 5일·13분
AI / ML

10장: 보안과 거버넌스

멀티에이전트 시스템의 인증과 인가, 최소 권한 원칙, 데이터 보호, 감사 추적, 규제 준수까지 프로덕션 보안과 거버넌스를 다룹니다.

2026년 4월 8일·16분
AI / ML

11장: 실전 프로젝트 — 멀티에이전트 오케스트레이션 시스템 구축

고객 서비스 자동화 시스템을 처음부터 설계하고 구축하며, 감독자-워커 아키텍처, A2A 통신, 컨트롤 플레인, 관측 가능성을 통합합니다.

2026년 4월 9일·16분
이전 글8장: 에이전트 플릿 관리와 스케일링
다음 글10장: 보안과 거버넌스

댓글

목차

약 13분 남음
  • 에이전트 시스템에서 관측 가능성이 어려운 이유
  • 분산 추적(Distributed Tracing)
    • 에이전트 추적 계측
    • 워크플로우 전체 추적
  • 구조화된 로깅
    • 에이전트 로그 스키마
    • 의사결정 추적(Decision Trace)
  • 메트릭 수집
    • 핵심 에이전트 메트릭
    • SLO(Service Level Objectives)
  • 프로덕션 디버깅 전략
    • 리플레이 디버깅(Replay Debugging)
    • 에이전트 행동 시각화