본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout
© 2026 Kreath
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 11장: 실전 프로젝트 — 멀티에이전트 오케스트레이션 시스템 구축
2026년 4월 9일·AI / ML·

11장: 실전 프로젝트 — 멀티에이전트 오케스트레이션 시스템 구축

고객 서비스 자동화 시스템을 처음부터 설계하고 구축하며, 감독자-워커 아키텍처, A2A 통신, 컨트롤 플레인, 관측 가능성을 통합합니다.

16분1,140자8개 섹션
ai-에이전트멀티에이전트오케스트레이션실전-프로젝트
공유
agent-orchestration11 / 11
1234567891011
이전10장: 보안과 거버넌스

프로젝트 개요

이 장에서는 시리즈 전체에서 다룬 개념을 종합하여 멀티에이전트 고객 서비스 시스템을 구축합니다. 이 시스템은 고객의 다양한 문의를 자동으로 분류하고, 전문 에이전트에게 위임하며, 복잡한 문의는 여러 에이전트가 협업하여 해결합니다.

시스템 요구사항

  • 고객 문의를 결제, 배송, 기술지원, 일반으로 분류하고 적절한 에이전트에게 라우팅
  • 에이전트 간 핸드오프와 작업 위임 지원
  • 실시간 상태 추적과 관측 가능성
  • 비용 관리와 모델 티어링
  • 감사 로그와 PII 보호

기술 스택

- 프레임워크: LangGraph (감독자-워커 아키텍처)
- 통신: A2A 프로토콜 (외부 에이전트 연동)
- 추적: OpenTelemetry + Jaeger
- 메트릭: Prometheus + Grafana
- 상태 저장: PostgreSQL (체크포인팅)
- API: FastAPI

1단계: 프로젝트 구조와 기반 설정

agent-cs-system/
├── src/
│   ├── agents/           # 에이전트 정의
│   │   ├── supervisor.py
│   │   ├── billing.py
│   │   ├── shipping.py
│   │   ├── technical.py
│   │   └── common.py
│   ├── orchestration/    # 오케스트레이션 엔진
│   │   ├── engine.py
│   │   ├── router.py
│   │   └── state.py
│   ├── control_plane/    # 컨트롤 플레인
│   │   ├── registry.py
│   │   ├── config.py
│   │   └── policy.py
│   ├── observability/    # 관측 가능성
│   │   ├── tracing.py
│   │   ├── metrics.py
│   │   └── logging.py
│   ├── security/         # 보안
│   │   ├── auth.py
│   │   ├── pii_filter.py
│   │   └── audit.py
│   └── api/              # API 엔드포인트
│       └── routes.py
├── tests/
├── config/
└── docker-compose.yml

공통 타입 정의

src/agents/common.py
python
from pydantic import BaseModel
from enum import Enum
from typing import TypedDict
 
class CustomerIntent(str, Enum):
    BILLING = "billing"
    SHIPPING = "shipping"
    TECHNICAL = "technical"
    GENERAL = "general"
 
class ConversationState(TypedDict):
    """전체 대화 상태"""
    messages: list[dict]
    customer_id: str | None
    intent: CustomerIntent | None
    current_agent: str
    handoff_count: int
    resolved: bool
    resolution_summary: str | None
    metadata: dict
 
class IntentClassification(BaseModel):
    intent: CustomerIntent
    confidence: float
    reasoning: str
 
class AgentResponse(BaseModel):
    message: str
    resolved: bool
    needs_handoff: bool
    handoff_target: str | None = None
    handoff_reason: str | None = None

2단계: 전문 에이전트 구현

결제 에이전트

src/agents/billing.py
python
from langgraph.prebuilt import create_react_agent
from src.observability.tracing import trace_tool_call
 
@trace_tool_call
async def query_order(customer_id: str, order_id: str) -> dict:
    """주문 내역 조회"""
    # 실제로는 DB 또는 API 호출
    return {
        "order_id": order_id,
        "status": "delivered",
        "amount": 45000,
        "payment_method": "card",
        "date": "2026-04-01"
    }
 
