본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout
© 2026 Kreath
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 4장: 핸드오프와 작업 위임 패턴
2026년 3월 28일·AI / ML·

4장: 핸드오프와 작업 위임 패턴

에이전트 간 작업 위임의 핵심 메커니즘인 핸드오프 패턴, 라우팅 전략, 동적 위임의 구현과 최적화를 실전 코드와 함께 다룹니다.

17분1,082자5개 섹션
ai-에이전트멀티에이전트핸드오프작업-위임
공유
agent-orchestration4 / 11
1234567891011
이전3장: 에이전트 간 통신 프로토콜 — A2A, MCP, 커스텀 패턴다음5장: 에이전트 협업과 합의 메커니즘

작업 위임의 본질

멀티에이전트 시스템에서 작업 위임(Task Delegation)은 한 에이전트가 자신의 범위를 넘어서는 작업을 적절한 에이전트에게 맡기는 행위입니다. 효과적인 위임은 "누구에게 맡기는가"뿐만 아니라 "무엇을 전달하는가", "결과를 어떻게 활용하는가"까지 포함합니다.

위임은 크게 두 가지 유형으로 나뉩니다.

핸드오프(Handoff): 현재 에이전트가 대화의 제어권 자체를 다른 에이전트에게 넘깁니다. 고객 서비스에서 "결제 담당자에게 연결해드리겠습니다"와 같습니다. 제어권이 이전되면 원래 에이전트는 더 이상 관여하지 않습니다.

서브태스크 위임(Subtask Delegation): 현재 에이전트가 제어권을 유지하면서 특정 하위 작업만 다른 에이전트에게 맡깁니다. 상사가 부하 직원에게 조사를 시키고 결과를 보고받는 것과 같습니다.

핸드오프 패턴 심화

기본 핸드오프 구현

OpenAI Agents SDK에서의 핸드오프는 에이전트의 도구처럼 동작합니다. 에이전트가 핸드오프를 "호출"하면 제어권이 대상 에이전트로 넘어갑니다.

python
from agents import Agent, handoff, RunContext
 
# 핸드오프 시 컨텍스트를 필터링하는 콜백
def billing_handoff_filter(context: RunContext) -> str:
    """결제 에이전트에게 필요한 정보만 전달"""
    return f"""고객 ID: {context.context.get('customer_id')}
주문 번호: {context.context.get('order_id')}
문의 유형: 결제/환불 관련
대화 요약: {summarize_conversation(context.messages[-5:])}"""
 
triage_agent = Agent(
    name="triage",
    instructions="""고객 문의를 분류하고 적절한 전문 에이전트에게 연결합니다.
    
    다음 기준으로 분류하세요:
    - 결제, 환불, 청구 관련 → billing 핸드오프
    - 배송, 추적, 수거 관련 → shipping 핸드오프
    - 제품 기능, 버그, 오류 → technical 핸드오프
    - 위 분류에 해당하지 않으면 직접 응답""",
    handoffs=[
        handoff(
            agent=billing_agent,
            description="결제, 환불, 청구서 관련 문의",
            on_handoff=billing_handoff_filter
        ),
        handoff(
            agent=shipping_agent,
            description="배송 추적, 배송 지연, 반품 수거 관련 문의"
        ),
        handoff(
            agent=technical_agent,
            description="제품 기능 문제, 버그 리포트, 기술 지원"
        )
    ]
)

조건부 핸드오프

단순 분류를 넘어, 대화 상태나 외부 조건에 따라 핸드오프 여부를 동적으로 결정해야 하는 경우가 있습니다.

python
from agents import Agent, handoff
 
def should_escalate(context: RunContext) -> bool:
    """에스컬레이션 조건 평가"""
    messages = context.messages
    # 3회 이상 같은 문제 반복 시 에스컬레이션
    if count_repeated_issues(messages) >= 3:
        return True
    # 고객 감정이 부정적으로 전환되면 에스컬레이션
    if detect_negative_sentiment(messages[-3:]):
        return True
    # VIP 고객은 즉시 에스컬레이션
    if context.context.get("customer_tier") == "vip":
        return True
    return False
 
