멀티에이전트 시스템의 분산 추적, 구조화된 로깅, 메트릭 수집, 에이전트 행동 시각화, 그리고 프로덕션 디버깅 전략을 다룹니다.
전통적인 소프트웨어의 관측 가능성은 "입력 X에 대해 출력 Y를 반환했는가"를 확인하는 것으로 충분합니다. 하지만 에이전트 시스템은 근본적으로 다릅니다.
비결정적 실행 경로: 같은 입력에 대해 에이전트가 다른 도구를 호출하고, 다른 에이전트에게 위임할 수 있습니다. 실행 경로 자체가 비결정적이므로, "정상적인 경로"를 정의하기 어렵습니다.
자연어 중간 상태: 에이전트 간의 통신이 자연어로 이루어지므로, 구조화된 로그에서 추출할 정보를 파싱하기 어렵습니다.
다층 의사결정: 감독자의 라우팅 결정, 워커의 도구 선택, 핸드오프 판단 등 여러 층의 의사결정이 중첩됩니다. 문제의 근본 원인이 어느 층에 있는지 추적해야 합니다.
OpenTelemetry 기반의 분산 추적은 멀티에이전트 시스템의 실행 흐름을 시각화하는 가장 효과적인 방법입니다.
from opentelemetry import trace
from opentelemetry.trace import StatusCode
from functools import wraps
tracer = trace.get_tracer("agent-orchestration")
class TracedAgent:
"""OpenTelemetry 추적이 통합된 에이전트 래퍼"""
def __init__(self, agent: Agent):
self.agent = agent
async def run(self, task: str, context: dict = None):
with tracer.start_as_current_span(
f"agent.{self.agent.name}",
attributes={
"agent.name": self.agent.name,
"agent.model": self.agent.model,
"agent.task_length": len(task),
"agent.tools_count": len(self.agent.tools),
}
) as span:
try:
# LLM 호출 추적
result = await self._traced_llm_call(
task, context, span
)
span.set_status(StatusCode.OK)
span.set_attribute(
"agent.output_length",
len(str(result))
)
return result
except Exception as e:
span.set_status(StatusCode.ERROR, str(e))
span.record_exception(e)
raise
async def _traced_llm_call(
self, task: str, context: dict, parent_span
):
with tracer.start_as_current_span(
"llm.inference",
attributes={
"llm.model": self.agent.model,
"llm.input_tokens": estimate_tokens(task),
}
) as span:
result = await self.agent.run(task)
# 토큰 사용량 기록
if hasattr(result, "usage"):
span.set_attribute(
"llm.input_tokens", result.usage.input_tokens
)
span.set_attribute(
"llm.output_tokens", result.usage.output_tokens
)
return result
def trace_tool_call(func):
"""도구 호출 추적 데코레이터"""
@wraps(func)
async def wrapper(*args, **kwargs):
with tracer.start_as_current_span(
f"tool.{func.__name__}",
attributes={
"tool.name": func.__name__,
"tool.args": str(kwargs)[:500],
}
) as span:
try:
result = await func(*args, **kwargs)
span.set_attribute(
"tool.result_size",
len(str(result))
)
return result
except Exception as e:
span.set_status(StatusCode.ERROR, str(e))
raise
return wrapperclass TracedWorkflow:
"""워크플로우 수준의 분산 추적"""
def __init__(self, workflow_id: str, agents: list[TracedAgent]):
self.workflow_id = workflow_id
self.agents = agents
async def execute(self, initial_task: str) -> dict:
with tracer.start_as_current_span(
"workflow.execute",
attributes={
"workflow.id": self.workflow_id,
"workflow.agent_count": len(self.agents),
"workflow.task": initial_task[:200],
}
) as root_span:
state = {"task": initial_task}
for agent in self.agents:
# 핸드오프 추적
with tracer.start_as_current_span(
"workflow.step",
attributes={
"step.agent": agent.agent.name,
"step.input_keys": str(list(state.keys())),
}
):
result = await agent.run(
state["task"], state
)
state.update(result)
root_span.set_attribute(
"workflow.total_steps", len(self.agents)
)
return state추적 결과를 Jaeger나 Grafana Tempo에서 시각화하면 다음과 같은 트레이스를 확인할 수 있습니다.
workflow.execute (1250ms)
├── workflow.step: supervisor (350ms)
│ ├── llm.inference (280ms)
│ └── tool.classify_intent (45ms)
├── workflow.step: researcher (520ms)
│ ├── llm.inference (150ms)
│ ├── tool.web_search (250ms)
│ └── llm.inference (100ms)
└── workflow.step: writer (380ms)
├── llm.inference (340ms)
└── tool.format_output (25ms)
자연어 로그 대신 구조화된 JSON 로그를 사용하면 검색과 분석이 용이합니다.
import structlog
from datetime import datetime
logger = structlog.get_logger()
class AgentLogger:
def __init__(self, agent_name: str, workflow_id: str):
self.log = logger.bind(
agent=agent_name,
workflow_id=workflow_id
)
def decision(
self,
decision_type: str,
choice: str,
alternatives: list[str],
reasoning: str
):
"""에이전트 의사결정 로그"""
self.log.info(
"agent.decision",
decision_type=decision_type,
choice=choice,
alternatives=alternatives,
reasoning=reasoning[:500],
timestamp=datetime.now().isoformat()
)
def tool_call(
self,
tool_name: str,
args: dict,
result_summary: str,
duration_ms: int
):
"""도구 호출 로그"""
self.log.info(
"agent.tool_call",
tool=tool_name,
args={
k: str(v)[:200] for k, v in args.items()
},
result_summary=result_summary[:300],
duration_ms=duration_ms
)
def handoff(
self, target_agent: str, reason: str, context_summary: str
):
"""핸드오프 로그"""
self.log.info(
"agent.handoff",
target=target_agent,
reason=reason,
context_summary=context_summary[:300]
)
def error(self, error_type: str, message: str, recoverable: bool):
"""오류 로그"""
self.log.error(
"agent.error",
error_type=error_type,
message=message,
recoverable=recoverable
)
# 사용 예시
agent_log = AgentLogger("supervisor", "wf-001")
agent_log.decision(
decision_type="routing",
choice="billing-agent",
alternatives=["billing-agent", "shipping-agent", "technical-agent"],
reasoning="사용자가 환불을 요청했으므로 결제 전문 에이전트로 라우팅"
)에이전트의 의사결정 과정을 추적하는 것은 디버깅의 핵심입니다.
@dataclass
class DecisionRecord:
"""에이전트의 단일 의사결정 기록"""
agent_id: str
timestamp: datetime
input_summary: str
decision_type: str # routing, tool_selection, handoff, response
options_considered: list[dict]
chosen_option: str
confidence: float
reasoning: str
class DecisionTracker:
def __init__(self, storage):
self.storage = storage
self.current_decisions: list[DecisionRecord] = []
async def record(self, decision: DecisionRecord):
self.current_decisions.append(decision)
await self.storage.append(
f"decisions/{decision.agent_id}", decision
)
async def get_decision_chain(
self, workflow_id: str
) -> list[DecisionRecord]:
"""워크플로우의 전체 의사결정 체인 조회"""
return await self.storage.query(
workflow_id=workflow_id,
sort_by="timestamp"
)
async def find_anomalies(
self, workflow_id: str
) -> list[dict]:
"""비정상적 의사결정 패턴 탐지"""
chain = await self.get_decision_chain(workflow_id)
anomalies = []
for i, decision in enumerate(chain):
# 낮은 확신도 결정
if decision.confidence < 0.3:
anomalies.append({
"type": "low_confidence",
"decision": decision,
"message": (
f"{decision.agent_id}의 "
f"{decision.decision_type} 결정 "
f"확신도가 {decision.confidence:.2f}로 낮음"
)
})
# 반복적 핸드오프 (루프 의심)
if (
decision.decision_type == "handoff"
and i >= 2
and chain[i-2].decision_type == "handoff"
and chain[i-2].chosen_option == decision.agent_id
):
anomalies.append({
"type": "handoff_loop",
"decision": decision,
"message": (
f"핸드오프 루프 감지: "
f"{chain[i-2].agent_id} → "
f"{chain[i-1].agent_id} → "
f"{decision.agent_id}"
)
})
return anomaliesfrom prometheus_client import (
Counter, Histogram, Gauge, Summary
)
# 호출 메트릭
agent_calls_total = Counter(
"agent_calls_total",
"Total agent invocations",
["agent_name", "status"] # success, error
)
# 지연시간 메트릭
agent_latency = Histogram(
"agent_latency_seconds",
"Agent execution latency",
["agent_name"],
buckets=[0.5, 1, 2, 5, 10, 30, 60]
)
# 토큰 사용량
token_usage = Counter(
"agent_token_usage_total",
"Total tokens consumed",
["agent_name", "model", "direction"] # input, output
)
# 활성 에이전트 수
active_agents = Gauge(
"active_agents",
"Currently active agent instances",
["agent_type"]
)
# 핸드오프 메트릭
handoffs_total = Counter(
"agent_handoffs_total",
"Total handoff count",
["source_agent", "target_agent"]
)
# 도구 호출 메트릭
tool_calls = Histogram(
"agent_tool_call_seconds",
"Tool call duration",
["agent_name", "tool_name"],
buckets=[0.1, 0.5, 1, 2, 5, 10]
)
class MetricsCollector:
@staticmethod
def record_agent_call(
agent_name: str,
status: str,
duration: float,
input_tokens: int,
output_tokens: int,
model: str
):
agent_calls_total.labels(
agent_name=agent_name, status=status
).inc()
agent_latency.labels(
agent_name=agent_name
).observe(duration)
token_usage.labels(
agent_name=agent_name, model=model,
direction="input"
).inc(input_tokens)
token_usage.labels(
agent_name=agent_name, model=model,
direction="output"
).inc(output_tokens)에이전트 시스템의 SLO를 정의하고 모니터링합니다.
@dataclass
class AgentSLO:
name: str
target: float
window: str # "1h", "24h", "7d"
metric: str
threshold: float
# SLO 정의 예시
slos = [
AgentSLO(
name="에이전트 가용성",
target=0.995, # 99.5%
window="24h",
metric="success_rate",
threshold=0.995
),
AgentSLO(
name="응답 지연시간 P95",
target=0.95, # 95%의 요청이 5초 이내
window="1h",
metric="latency_p95",
threshold=5.0
),
AgentSLO(
name="핸드오프 정확도",
target=0.90, # 90%
window="24h",
metric="routing_accuracy",
threshold=0.90
)
]
class SLOMonitor:
def __init__(self, metrics_provider, alerter):
self.metrics = metrics_provider
self.alerter = alerter
async def check_slos(self, slos: list[AgentSLO]):
for slo in slos:
current = await self.metrics.query(
slo.metric, window=slo.window
)
if current < slo.threshold:
error_budget_remaining = (
(current - (1 - slo.target))
/ slo.target
)
await self.alerter.send({
"slo": slo.name,
"current": current,
"target": slo.target,
"error_budget_remaining": (
f"{error_budget_remaining:.1%}"
),
"severity": (
"critical"
if error_budget_remaining < 0
else "warning"
)
})이벤트 소싱과 결합하면 문제가 발생한 워크플로우를 정확히 재현할 수 있습니다.
class WorkflowReplayer:
"""워크플로우 실행을 재현하여 문제 분석"""
def __init__(self, event_store: EventStore):
self.events = event_store
async def replay(
self,
workflow_id: str,
stop_before: str | None = None
) -> dict:
"""워크플로우를 처음부터 재현"""
events = await self.events.get_events(workflow_id)
state = {}
timeline = []
for event in events:
if (
stop_before
and event.agent_id == stop_before
and event.event_type == EventType.AGENT_INVOKED
):
break
entry = {
"timestamp": event.timestamp.isoformat(),
"type": event.event_type.value,
"agent": event.agent_id,
"data_keys": list(event.data.keys()),
}
if event.event_type == EventType.AGENT_COMPLETED:
state.update(event.data.get("result", {}))
entry["state_after"] = list(state.keys())
elif event.event_type == EventType.AGENT_FAILED:
entry["error"] = event.data.get("error", "unknown")
timeline.append(entry)
return {
"workflow_id": workflow_id,
"total_events": len(timeline),
"final_state": state,
"timeline": timeline
}
async def compare_runs(
self, workflow_id_a: str, workflow_id_b: str
) -> dict:
"""두 실행의 차이점 분석"""
timeline_a = await self.replay(workflow_id_a)
timeline_b = await self.replay(workflow_id_b)
differences = []
max_len = max(
len(timeline_a["timeline"]),
len(timeline_b["timeline"])
)
for i in range(max_len):
a = (
timeline_a["timeline"][i]
if i < len(timeline_a["timeline"]) else None
)
b = (
timeline_b["timeline"][i]
if i < len(timeline_b["timeline"]) else None
)
if a and b and a["type"] != b["type"]:
differences.append({
"step": i,
"run_a": a,
"run_b": b,
"divergence": "다른 이벤트 타입"
})
elif a and b and a.get("agent") != b.get("agent"):
differences.append({
"step": i,
"run_a": a,
"run_b": b,
"divergence": "다른 에이전트 선택"
})
return {
"divergence_count": len(differences),
"first_divergence": (
differences[0] if differences else None
),
"all_differences": differences
}프로덕션 디버깅에서 가장 유용한 질문은 "이 워크플로우와 정상적으로 완료된 유사한 워크플로우의 차이점은 무엇인가?"입니다. compare_runs 기능을 활용하면 분기 지점을 빠르게 찾을 수 있습니다.
복잡한 멀티에이전트 상호작용을 시각화하면 패턴과 이상을 직관적으로 파악할 수 있습니다.
class WorkflowVisualizer:
"""워크플로우를 Mermaid 다이어그램으로 시각화"""
async def to_sequence_diagram(
self, workflow_id: str, events: list[AgentEvent]
) -> str:
lines = ["sequenceDiagram"]
participants = set()
for event in events:
if event.agent_id:
participants.add(event.agent_id)
for p in sorted(participants):
lines.append(f" participant {p}")
for event in events:
if event.event_type == EventType.HANDOFF_INITIATED:
source = event.agent_id
target = event.data.get("target")
reason = event.data.get("reason", "")[:50]
lines.append(
f" {source}->>+{target}: 핸드오프: {reason}"
)
elif event.event_type == EventType.TOOL_CALLED:
agent = event.agent_id
tool = event.data.get("tool_name", "unknown")
lines.append(
f" {agent}->>+{tool}: 도구 호출"
)
elif event.event_type == EventType.TOOL_RETURNED:
agent = event.agent_id
tool = event.data.get("tool_name", "unknown")
lines.append(
f" {tool}-->>-{agent}: 결과 반환"
)
elif event.event_type == EventType.AGENT_FAILED:
agent = event.agent_id
error = event.data.get("error", "")[:30]
lines.append(
f" Note over {agent}: 실패: {error}"
)
return "\n".join(lines)다음 장에서는 멀티에이전트 시스템의 보안과 거버넌스를 다룹니다. 에이전트 인증, 권한 관리, 데이터 보호, 규제 준수를 살펴봅니다.
이 글이 도움이 되셨나요?
수십에서 수백 개의 에이전트를 운영하는 플릿 관리 전략, 자동 스케일링, 로드 밸런싱, 비용 최적화를 다룹니다.
멀티에이전트 시스템의 인증과 인가, 최소 권한 원칙, 데이터 보호, 감사 추적, 규제 준수까지 프로덕션 보안과 거버넌스를 다룹니다.
고객 서비스 자동화 시스템을 처음부터 설계하고 구축하며, 감독자-워커 아키텍처, A2A 통신, 컨트롤 플레인, 관측 가능성을 통합합니다.