@trace_tool_call
async def process_refund(
    order_id: str, amount: int, reason: str
) -> dict:
    """환불 처리"""
    return {
        "refund_id": f"RF-{order_id}",
        "amount": amount,
        "status": "processed",
        "estimated_days": 3
    }
 
@trace_tool_call
async def check_payment_status(payment_id: str) -> dict:
    """결제 상태 확인"""
    return {
        "payment_id": payment_id,
        "status": "completed",
        "method": "card_ending_1234"
    }
 
billing_agent = create_react_agent(
    model="claude-sonnet-4-6",
    tools=[query_order, process_refund, check_payment_status],
    name="billing",
    prompt="""결제와 환불 전문 에이전트입니다.
 
역할:
- 주문 내역 조회 및 결제 상태 확인
- 환불 처리 (정책에 따라)
- 결제 관련 문의 응대
 
규칙:
1. 환불은 주문일로부터 30일 이내만 가능합니다.
2. 부분 환불은 사유 확인 후 처리합니다.
3. 배송 관련 문의가 포함되면 shipping 에이전트로 핸드오프합니다.
4. 기술적 결제 오류는 technical 에이전트로 핸드오프합니다.
 
응답 톤: 전문적이고 정중하게 응대합니다."""
)

배송 에이전트

src/agents/shipping.py
python
@trace_tool_call
async def track_package(tracking_number: str) -> dict:
    """배송 추적"""
    return {
        "tracking_number": tracking_number,
        "status": "in_transit",
        "current_location": "서울 송파 물류센터",
        "estimated_delivery": "2026-04-10",
        "history": [
            {"date": "2026-04-07", "event": "출고"},
            {"date": "2026-04-08", "event": "집하"},
            {"date": "2026-04-09", "event": "배송 중"}
        ]
    }
 
@trace_tool_call
async def update_delivery_address(
    order_id: str, new_address: str
) -> dict:
    """배송지 변경"""
    return {"status": "updated", "new_address": new_address}
 
shipping_agent = create_react_agent(
    model="claude-sonnet-4-6",
    tools=[track_package, update_delivery_address],
    name="shipping",
    prompt="""배송 추적과 물류 관리 전문 에이전트입니다.
 
역할:
- 배송 현황 실시간 추적
- 배송지 변경 처리
- 배송 지연 사유 안내
 
규칙:
1. 배송지 변경은 '배송 중' 상태 이전까지만 가능합니다.
2. 배송 사고로 인한 보상은 billing 에이전트로 핸드오프합니다.
3. 예상 배송일을 정확하게 안내합니다."""
)

기술지원 에이전트

src/agents/technical.py
python
@trace_tool_call
async def check_system_status(service: str) -> dict:
    """시스템 상태 확인"""
    return {
        "service": service,
        "status": "operational",
        "uptime": "99.95%",
        "last_incident": "2026-03-15"
    }
 
@trace_tool_call
async def run_diagnostics(
    customer_id: str, issue_type: str
) -> dict:
    """진단 실행"""
    return {
        "diagnosis": "cache_expired",
        "suggested_fix": "캐시 초기화 후 재접속",
        "auto_fixable": True
    }
 
technical_agent = create_react_agent(
    model="claude-sonnet-4-6",
    tools=[check_system_status, run_diagnostics],
    name="technical",
    prompt="""기술 지원 전문 에이전트입니다.
 
역할:
- 시스템 상태 확인 및 장애 안내
- 자동 진단 및 문제 해결
- 기술적 가이드 제공
 
규칙:
1. 먼저 시스템 상태를 확인하여 전체 장애 여부를 판단합니다.
2. 자동 수정이 가능한 문제는 고객 동의 후 실행합니다.
3. 결제 관련 기술 문제는 billing 에이전트로 핸드오프합니다."""
)

3단계: 감독자와 오케스트레이션

src/agents/supervisor.py
python
from langgraph_supervisor import create_supervisor
 