support_agent = Agent(
    name="support",
    instructions="""1차 고객 지원을 담당합니다.
    문제 해결이 어렵거나 고객이 불만족스러운 경우,
    escalation_check 도구를 사용하여 에스컬레이션 필요 여부를 확인하세요.""",
    tools=[
        query_faq,
        check_order_status,
        {
            "name": "escalation_check",
            "description": "현재 상황이 에스컬레이션이 필요한지 판단",
            "handler": should_escalate
        }
    ],
    handoffs=[
        handoff(
            agent=senior_agent,
            description="에스컬레이션이 필요한 복잡한 문제"
        )
    ]
)

양방향 핸드오프

실제 서비스에서는 에이전트 간에 양방향 핸드오프가 필요합니다. 결제 문의 중 배송 문제가 발견되면 배송 에이전트로, 배송 에이전트에서 환불이 필요하면 다시 결제 에이전트로 돌아와야 합니다.

python
billing_agent = Agent(
    name="billing",
    instructions="결제와 환불을 처리합니다.",
    tools=[process_refund, check_payment],
    handoffs=[
        handoff(agent="shipping", description="배송 관련 확인이 필요한 경우"),
        handoff(agent="triage", description="분류가 잘못된 경우 재분류")
    ]
)
 
shipping_agent = Agent(
    name="shipping",
    instructions="배송을 추적하고 관리합니다.",
    tools=[track_package, arrange_return],
    handoffs=[
        handoff(agent="billing", description="환불이 필요한 배송 사고"),
        handoff(agent="triage", description="분류가 잘못된 경우 재분류")
    ]
)
Warning

양방향 핸드오프는 무한 루프의 위험이 있습니다. 반드시 핸드오프 횟수를 추적하고, 상한(보통 3~5회)을 설정하세요. 상한에 도달하면 인간 에스컬레이션으로 전환해야 합니다.

라우팅 전략

감독자 패턴에서 라우팅은 사용자 요청을 분석하여 적절한 워커 에이전트를 선택하는 과정입니다. 라우팅의 정확도가 전체 시스템의 성능을 결정합니다.

의도 기반 라우팅(Intent-Based Routing)

사용자 요청의 의도를 분류하여 해당 의도를 처리할 수 있는 에이전트로 라우팅합니다.

python
from pydantic import BaseModel
from enum import Enum
 
class UserIntent(str, Enum):
    CODE_REVIEW = "code_review"
    BUG_FIX = "bug_fix"
    FEATURE_REQUEST = "feature_request"
    DOCUMENTATION = "documentation"
    DEPLOYMENT = "deployment"
 
class IntentClassification(BaseModel):
    intent: UserIntent
    confidence: float
    reasoning: str
 
# 의도 분류기
intent_classifier = Agent(
    name="intent-classifier",
    instructions="""사용자 요청의 의도를 분류합니다.
    다음 카테고리 중 하나를 선택하세요:
    - code_review: 코드 리뷰, 코드 품질 분석
    - bug_fix: 버그 수정, 에러 해결
    - feature_request: 새 기능 구현
    - documentation: 문서 작성, 주석 추가
    - deployment: 배포, CI/CD 관련""",
    output_type=IntentClassification
)
 
# 의도별 에이전트 매핑
agent_registry = {
    UserIntent.CODE_REVIEW: code_review_agent,
    UserIntent.BUG_FIX: debugging_agent,
    UserIntent.FEATURE_REQUEST: development_agent,
    UserIntent.DOCUMENTATION: docs_agent,
    UserIntent.DEPLOYMENT: devops_agent,
}
 
async def route_request(user_message: str):
    classification = await intent_classifier.run(user_message)
    target_agent = agent_registry[classification.intent]
    return await target_agent.run(user_message)

능력 기반 라우팅(Capability-Based Routing)

에이전트가 보유한 도구와 능력을 기준으로 라우팅합니다. A2A의 Agent Card가 이 패턴을 지원합니다.

