에이전트 간 작업 위임의 핵심 메커니즘인 핸드오프 패턴, 라우팅 전략, 동적 위임의 구현과 최적화를 실전 코드와 함께 다룹니다.
멀티에이전트 시스템에서 작업 위임(Task Delegation)은 한 에이전트가 자신의 범위를 넘어서는 작업을 적절한 에이전트에게 맡기는 행위입니다. 효과적인 위임은 "누구에게 맡기는가"뿐만 아니라 "무엇을 전달하는가", "결과를 어떻게 활용하는가"까지 포함합니다.
위임은 크게 두 가지 유형으로 나뉩니다.
핸드오프(Handoff): 현재 에이전트가 대화의 제어권 자체를 다른 에이전트에게 넘깁니다. 고객 서비스에서 "결제 담당자에게 연결해드리겠습니다"와 같습니다. 제어권이 이전되면 원래 에이전트는 더 이상 관여하지 않습니다.
서브태스크 위임(Subtask Delegation): 현재 에이전트가 제어권을 유지하면서 특정 하위 작업만 다른 에이전트에게 맡깁니다. 상사가 부하 직원에게 조사를 시키고 결과를 보고받는 것과 같습니다.
OpenAI Agents SDK에서의 핸드오프는 에이전트의 도구처럼 동작합니다. 에이전트가 핸드오프를 "호출"하면 제어권이 대상 에이전트로 넘어갑니다.
from agents import Agent, handoff, RunContext
# 핸드오프 시 컨텍스트를 필터링하는 콜백
def billing_handoff_filter(context: RunContext) -> str:
"""결제 에이전트에게 필요한 정보만 전달"""
return f"""고객 ID: {context.context.get('customer_id')}
주문 번호: {context.context.get('order_id')}
문의 유형: 결제/환불 관련
대화 요약: {summarize_conversation(context.messages[-5:])}"""
triage_agent = Agent(
name="triage",
instructions="""고객 문의를 분류하고 적절한 전문 에이전트에게 연결합니다.
다음 기준으로 분류하세요:
- 결제, 환불, 청구 관련 → billing 핸드오프
- 배송, 추적, 수거 관련 → shipping 핸드오프
- 제품 기능, 버그, 오류 → technical 핸드오프
- 위 분류에 해당하지 않으면 직접 응답""",
handoffs=[
handoff(
agent=billing_agent,
description="결제, 환불, 청구서 관련 문의",
on_handoff=billing_handoff_filter
),
handoff(
agent=shipping_agent,
description="배송 추적, 배송 지연, 반품 수거 관련 문의"
),
handoff(
agent=technical_agent,
description="제품 기능 문제, 버그 리포트, 기술 지원"
)
]
)단순 분류를 넘어, 대화 상태나 외부 조건에 따라 핸드오프 여부를 동적으로 결정해야 하는 경우가 있습니다.
from agents import Agent, handoff
def should_escalate(context: RunContext) -> bool:
"""에스컬레이션 조건 평가"""
messages = context.messages
# 3회 이상 같은 문제 반복 시 에스컬레이션
if count_repeated_issues(messages) >= 3:
return True
# 고객 감정이 부정적으로 전환되면 에스컬레이션
if detect_negative_sentiment(messages[-3:]):
return True
# VIP 고객은 즉시 에스컬레이션
if context.context.get("customer_tier") == "vip":
return True
return False
support_agent = Agent(
name="support",
instructions="""1차 고객 지원을 담당합니다.
문제 해결이 어렵거나 고객이 불만족스러운 경우,
escalation_check 도구를 사용하여 에스컬레이션 필요 여부를 확인하세요.""",
tools=[
query_faq,
check_order_status,
{
"name": "escalation_check",
"description": "현재 상황이 에스컬레이션이 필요한지 판단",
"handler": should_escalate
}
],
handoffs=[
handoff(
agent=senior_agent,
description="에스컬레이션이 필요한 복잡한 문제"
)
]
)실제 서비스에서는 에이전트 간에 양방향 핸드오프가 필요합니다. 결제 문의 중 배송 문제가 발견되면 배송 에이전트로, 배송 에이전트에서 환불이 필요하면 다시 결제 에이전트로 돌아와야 합니다.
billing_agent = Agent(
name="billing",
instructions="결제와 환불을 처리합니다.",
tools=[process_refund, check_payment],
handoffs=[
handoff(agent="shipping", description="배송 관련 확인이 필요한 경우"),
handoff(agent="triage", description="분류가 잘못된 경우 재분류")
]
)
shipping_agent = Agent(
name="shipping",
instructions="배송을 추적하고 관리합니다.",
tools=[track_package, arrange_return],
handoffs=[
handoff(agent="billing", description="환불이 필요한 배송 사고"),
handoff(agent="triage", description="분류가 잘못된 경우 재분류")
]
)양방향 핸드오프는 무한 루프의 위험이 있습니다. 반드시 핸드오프 횟수를 추적하고, 상한(보통 3~5회)을 설정하세요. 상한에 도달하면 인간 에스컬레이션으로 전환해야 합니다.
감독자 패턴에서 라우팅은 사용자 요청을 분석하여 적절한 워커 에이전트를 선택하는 과정입니다. 라우팅의 정확도가 전체 시스템의 성능을 결정합니다.
사용자 요청의 의도를 분류하여 해당 의도를 처리할 수 있는 에이전트로 라우팅합니다.
from pydantic import BaseModel
from enum import Enum
class UserIntent(str, Enum):
CODE_REVIEW = "code_review"
BUG_FIX = "bug_fix"
FEATURE_REQUEST = "feature_request"
DOCUMENTATION = "documentation"
DEPLOYMENT = "deployment"
class IntentClassification(BaseModel):
intent: UserIntent
confidence: float
reasoning: str
# 의도 분류기
intent_classifier = Agent(
name="intent-classifier",
instructions="""사용자 요청의 의도를 분류합니다.
다음 카테고리 중 하나를 선택하세요:
- code_review: 코드 리뷰, 코드 품질 분석
- bug_fix: 버그 수정, 에러 해결
- feature_request: 새 기능 구현
- documentation: 문서 작성, 주석 추가
- deployment: 배포, CI/CD 관련""",
output_type=IntentClassification
)
# 의도별 에이전트 매핑
agent_registry = {
UserIntent.CODE_REVIEW: code_review_agent,
UserIntent.BUG_FIX: debugging_agent,
UserIntent.FEATURE_REQUEST: development_agent,
UserIntent.DOCUMENTATION: docs_agent,
UserIntent.DEPLOYMENT: devops_agent,
}
async def route_request(user_message: str):
classification = await intent_classifier.run(user_message)
target_agent = agent_registry[classification.intent]
return await target_agent.run(user_message)에이전트가 보유한 도구와 능력을 기준으로 라우팅합니다. A2A의 Agent Card가 이 패턴을 지원합니다.
@dataclass
class AgentCapability:
agent_name: str
skills: list[str]
tools: list[str]
max_complexity: int # 1-10
avg_latency_ms: int
class CapabilityRouter:
def __init__(self):
self.registry: list[AgentCapability] = []
def register(self, capability: AgentCapability):
self.registry.append(capability)
async def route(
self, task: str, required_skills: list[str]
) -> str:
"""필요한 스킬을 보유한 에이전트를 선택"""
candidates = []
for cap in self.registry:
skill_match = len(
set(required_skills) & set(cap.skills)
) / len(required_skills)
if skill_match > 0.5: # 50% 이상 스킬 일치
candidates.append((cap, skill_match))
if not candidates:
return "fallback_agent"
# 스킬 일치도와 지연시간을 고려한 최적 선택
candidates.sort(
key=lambda x: (x[1], -x[0].avg_latency_ms),
reverse=True
)
return candidates[0][0].agent_name
# 에이전트 등록
router = CapabilityRouter()
router.register(AgentCapability(
agent_name="sql-analyst",
skills=["sql", "data-analysis", "visualization"],
tools=["postgres_query", "chart_generator"],
max_complexity=8,
avg_latency_ms=3000
))
router.register(AgentCapability(
agent_name="code-expert",
skills=["python", "typescript", "code-review"],
tools=["linter", "test_runner", "git_operations"],
max_complexity=9,
avg_latency_ms=5000
))요청의 의미적 유사도를 기반으로 라우팅합니다. 사전 정의된 카테고리에 맞지 않는 요청도 처리할 수 있습니다.
from sentence_transformers import SentenceTransformer
import numpy as np
class SemanticRouter:
def __init__(self):
self.model = SentenceTransformer("all-MiniLM-L6-v2")
self.routes: list[dict] = []
def add_route(
self,
agent_name: str,
description: str,
examples: list[str]
):
"""라우팅 경로 등록 (설명 + 예시)"""
all_texts = [description] + examples
embeddings = self.model.encode(all_texts)
self.routes.append({
"agent": agent_name,
"embeddings": embeddings,
"centroid": np.mean(embeddings, axis=0)
})
async def route(self, query: str) -> str:
"""쿼리와 가장 유사한 라우트 선택"""
query_embedding = self.model.encode(query)
best_score = -1
best_agent = None
for route in self.routes:
score = np.dot(
query_embedding, route["centroid"]
) / (
np.linalg.norm(query_embedding)
* np.linalg.norm(route["centroid"])
)
if score > best_score:
best_score = score
best_agent = route["agent"]
return best_agent if best_score > 0.6 else "fallback"
# 라우트 등록
semantic_router = SemanticRouter()
semantic_router.add_route(
"billing",
"결제, 환불, 청구서 관련 업무",
["환불 받고 싶어요", "카드 결제가 안 돼요", "영수증 발급해주세요"]
)
semantic_router.add_route(
"shipping",
"배송 추적, 배송 지연, 반품 관련 업무",
["택배가 안 와요", "배송 현황 알려주세요", "반품하고 싶어요"]
)복잡한 요청을 하위 작업으로 분해하는 방식에 따라 시스템의 효율성이 크게 달라집니다.
from pydantic import BaseModel
class SubTask(BaseModel):
id: str
description: str
assigned_agent: str
dependencies: list[str] # 선행 작업 ID
priority: int
class TaskDecomposition(BaseModel):
original_request: str
subtasks: list[SubTask]
execution_plan: str # sequential, parallel, mixed
# 작업 분해 에이전트
decomposer = Agent(
name="task-decomposer",
instructions="""사용자 요청을 분석하여 하위 작업으로 분해합니다.
규칙:
1. 각 하위 작업은 하나의 에이전트가 독립적으로 수행 가능해야 합니다.
2. 의존 관계를 명확히 표시하세요.
3. 병렬 실행 가능한 작업은 dependencies를 빈 리스트로 설정하세요.
4. 하위 작업 수는 2~6개가 적절합니다.
사용 가능한 에이전트:
- researcher: 정보 검색과 조사
- analyst: 데이터 분석
- writer: 문서/보고서 작성
- reviewer: 품질 검증""",
output_type=TaskDecomposition
)
# 사용 예시
decomposition = await decomposer.run(
"Q1 매출 데이터를 분석하고, 경쟁사와 비교한 보고서를 작성해주세요"
)
# 결과:
# subtasks = [
# {id: "1", description: "Q1 매출 데이터 수집", agent: "researcher", deps: []},
# {id: "2", description: "경쟁사 데이터 수집", agent: "researcher", deps: []},
# {id: "3", description: "매출 데이터 분석", agent: "analyst", deps: ["1"]},
# {id: "4", description: "경쟁사 비교 분석", agent: "analyst", deps: ["1", "2"]},
# {id: "5", description: "보고서 작성", agent: "writer", deps: ["3", "4"]},
# ]
# execution_plan: "mixed" (1,2는 병렬 → 3,4 → 5 순차)분해된 작업을 의존성 순서에 따라 효율적으로 실행하는 엔진이 필요합니다.
import asyncio
from collections import defaultdict
class TaskExecutionEngine:
def __init__(self, agent_registry: dict[str, Agent]):
self.agents = agent_registry
self.results: dict[str, any] = {}
async def execute(
self, decomposition: TaskDecomposition
) -> dict[str, any]:
"""의존성 그래프를 따라 작업 실행"""
# 의존성 역방향 맵 구성
dependents = defaultdict(list)
pending_deps = {}
for task in decomposition.subtasks:
pending_deps[task.id] = set(task.dependencies)
for dep in task.dependencies:
dependents[dep].append(task.id)
# 준비된 작업 큐
ready = asyncio.Queue()
for task in decomposition.subtasks:
if not task.dependencies:
await ready.put(task)
completed = 0
total = len(decomposition.subtasks)
async def worker():
nonlocal completed
while completed < total:
task = await ready.get()
# 선행 작업 결과를 컨텍스트로 전달
context = {
dep_id: self.results[dep_id]
for dep_id in task.dependencies
if dep_id in self.results
}
agent = self.agents[task.assigned_agent]
result = await agent.run(
f"{task.description}\n\n이전 단계 결과: {context}"
)
self.results[task.id] = result
completed += 1
# 이 작업에 의존하는 다른 작업 확인
for dependent_id in dependents[task.id]:
pending_deps[dependent_id].discard(task.id)
if not pending_deps[dependent_id]:
dep_task = next(
t for t in decomposition.subtasks
if t.id == dependent_id
)
await ready.put(dep_task)
# 병렬 워커로 실행
workers = [asyncio.create_task(worker()) for _ in range(3)]
await asyncio.gather(*workers)
return self.results작업 분해 시 "적절한 세분화 수준"을 찾는 것이 핵심입니다. 너무 세분화하면 에이전트 간 통신 비용이 증가하고, 너무 뭉치면 단일 에이전트 한계를 반복합니다. 하나의 하위 작업은 하나의 LLM 호출로 완료 가능한 수준이 이상적입니다.
위임된 에이전트가 실패하거나 부적절한 결과를 반환할 때의 대응 전략입니다.
class DelegationWithFallback:
def __init__(self, primary: Agent, fallbacks: list[Agent]):
self.primary = primary
self.fallbacks = fallbacks
async def execute(
self, task: str, max_retries: int = 2
) -> dict:
# 1. 1차 에이전트 시도
try:
result = await self.primary.run(task)
if self.validate_result(result):
return {"agent": self.primary.name, "result": result}
except Exception as e:
log.warning(f"Primary agent failed: {e}")
# 2. 폴백 에이전트 순차 시도
for fallback in self.fallbacks:
try:
result = await fallback.run(task)
if self.validate_result(result):
return {"agent": fallback.name, "result": result}
except Exception:
continue
# 3. 모든 에이전트 실패 시
return {
"agent": "none",
"result": None,
"error": "모든 에이전트가 작업을 처리하지 못했습니다"
}
def validate_result(self, result: any) -> bool:
"""결과의 유효성 검증"""
if result is None:
return False
if isinstance(result, str) and len(result) < 10:
return False
return True장시간 응답이 없는 에이전트에 대한 보호 메커니즘이 필요합니다.
import asyncio
class CircuitBreaker:
def __init__(
self, failure_threshold: int = 3,
recovery_time: float = 60.0
):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.recovery_time = recovery_time
self.last_failure_time = 0
self.state = "closed" # closed, open, half-open
async def call(self, agent: Agent, task: str, timeout: float = 30.0):
if self.state == "open":
if time.time() - self.last_failure_time > self.recovery_time:
self.state = "half-open"
else:
raise CircuitOpenError(
f"Agent {agent.name} circuit is open"
)
try:
result = await asyncio.wait_for(
agent.run(task), timeout=timeout
)
if self.state == "half-open":
self.state = "closed"
self.failure_count = 0
return result
except (asyncio.TimeoutError, Exception) as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "open"
raise다음 장에서는 여러 에이전트가 하나의 결론에 도달하기 위해 협업하는 메커니즘을 다룹니다. 투표, 토론, 합의 프로토콜 등 에이전트 간 협업의 고급 패턴을 살펴봅니다.
이 글이 도움이 되셨나요?
투표, 토론, 비판적 검토, 다수결 합의 등 여러 에이전트가 하나의 결론에 도달하기 위한 협업 패턴과 합의 프로토콜을 다룹니다.
A2A(Agent-to-Agent) 프로토콜, MCP(Model Context Protocol), 그리고 커스텀 메시지 패턴까지 에이전트 간 통신의 표준과 구현을 다룹니다.
감독자-워커, 계층적 팀, 피어-투-피어 네트워크 등 멀티에이전트 팀 아키텍처의 설계 원칙과 트레이드오프를 코드 예제와 함께 분석합니다.