def create_cs_supervisor():
    """고객 서비스 감독자 생성"""
    supervisor = create_supervisor(
        agents=[billing_agent, shipping_agent, technical_agent],
        model="claude-opus-4-6",  # 감독자에는 강력한 모델
        prompt="""고객 서비스 팀의 감독자입니다.
 
당신의 역할:
1. 고객 문의를 분석하여 적절한 전문 에이전트에게 라우팅
2. 전문 에이전트의 응답 품질을 모니터링
3. 복합적인 문의는 여러 에이전트를 순차적으로 활용
4. 모든 문의가 해결될 때까지 관리
 
라우팅 기준:
- 결제, 환불, 청구서 → billing
- 배송, 택배, 추적 → shipping  
- 오류, 버그, 접속 문제 → technical
- 여러 영역에 걸친 문의 → 주요 영역의 에이전트부터 시작
 
규칙:
1. 핸드오프 횟수가 5회를 초과하면 문의를 요약하여 직접 응답
2. 전문 에이전트가 해결하지 못하면 에스컬레이션 안내
3. 항상 고객에게 현재 상황을 알려주세요"""
    )
 
    return supervisor.compile(
        checkpointer=postgres_checkpointer
    )

상태 그래프 설계

src/orchestration/engine.py
python
from langgraph.graph import StateGraph, END
 
def build_cs_workflow() -> StateGraph:
    """고객 서비스 워크플로우 그래프"""
    workflow = StateGraph(ConversationState)
 
    # 노드 추가
    workflow.add_node("classify", classify_intent)
    workflow.add_node("route", route_to_agent)
    workflow.add_node("billing", execute_billing)
    workflow.add_node("shipping", execute_shipping)
    workflow.add_node("technical", execute_technical)
    workflow.add_node("quality_check", check_response_quality)
 
    # 엣지 정의
    workflow.set_entry_point("classify")
    workflow.add_edge("classify", "route")
 
    workflow.add_conditional_edges(
        "route",
        lambda state: state["intent"].value,
        {
            "billing": "billing",
            "shipping": "shipping",
            "technical": "technical",
            "general": "quality_check"  # 일반 문의는 직접 응답
        }
    )
 
    # 전문 에이전트 실행 후 품질 검사
    workflow.add_edge("billing", "quality_check")
    workflow.add_edge("shipping", "quality_check")
    workflow.add_edge("technical", "quality_check")
 
    # 품질 검사 결과에 따라 종료 또는 재라우팅
    workflow.add_conditional_edges(
        "quality_check",
        lambda state: "end" if state["resolved"] else "route",
        {"end": END, "route": "route"}
    )
 
    return workflow
 
async def classify_intent(state: ConversationState) -> dict:
    """의도 분류"""
    last_message = state["messages"][-1]["content"]
    classification = await intent_classifier.run(last_message)
 
    agent_log.decision(
        decision_type="classification",
        choice=classification.intent.value,
        alternatives=[e.value for e in CustomerIntent],
        reasoning=classification.reasoning
    )
 
    return {
        "intent": classification.intent,
        "metadata": {
            **state.get("metadata", {}),
            "classification_confidence": classification.confidence
        }
    }
 
async def check_response_quality(
    state: ConversationState
) -> dict:
    """응답 품질 검사"""
    if state.get("resolved"):
        return state
 
    # 핸드오프 횟수 체크
    if state.get("handoff_count", 0) >= 5:
        return {
            "resolved": True,
            "resolution_summary": (
                "여러 전문 에이전트를 통해 처리를 시도했으나 "
                "완전한 해결이 어려워 담당자 연결을 안내했습니다."
            )
        }
 
    return state

4단계: 관측 가능성 통합

src/observability/tracing.py
python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
    OTLPSpanExporter
)
 
def setup_tracing():
    """OpenTelemetry 추적 설정"""
    provider = TracerProvider()
    processor = BatchSpanProcessor(
        OTLPSpanExporter(endpoint="http://jaeger:4317")
    )
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)
 
tracer = trace.get_tracer("cs-agent-system")
src/observability/metrics.py
python
from prometheus_client import Counter, Histogram, Gauge
 
# 핵심 메트릭 정의
conversations_total = Counter(
    "cs_conversations_total",
    "Total conversations",
    ["intent", "resolved"]
)
 
resolution_time = Histogram(
    "cs_resolution_seconds",
    "Time to resolve customer inquiry",
    ["intent"],
    buckets=[5, 10, 30, 60, 120, 300]
)
 
active_conversations = Gauge(
    "cs_active_conversations",
    "Currently active conversations"
)
 