python
@dataclass
class AgentCapability:
    agent_name: str
    skills: list[str]
    tools: list[str]
    max_complexity: int  # 1-10
    avg_latency_ms: int
 
class CapabilityRouter:
    def __init__(self):
        self.registry: list[AgentCapability] = []
 
    def register(self, capability: AgentCapability):
        self.registry.append(capability)
 
    async def route(
        self, task: str, required_skills: list[str]
    ) -> str:
        """필요한 스킬을 보유한 에이전트를 선택"""
        candidates = []
        for cap in self.registry:
            skill_match = len(
                set(required_skills) & set(cap.skills)
            ) / len(required_skills)
            if skill_match > 0.5:  # 50% 이상 스킬 일치
                candidates.append((cap, skill_match))
 
        if not candidates:
            return "fallback_agent"
 
        # 스킬 일치도와 지연시간을 고려한 최적 선택
        candidates.sort(
            key=lambda x: (x[1], -x[0].avg_latency_ms),
            reverse=True
        )
        return candidates[0][0].agent_name
 
# 에이전트 등록
router = CapabilityRouter()
router.register(AgentCapability(
    agent_name="sql-analyst",
    skills=["sql", "data-analysis", "visualization"],
    tools=["postgres_query", "chart_generator"],
    max_complexity=8,
    avg_latency_ms=3000
))
router.register(AgentCapability(
    agent_name="code-expert",
    skills=["python", "typescript", "code-review"],
    tools=["linter", "test_runner", "git_operations"],
    max_complexity=9,
    avg_latency_ms=5000
))

시맨틱 라우팅(Semantic Routing)

요청의 의미적 유사도를 기반으로 라우팅합니다. 사전 정의된 카테고리에 맞지 않는 요청도 처리할 수 있습니다.

python
from sentence_transformers import SentenceTransformer
import numpy as np
 
class SemanticRouter:
    def __init__(self):
        self.model = SentenceTransformer("all-MiniLM-L6-v2")
        self.routes: list[dict] = []
 
    def add_route(
        self,
        agent_name: str,
        description: str,
        examples: list[str]
    ):
        """라우팅 경로 등록 (설명 + 예시)"""
        all_texts = [description] + examples
        embeddings = self.model.encode(all_texts)
        self.routes.append({
            "agent": agent_name,
            "embeddings": embeddings,
            "centroid": np.mean(embeddings, axis=0)
        })
 
    async def route(self, query: str) -> str:
        """쿼리와 가장 유사한 라우트 선택"""
        query_embedding = self.model.encode(query)
        best_score = -1
        best_agent = None
 
        for route in self.routes:
            score = np.dot(
                query_embedding, route["centroid"]
            ) / (
                np.linalg.norm(query_embedding)
                * np.linalg.norm(route["centroid"])
            )
            if score > best_score:
                best_score = score
                best_agent = route["agent"]
 
        return best_agent if best_score > 0.6 else "fallback"
 
# 라우트 등록
semantic_router = SemanticRouter()
semantic_router.add_route(
    "billing",
    "결제, 환불, 청구서 관련 업무",
    ["환불 받고 싶어요", "카드 결제가 안 돼요", "영수증 발급해주세요"]
)
semantic_router.add_route(
    "shipping",
    "배송 추적, 배송 지연, 반품 관련 업무",
    ["택배가 안 와요", "배송 현황 알려주세요", "반품하고 싶어요"]
)

동적 위임과 작업 분해

작업 분해 전략

복잡한 요청을 하위 작업으로 분해하는 방식에 따라 시스템의 효율성이 크게 달라집니다.

python
from pydantic import BaseModel
 
class SubTask(BaseModel):
    id: str
    description: str
    assigned_agent: str
    dependencies: list[str]  # 선행 작업 ID
    priority: int
 
class TaskDecomposition(BaseModel):
    original_request: str
    subtasks: list[SubTask]
    execution_plan: str  # sequential, parallel, mixed
 
