고객 서비스 자동화 시스템을 처음부터 설계하고 구축하며, 감독자-워커 아키텍처, A2A 통신, 컨트롤 플레인, 관측 가능성을 통합합니다.
이 장에서는 시리즈 전체에서 다룬 개념을 종합하여 멀티에이전트 고객 서비스 시스템을 구축합니다. 이 시스템은 고객의 다양한 문의를 자동으로 분류하고, 전문 에이전트에게 위임하며, 복잡한 문의는 여러 에이전트가 협업하여 해결합니다.
- 프레임워크: LangGraph (감독자-워커 아키텍처)
- 통신: A2A 프로토콜 (외부 에이전트 연동)
- 추적: OpenTelemetry + Jaeger
- 메트릭: Prometheus + Grafana
- 상태 저장: PostgreSQL (체크포인팅)
- API: FastAPI
agent-cs-system/
├── src/
│ ├── agents/ # 에이전트 정의
│ │ ├── supervisor.py
│ │ ├── billing.py
│ │ ├── shipping.py
│ │ ├── technical.py
│ │ └── common.py
│ ├── orchestration/ # 오케스트레이션 엔진
│ │ ├── engine.py
│ │ ├── router.py
│ │ └── state.py
│ ├── control_plane/ # 컨트롤 플레인
│ │ ├── registry.py
│ │ ├── config.py
│ │ └── policy.py
│ ├── observability/ # 관측 가능성
│ │ ├── tracing.py
│ │ ├── metrics.py
│ │ └── logging.py
│ ├── security/ # 보안
│ │ ├── auth.py
│ │ ├── pii_filter.py
│ │ └── audit.py
│ └── api/ # API 엔드포인트
│ └── routes.py
├── tests/
├── config/
└── docker-compose.yml
from pydantic import BaseModel
from enum import Enum
from typing import TypedDict
class CustomerIntent(str, Enum):
BILLING = "billing"
SHIPPING = "shipping"
TECHNICAL = "technical"
GENERAL = "general"
class ConversationState(TypedDict):
"""전체 대화 상태"""
messages: list[dict]
customer_id: str | None
intent: CustomerIntent | None
current_agent: str
handoff_count: int
resolved: bool
resolution_summary: str | None
metadata: dict
class IntentClassification(BaseModel):
intent: CustomerIntent
confidence: float
reasoning: str
class AgentResponse(BaseModel):
message: str
resolved: bool
needs_handoff: bool
handoff_target: str | None = None
handoff_reason: str | None = Nonefrom langgraph.prebuilt import create_react_agent
from src.observability.tracing import trace_tool_call
@trace_tool_call
async def query_order(customer_id: str, order_id: str) -> dict:
"""주문 내역 조회"""
# 실제로는 DB 또는 API 호출
return {
"order_id": order_id,
"status": "delivered",
"amount": 45000,
"payment_method": "card",
"date": "2026-04-01"
}
@trace_tool_call
async def process_refund(
order_id: str, amount: int, reason: str
) -> dict:
"""환불 처리"""
return {
"refund_id": f"RF-{order_id}",
"amount": amount,
"status": "processed",
"estimated_days": 3
}
@trace_tool_call
async def check_payment_status(payment_id: str) -> dict:
"""결제 상태 확인"""
return {
"payment_id": payment_id,
"status": "completed",
"method": "card_ending_1234"
}
billing_agent = create_react_agent(
model="claude-sonnet-4-6",
tools=[query_order, process_refund, check_payment_status],
name="billing",
prompt="""결제와 환불 전문 에이전트입니다.
역할:
- 주문 내역 조회 및 결제 상태 확인
- 환불 처리 (정책에 따라)
- 결제 관련 문의 응대
규칙:
1. 환불은 주문일로부터 30일 이내만 가능합니다.
2. 부분 환불은 사유 확인 후 처리합니다.
3. 배송 관련 문의가 포함되면 shipping 에이전트로 핸드오프합니다.
4. 기술적 결제 오류는 technical 에이전트로 핸드오프합니다.
응답 톤: 전문적이고 정중하게 응대합니다."""
)@trace_tool_call
async def track_package(tracking_number: str) -> dict:
"""배송 추적"""
return {
"tracking_number": tracking_number,
"status": "in_transit",
"current_location": "서울 송파 물류센터",
"estimated_delivery": "2026-04-10",
"history": [
{"date": "2026-04-07", "event": "출고"},
{"date": "2026-04-08", "event": "집하"},
{"date": "2026-04-09", "event": "배송 중"}
]
}
@trace_tool_call
async def update_delivery_address(
order_id: str, new_address: str
) -> dict:
"""배송지 변경"""
return {"status": "updated", "new_address": new_address}
shipping_agent = create_react_agent(
model="claude-sonnet-4-6",
tools=[track_package, update_delivery_address],
name="shipping",
prompt="""배송 추적과 물류 관리 전문 에이전트입니다.
역할:
- 배송 현황 실시간 추적
- 배송지 변경 처리
- 배송 지연 사유 안내
규칙:
1. 배송지 변경은 '배송 중' 상태 이전까지만 가능합니다.
2. 배송 사고로 인한 보상은 billing 에이전트로 핸드오프합니다.
3. 예상 배송일을 정확하게 안내합니다."""
)@trace_tool_call
async def check_system_status(service: str) -> dict:
"""시스템 상태 확인"""
return {
"service": service,
"status": "operational",
"uptime": "99.95%",
"last_incident": "2026-03-15"
}
@trace_tool_call
async def run_diagnostics(
customer_id: str, issue_type: str
) -> dict:
"""진단 실행"""
return {
"diagnosis": "cache_expired",
"suggested_fix": "캐시 초기화 후 재접속",
"auto_fixable": True
}
technical_agent = create_react_agent(
model="claude-sonnet-4-6",
tools=[check_system_status, run_diagnostics],
name="technical",
prompt="""기술 지원 전문 에이전트입니다.
역할:
- 시스템 상태 확인 및 장애 안내
- 자동 진단 및 문제 해결
- 기술적 가이드 제공
규칙:
1. 먼저 시스템 상태를 확인하여 전체 장애 여부를 판단합니다.
2. 자동 수정이 가능한 문제는 고객 동의 후 실행합니다.
3. 결제 관련 기술 문제는 billing 에이전트로 핸드오프합니다."""
)from langgraph_supervisor import create_supervisor
def create_cs_supervisor():
"""고객 서비스 감독자 생성"""
supervisor = create_supervisor(
agents=[billing_agent, shipping_agent, technical_agent],
model="claude-opus-4-6", # 감독자에는 강력한 모델
prompt="""고객 서비스 팀의 감독자입니다.
당신의 역할:
1. 고객 문의를 분석하여 적절한 전문 에이전트에게 라우팅
2. 전문 에이전트의 응답 품질을 모니터링
3. 복합적인 문의는 여러 에이전트를 순차적으로 활용
4. 모든 문의가 해결될 때까지 관리
라우팅 기준:
- 결제, 환불, 청구서 → billing
- 배송, 택배, 추적 → shipping
- 오류, 버그, 접속 문제 → technical
- 여러 영역에 걸친 문의 → 주요 영역의 에이전트부터 시작
규칙:
1. 핸드오프 횟수가 5회를 초과하면 문의를 요약하여 직접 응답
2. 전문 에이전트가 해결하지 못하면 에스컬레이션 안내
3. 항상 고객에게 현재 상황을 알려주세요"""
)
return supervisor.compile(
checkpointer=postgres_checkpointer
)from langgraph.graph import StateGraph, END
def build_cs_workflow() -> StateGraph:
"""고객 서비스 워크플로우 그래프"""
workflow = StateGraph(ConversationState)
# 노드 추가
workflow.add_node("classify", classify_intent)
workflow.add_node("route", route_to_agent)
workflow.add_node("billing", execute_billing)
workflow.add_node("shipping", execute_shipping)
workflow.add_node("technical", execute_technical)
workflow.add_node("quality_check", check_response_quality)
# 엣지 정의
workflow.set_entry_point("classify")
workflow.add_edge("classify", "route")
workflow.add_conditional_edges(
"route",
lambda state: state["intent"].value,
{
"billing": "billing",
"shipping": "shipping",
"technical": "technical",
"general": "quality_check" # 일반 문의는 직접 응답
}
)
# 전문 에이전트 실행 후 품질 검사
workflow.add_edge("billing", "quality_check")
workflow.add_edge("shipping", "quality_check")
workflow.add_edge("technical", "quality_check")
# 품질 검사 결과에 따라 종료 또는 재라우팅
workflow.add_conditional_edges(
"quality_check",
lambda state: "end" if state["resolved"] else "route",
{"end": END, "route": "route"}
)
return workflow
async def classify_intent(state: ConversationState) -> dict:
"""의도 분류"""
last_message = state["messages"][-1]["content"]
classification = await intent_classifier.run(last_message)
agent_log.decision(
decision_type="classification",
choice=classification.intent.value,
alternatives=[e.value for e in CustomerIntent],
reasoning=classification.reasoning
)
return {
"intent": classification.intent,
"metadata": {
**state.get("metadata", {}),
"classification_confidence": classification.confidence
}
}
async def check_response_quality(
state: ConversationState
) -> dict:
"""응답 품질 검사"""
if state.get("resolved"):
return state
# 핸드오프 횟수 체크
if state.get("handoff_count", 0) >= 5:
return {
"resolved": True,
"resolution_summary": (
"여러 전문 에이전트를 통해 처리를 시도했으나 "
"완전한 해결이 어려워 담당자 연결을 안내했습니다."
)
}
return statefrom opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter
)
def setup_tracing():
"""OpenTelemetry 추적 설정"""
provider = TracerProvider()
processor = BatchSpanProcessor(
OTLPSpanExporter(endpoint="http://jaeger:4317")
)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("cs-agent-system")from prometheus_client import Counter, Histogram, Gauge
# 핵심 메트릭 정의
conversations_total = Counter(
"cs_conversations_total",
"Total conversations",
["intent", "resolved"]
)
resolution_time = Histogram(
"cs_resolution_seconds",
"Time to resolve customer inquiry",
["intent"],
buckets=[5, 10, 30, 60, 120, 300]
)
active_conversations = Gauge(
"cs_active_conversations",
"Currently active conversations"
)
handoff_count = Counter(
"cs_handoffs_total",
"Total handoffs between agents",
["source", "target"]
)
cost_per_conversation = Histogram(
"cs_cost_per_conversation_usd",
"Cost per conversation in USD",
["intent"],
buckets=[0.01, 0.05, 0.1, 0.5, 1.0]
)from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI(title="Customer Service Agent System")
class ConversationRequest(BaseModel):
customer_id: str
message: str
class ConversationResponse(BaseModel):
conversation_id: str
response: str
resolved: bool
agent_used: str
@app.post("/conversations", response_model=ConversationResponse)
async def start_conversation(request: ConversationRequest):
"""새 대화 시작"""
with tracer.start_as_current_span("api.start_conversation"):
conversation_id = generate_id()
initial_state = ConversationState(
messages=[{
"role": "user",
"content": request.message
}],
customer_id=request.customer_id,
intent=None,
current_agent="supervisor",
handoff_count=0,
resolved=False,
resolution_summary=None,
metadata={}
)
# PII 필터링
filtered_state = pii_filter.filter_message(
initial_state, "internal"
)
# 워크플로우 실행
config = {
"configurable": {"thread_id": conversation_id}
}
result = await workflow.ainvoke(filtered_state, config)
# 메트릭 기록
conversations_total.labels(
intent=result.get("intent", "unknown"),
resolved=str(result.get("resolved", False))
).inc()
# 감사 로그
await audit_logger.log(AuditEntry(
timestamp=datetime.now(),
agent_id="system",
action="conversation_started",
resource=conversation_id,
details={
"customer_id": request.customer_id,
"intent": str(result.get("intent"))
},
outcome="success"
))
return ConversationResponse(
conversation_id=conversation_id,
response=result["messages"][-1]["content"],
resolved=result.get("resolved", False),
agent_used=result.get("current_agent", "supervisor")
)
@app.post("/conversations/{id}/messages")
async def continue_conversation(
id: str, request: ConversationRequest
):
"""대화 계속"""
config = {"configurable": {"thread_id": id}}
# 체크포인트에서 이전 상태 복원
state_update = {
"messages": [{
"role": "user",
"content": request.message
}]
}
result = await workflow.ainvoke(state_update, config)
return ConversationResponse(
conversation_id=id,
response=result["messages"][-1]["content"],
resolved=result.get("resolved", False),
agent_used=result.get("current_agent", "supervisor")
)
@app.get("/dashboard")
async def get_dashboard():
"""운영 대시보드 데이터"""
return await fleet_dashboard.get_overview()import pytest
@pytest.mark.asyncio
async def test_billing_routing():
"""결제 문의가 billing 에이전트로 라우팅되는지 확인"""
state = ConversationState(
messages=[{
"role": "user",
"content": "지난주 주문한 상품을 환불받고 싶습니다"
}],
customer_id="test-001",
intent=None,
current_agent="supervisor",
handoff_count=0,
resolved=False,
resolution_summary=None,
metadata={}
)
result = await workflow.ainvoke(state)
assert result["intent"] == CustomerIntent.BILLING
assert "환불" in result["messages"][-1]["content"]
@pytest.mark.asyncio
async def test_handoff_limit():
"""핸드오프 횟수 제한이 작동하는지 확인"""
state = ConversationState(
messages=[{
"role": "user",
"content": "복잡한 문의"
}],
customer_id="test-002",
intent=None,
current_agent="supervisor",
handoff_count=5, # 이미 5회 핸드오프
resolved=False,
resolution_summary=None,
metadata={}
)
result = await workflow.ainvoke(state)
assert result["resolved"] is True # 강제 종료
@pytest.mark.asyncio
async def test_checkpoint_recovery():
"""체크포인트에서 복구가 작동하는지 확인"""
config = {"configurable": {"thread_id": "test-recovery"}}
# 첫 실행
state = ConversationState(
messages=[{
"role": "user",
"content": "배송이 늦어지고 있어요"
}],
customer_id="test-003",
intent=None,
current_agent="supervisor",
handoff_count=0,
resolved=False,
resolution_summary=None,
metadata={}
)
await workflow.ainvoke(state, config)
# 추가 메시지 (체크포인트에서 복원)
result = await workflow.ainvoke(
{"messages": [{"role": "user", "content": "송장번호는 ABC123입니다"}]},
config
)
# 이전 컨텍스트가 유지되어야 함
assert result["intent"] == CustomerIntent.SHIPPING이 시리즈를 통해 멀티에이전트 오케스트레이션의 핵심 개념과 고급 패턴을 살펴보았습니다. 단일 에이전트의 한계에서 출발하여, 팀 아키텍처, 통신 프로토콜, 작업 위임, 협업 메커니즘, 컨트롤 플레인, 상태 관리, 플릿 스케일링, 관측 가능성, 보안과 거버넌스까지 프로덕션 멀티에이전트 시스템의 전체 스펙트럼을 다루었습니다.
핵심 요약입니다.
아키텍처 선택이 모든 것을 결정합니다. 감독자-워커, 계층적 팀, 피어-투-피어, 그룹 채팅 각각의 장단점을 이해하고, 사용 사례에 맞는 아키텍처를 선택하세요.
통신 표준은 상호운용성의 기반입니다. A2A와 MCP는 경쟁이 아닌 보완 관계입니다. 에이전트 간 수평적 통신은 A2A로, 도구 접근은 MCP로 표준화하세요.
관측 가능성 없는 프로덕션은 없습니다. 분산 추적, 구조화된 로깅, 의사결정 추적을 처음부터 설계에 포함하세요. 문제가 발생한 후 추가하는 것은 훨씬 어렵습니다.
보안은 나중이 아니라 처음부터입니다. 최소 권한 원칙, PII 보호, 감사 추적을 아키텍처의 기본 요소로 설계하세요.
멀티에이전트 오케스트레이션은 빠르게 발전하는 분야입니다. 이 시리즈의 개념과 패턴을 기반으로, 여러분의 프로덕션 환경에 맞는 시스템을 설계하고 구축해 나가시기 바랍니다.
이 글이 도움이 되셨나요?
감독자-워커, 계층적 팀, 피어-투-피어 네트워크 등 멀티에이전트 팀 아키텍처의 설계 원칙과 트레이드오프를 코드 예제와 함께 분석합니다.
단일 에이전트에서 멀티에이전트 시스템으로의 진화 과정, 오케스트레이션의 핵심 개념과 패턴 분류, 그리고 프로덕션 환경에서의 도전 과제를 살펴봅니다.
투표, 토론, 비판적 검토, 다수결 합의 등 여러 에이전트가 하나의 결론에 도달하기 위한 협업 패턴과 합의 프로토콜을 다룹니다.