handoff_count = Counter(
    "cs_handoffs_total",
    "Total handoffs between agents",
    ["source", "target"]
)
 
cost_per_conversation = Histogram(
    "cs_cost_per_conversation_usd",
    "Cost per conversation in USD",
    ["intent"],
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0]
)

5단계: API 레이어

src/api/routes.py
python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
 
app = FastAPI(title="Customer Service Agent System")
 
class ConversationRequest(BaseModel):
    customer_id: str
    message: str
 
class ConversationResponse(BaseModel):
    conversation_id: str
    response: str
    resolved: bool
    agent_used: str
 
@app.post("/conversations", response_model=ConversationResponse)
async def start_conversation(request: ConversationRequest):
    """새 대화 시작"""
    with tracer.start_as_current_span("api.start_conversation"):
        conversation_id = generate_id()
 
        initial_state = ConversationState(
            messages=[{
                "role": "user",
                "content": request.message
            }],
            customer_id=request.customer_id,
            intent=None,
            current_agent="supervisor",
            handoff_count=0,
            resolved=False,
            resolution_summary=None,
            metadata={}
        )
 
        # PII 필터링
        filtered_state = pii_filter.filter_message(
            initial_state, "internal"
        )
 
        # 워크플로우 실행
        config = {
            "configurable": {"thread_id": conversation_id}
        }
        result = await workflow.ainvoke(filtered_state, config)
 
        # 메트릭 기록
        conversations_total.labels(
            intent=result.get("intent", "unknown"),
            resolved=str(result.get("resolved", False))
        ).inc()
 
        # 감사 로그
        await audit_logger.log(AuditEntry(
            timestamp=datetime.now(),
            agent_id="system",
            action="conversation_started",
            resource=conversation_id,
            details={
                "customer_id": request.customer_id,
                "intent": str(result.get("intent"))
            },
            outcome="success"
        ))
 
        return ConversationResponse(
            conversation_id=conversation_id,
            response=result["messages"][-1]["content"],
            resolved=result.get("resolved", False),
            agent_used=result.get("current_agent", "supervisor")
        )
 
@app.post("/conversations/{id}/messages")
async def continue_conversation(
    id: str, request: ConversationRequest
):
    """대화 계속"""
    config = {"configurable": {"thread_id": id}}
 
    # 체크포인트에서 이전 상태 복원
    state_update = {
        "messages": [{
            "role": "user",
            "content": request.message
        }]
    }
 
    result = await workflow.ainvoke(state_update, config)
 
    return ConversationResponse(
        conversation_id=id,
        response=result["messages"][-1]["content"],
        resolved=result.get("resolved", False),
        agent_used=result.get("current_agent", "supervisor")
    )
 
@app.get("/dashboard")
async def get_dashboard():
    """운영 대시보드 데이터"""
    return await fleet_dashboard.get_overview()

6단계: 통합 테스트

tests/test_workflow.py
python
import pytest
 
@pytest.mark.asyncio
async def test_billing_routing():
    """결제 문의가 billing 에이전트로 라우팅되는지 확인"""
    state = ConversationState(
        messages=[{
            "role": "user",
            "content": "지난주 주문한 상품을 환불받고 싶습니다"
        }],
        customer_id="test-001",
        intent=None,
        current_agent="supervisor",
        handoff_count=0,
        resolved=False,
        resolution_summary=None,
        metadata={}
    )
 
    result = await workflow.ainvoke(state)
    assert result["intent"] == CustomerIntent.BILLING
    assert "환불" in result["messages"][-1]["content"]
 
@pytest.mark.asyncio
async def test_handoff_limit():
    """핸드오프 횟수 제한이 작동하는지 확인"""
    state = ConversationState(
        messages=[{
            "role": "user",
            "content": "복잡한 문의"
        }],
        customer_id="test-002",
        intent=None,
        current_agent="supervisor",
        handoff_count=5,  # 이미 5회 핸드오프
        resolved=False,
        resolution_summary=None,
        metadata={}
    )
 
    result = await workflow.ainvoke(state)
    assert result["resolved"] is True  # 강제 종료
 