# 작업 분해 에이전트
decomposer = Agent(
    name="task-decomposer",
    instructions="""사용자 요청을 분석하여 하위 작업으로 분해합니다.
 
    규칙:
    1. 각 하위 작업은 하나의 에이전트가 독립적으로 수행 가능해야 합니다.
    2. 의존 관계를 명확히 표시하세요.
    3. 병렬 실행 가능한 작업은 dependencies를 빈 리스트로 설정하세요.
    4. 하위 작업 수는 2~6개가 적절합니다.
    
    사용 가능한 에이전트:
    - researcher: 정보 검색과 조사
    - analyst: 데이터 분석
    - writer: 문서/보고서 작성
    - reviewer: 품질 검증""",
    output_type=TaskDecomposition
)
 
# 사용 예시
decomposition = await decomposer.run(
    "Q1 매출 데이터를 분석하고, 경쟁사와 비교한 보고서를 작성해주세요"
)
# 결과:
# subtasks = [
#   {id: "1", description: "Q1 매출 데이터 수집", agent: "researcher", deps: []},
#   {id: "2", description: "경쟁사 데이터 수집", agent: "researcher", deps: []},
#   {id: "3", description: "매출 데이터 분석", agent: "analyst", deps: ["1"]},
#   {id: "4", description: "경쟁사 비교 분석", agent: "analyst", deps: ["1", "2"]},
#   {id: "5", description: "보고서 작성", agent: "writer", deps: ["3", "4"]},
# ]
# execution_plan: "mixed" (1,2는 병렬 → 3,4 → 5 순차)

의존성 기반 실행 엔진

분해된 작업을 의존성 순서에 따라 효율적으로 실행하는 엔진이 필요합니다.

python
import asyncio
from collections import defaultdict
 
class TaskExecutionEngine:
    def __init__(self, agent_registry: dict[str, Agent]):
        self.agents = agent_registry
        self.results: dict[str, any] = {}
 
    async def execute(
        self, decomposition: TaskDecomposition
    ) -> dict[str, any]:
        """의존성 그래프를 따라 작업 실행"""
        # 의존성 역방향 맵 구성
        dependents = defaultdict(list)
        pending_deps = {}
 
        for task in decomposition.subtasks:
            pending_deps[task.id] = set(task.dependencies)
            for dep in task.dependencies:
                dependents[dep].append(task.id)
 
        # 준비된 작업 큐
        ready = asyncio.Queue()
        for task in decomposition.subtasks:
            if not task.dependencies:
                await ready.put(task)
 
        completed = 0
        total = len(decomposition.subtasks)
 
        async def worker():
            nonlocal completed
            while completed < total:
                task = await ready.get()
                # 선행 작업 결과를 컨텍스트로 전달
                context = {
                    dep_id: self.results[dep_id]
                    for dep_id in task.dependencies
                    if dep_id in self.results
                }
                agent = self.agents[task.assigned_agent]
                result = await agent.run(
                    f"{task.description}\n\n이전 단계 결과: {context}"
                )
                self.results[task.id] = result
                completed += 1
 
                # 이 작업에 의존하는 다른 작업 확인
                for dependent_id in dependents[task.id]:
                    pending_deps[dependent_id].discard(task.id)
                    if not pending_deps[dependent_id]:
                        dep_task = next(
                            t for t in decomposition.subtasks
                            if t.id == dependent_id
                        )
                        await ready.put(dep_task)
 
        # 병렬 워커로 실행
        workers = [asyncio.create_task(worker()) for _ in range(3)]
        await asyncio.gather(*workers)
 
        return self.results
Tip

작업 분해 시 "적절한 세분화 수준"을 찾는 것이 핵심입니다. 너무 세분화하면 에이전트 간 통신 비용이 증가하고, 너무 뭉치면 단일 에이전트 한계를 반복합니다. 하나의 하위 작업은 하나의 LLM 호출로 완료 가능한 수준이 이상적입니다.

위임 실패 처리

폴백 전략

위임된 에이전트가 실패하거나 부적절한 결과를 반환할 때의 대응 전략입니다.

python
class DelegationWithFallback:
    def __init__(self, primary: Agent, fallbacks: list[Agent]):
        self.primary = primary
        self.fallbacks = fallbacks
 
    async def execute(
        self, task: str, max_retries: int = 2
    ) -> dict:
        # 1. 1차 에이전트 시도
        try:
            result = await self.primary.run(task)
            if self.validate_result(result):
                return {"agent": self.primary.name, "result": result}
        except Exception as e:
            log.warning(f"Primary agent failed: {e}")
 
        # 2. 폴백 에이전트 순차 시도
        for fallback in self.fallbacks:
            try:
                result = await fallback.run(task)
                if self.validate_result(result):
                    return {"agent": fallback.name, "result": result}
            except Exception:
                continue
 
        # 3. 모든 에이전트 실패 시
        return {
            "agent": "none",
            "result": None,
            "error": "모든 에이전트가 작업을 처리하지 못했습니다"
        }
 
    def validate_result(self, result: any) -> bool:
        """결과의 유효성 검증"""
        if result is None:
            return False
        if isinstance(result, str) and len(result) < 10:
            return False
        return True

타임아웃과 회로 차단기

장시간 응답이 없는 에이전트에 대한 보호 메커니즘이 필요합니다.

python
import asyncio
 
class CircuitBreaker:
    def __init__(
        self, failure_threshold: int = 3,
        recovery_time: float = 60.0
    ):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_time = recovery_time
        self.last_failure_time = 0
        self.state = "closed"  # closed, open, half-open
 
    async def call(self, agent: Agent, task: str, timeout: float = 30.0):
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_time:
                self.state = "half-open"
            else:
                raise CircuitOpenError(
                    f"Agent {agent.name} circuit is open"
                )
 
        try:
            result = await asyncio.wait_for(
                agent.run(task), timeout=timeout
            )
            if self.state == "half-open":
                self.state = "closed"
                self.failure_count = 0
            return result
        except (asyncio.TimeoutError, Exception) as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "open"
            raise

다음 장에서는 여러 에이전트가 하나의 결론에 도달하기 위해 협업하는 메커니즘을 다룹니다. 투표, 토론, 합의 프로토콜 등 에이전트 간 협업의 고급 패턴을 살펴봅니다.

이 글이 도움이 되셨나요?

관련 글

AI / ML

5장: 에이전트 협업과 합의 메커니즘

투표, 토론, 비판적 검토, 다수결 합의 등 여러 에이전트가 하나의 결론에 도달하기 위한 협업 패턴과 합의 프로토콜을 다룹니다.

2026년 3월 30일·16분
AI / ML

3장: 에이전트 간 통신 프로토콜 — A2A, MCP, 커스텀 패턴

A2A(Agent-to-Agent) 프로토콜, MCP(Model Context Protocol), 그리고 커스텀 메시지 패턴까지 에이전트 간 통신의 표준과 구현을 다룹니다.

2026년 3월 25일·15분
AI / ML

2장: 멀티에이전트 팀 아키텍처 설계

감독자-워커, 계층적 팀, 피어-투-피어 네트워크 등 멀티에이전트 팀 아키텍처의 설계 원칙과 트레이드오프를 코드 예제와 함께 분석합니다.

2026년 3월 24일·20분
이전 글3장: 에이전트 간 통신 프로토콜 — A2A, MCP, 커스텀 패턴
다음 글5장: 에이전트 협업과 합의 메커니즘

댓글

목차

약 17분 남음
  • 작업 위임의 본질
  • 핸드오프 패턴 심화
    • 기본 핸드오프 구현
    • 조건부 핸드오프
    • 양방향 핸드오프
  • 라우팅 전략
    • 의도 기반 라우팅(Intent-Based Routing)
    • 능력 기반 라우팅(Capability-Based Routing)
    • 시맨틱 라우팅(Semantic Routing)
  • 동적 위임과 작업 분해
    • 작업 분해 전략
    • 의존성 기반 실행 엔진
  • 위임 실패 처리
    • 폴백 전략
    • 타임아웃과 회로 차단기