@pytest.mark.asyncio
async def test_checkpoint_recovery():
    """체크포인트에서 복구가 작동하는지 확인"""
    config = {"configurable": {"thread_id": "test-recovery"}}
 
    # 첫 실행
    state = ConversationState(
        messages=[{
            "role": "user",
            "content": "배송이 늦어지고 있어요"
        }],
        customer_id="test-003",
        intent=None,
        current_agent="supervisor",
        handoff_count=0,
        resolved=False,
        resolution_summary=None,
        metadata={}
    )
    await workflow.ainvoke(state, config)
 
    # 추가 메시지 (체크포인트에서 복원)
    result = await workflow.ainvoke(
        {"messages": [{"role": "user", "content": "송장번호는 ABC123입니다"}]},
        config
    )
 
    # 이전 컨텍스트가 유지되어야 함
    assert result["intent"] == CustomerIntent.SHIPPING

시리즈 마무리

이 시리즈를 통해 멀티에이전트 오케스트레이션의 핵심 개념과 고급 패턴을 살펴보았습니다. 단일 에이전트의 한계에서 출발하여, 팀 아키텍처, 통신 프로토콜, 작업 위임, 협업 메커니즘, 컨트롤 플레인, 상태 관리, 플릿 스케일링, 관측 가능성, 보안과 거버넌스까지 프로덕션 멀티에이전트 시스템의 전체 스펙트럼을 다루었습니다.

핵심 요약입니다.

아키텍처 선택이 모든 것을 결정합니다. 감독자-워커, 계층적 팀, 피어-투-피어, 그룹 채팅 각각의 장단점을 이해하고, 사용 사례에 맞는 아키텍처를 선택하세요.

통신 표준은 상호운용성의 기반입니다. A2A와 MCP는 경쟁이 아닌 보완 관계입니다. 에이전트 간 수평적 통신은 A2A로, 도구 접근은 MCP로 표준화하세요.

관측 가능성 없는 프로덕션은 없습니다. 분산 추적, 구조화된 로깅, 의사결정 추적을 처음부터 설계에 포함하세요. 문제가 발생한 후 추가하는 것은 훨씬 어렵습니다.

보안은 나중이 아니라 처음부터입니다. 최소 권한 원칙, PII 보호, 감사 추적을 아키텍처의 기본 요소로 설계하세요.

멀티에이전트 오케스트레이션은 빠르게 발전하는 분야입니다. 이 시리즈의 개념과 패턴을 기반으로, 여러분의 프로덕션 환경에 맞는 시스템을 설계하고 구축해 나가시기 바랍니다.

이 글이 도움이 되셨나요?

관련 글

AI / ML

2장: 멀티에이전트 팀 아키텍처 설계

감독자-워커, 계층적 팀, 피어-투-피어 네트워크 등 멀티에이전트 팀 아키텍처의 설계 원칙과 트레이드오프를 코드 예제와 함께 분석합니다.

2026년 3월 24일·20분
AI / ML

1장: 멀티에이전트 오케스트레이션의 진화와 핵심 개념

단일 에이전트에서 멀티에이전트 시스템으로의 진화 과정, 오케스트레이션의 핵심 개념과 패턴 분류, 그리고 프로덕션 환경에서의 도전 과제를 살펴봅니다.

2026년 3월 22일·19분
AI / ML

5장: 에이전트 협업과 합의 메커니즘

투표, 토론, 비판적 검토, 다수결 합의 등 여러 에이전트가 하나의 결론에 도달하기 위한 협업 패턴과 합의 프로토콜을 다룹니다.

2026년 3월 30일·16분
이전 글10장: 보안과 거버넌스

댓글

목차

약 16분 남음
  • 프로젝트 개요
    • 시스템 요구사항
    • 기술 스택
  • 1단계: 프로젝트 구조와 기반 설정
    • 공통 타입 정의
  • 2단계: 전문 에이전트 구현
    • 결제 에이전트
    • 배송 에이전트
    • 기술지원 에이전트
  • 3단계: 감독자와 오케스트레이션
    • 상태 그래프 설계
  • 4단계: 관측 가능성 통합
  • 5단계: API 레이어
  • 6단계: 통합 테스트
  • 시리즈 마무리