에이전트 라이프사이클 관리, 도구 오케스트레이션, 서브에이전트 관리, 상태 관리, 에러 복구 등 복잡한 AI 워크플로우를 조율하는 방법을 다룹니다.
단순한 챗봇은 "질문 - 응답"의 1회성 상호작용으로 충분합니다. 하지만 프로덕션 에이전트는 훨씬 복잡합니다. 파일을 읽고, 코드를 실행하고, 외부 API를 호출하며, 때로는 다른 에이전트에게 작업을 위임합니다. 이 모든 과정을 안정적으로 조율하는 것이 오케스트레이션 하네스의 역할입니다.
에이전트의 실행은 단순한 함수 호출이 아닌, 여러 단계를 거치는 라이프사이클(Lifecycle)을 가집니다.
from enum import Enum
from dataclasses import dataclass, field
from typing import Any
class AgentState(Enum):
INITIALIZED = "initialized"
PLANNING = "planning"
EXECUTING = "executing"
TOOL_CALLING = "tool_calling"
DELEGATING = "delegating"
REFLECTING = "reflecting"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class AgentContext:
"""에이전트 실행 컨텍스트"""
task: str
state: AgentState = AgentState.INITIALIZED
plan: list[str] = field(default_factory=list)
history: list[dict] = field(default_factory=list)
tool_results: list[dict] = field(default_factory=list)
iteration: int = 0
max_iterations: int = 20
metadata: dict = field(default_factory=dict)
class AgentOrchestrator:
"""에이전트 라이프사이클 오케스트레이터"""
def __init__(self, model, tools, config):
self.model = model
self.tools = tools
self.config = config
self.hooks: dict[str, list[callable]] = {}
def on_state_change(self, state: AgentState, hook: callable):
"""상태 전이 훅 등록"""
self.hooks.setdefault(state.value, []).append(hook)
async def _notify_hooks(self, state: AgentState, ctx: AgentContext):
for hook in self.hooks.get(state.value, []):
await hook(ctx)
async def run(self, task: str) -> dict:
ctx = AgentContext(task=task)
while ctx.state not in (AgentState.COMPLETED, AgentState.FAILED):
if ctx.iteration >= ctx.max_iterations:
ctx.state = AgentState.FAILED
ctx.metadata["failure_reason"] = "최대 반복 횟수 초과"
break
ctx.iteration += 1
await self._step(ctx)
await self._notify_hooks(ctx.state, ctx)
return {
"state": ctx.state.value,
"history": ctx.history,
"iterations": ctx.iteration,
"metadata": ctx.metadata,
}
async def _step(self, ctx: AgentContext):
"""한 단계 실행"""
response = await self.model.generate(
system_prompt=self._build_system_prompt(ctx),
messages=self._build_messages(ctx),
tools=self._get_tool_schemas(),
)
if response.has_tool_calls:
ctx.state = AgentState.TOOL_CALLING
results = await self._execute_tools(response.tool_calls)
ctx.tool_results.extend(results)
ctx.state = AgentState.EXECUTING
elif response.indicates_completion:
ctx.state = AgentState.COMPLETED
else:
ctx.state = AgentState.EXECUTING
ctx.history.append({
"iteration": ctx.iteration,
"state": ctx.state.value,
"response": response.content,
})라이프사이클 훅은 모니터링, 로깅, 비용 추적 등을 에이전트 로직과 분리하여 구현할 수 있게 해줍니다. 예를 들어, TOOL_CALLING 상태에 진입할 때마다 도구 사용량을 기록하는 훅을 등록할 수 있습니다.
에이전트의 핵심 능력은 도구를 사용하는 것입니다. 하지만 도구 사용에는 신중한 제어가 필요합니다. 어떤 도구를 사용할 수 있는지, 각 도구의 실행 제한은 무엇인지, 도구 실행이 실패하면 어떻게 할 것인지를 관리해야 합니다.
from dataclasses import dataclass
from typing import Callable, Awaitable, Any
@dataclass
class ToolDefinition:
"""도구 정의"""
name: str
description: str
function: Callable[..., Awaitable[Any]]
schema: dict
requires_confirmation: bool = False
max_calls_per_session: int | None = None
timeout_seconds: float = 30.0
risk_level: str = "low" # low, medium, high
class ToolOrchestrator:
"""도구 실행 관리자"""
def __init__(self):
self.tools: dict[str, ToolDefinition] = {}
self.call_counts: dict[str, int] = {}
def register(self, tool: ToolDefinition):
self.tools[tool.name] = tool
self.call_counts[tool.name] = 0
async def execute(
self,
tool_name: str,
arguments: dict,
) -> dict:
tool = self.tools.get(tool_name)
if not tool:
return {"error": f"알 수 없는 도구: {tool_name}"}
# 호출 제한 확인
if tool.max_calls_per_session is not None:
if self.call_counts[tool_name] >= tool.max_calls_per_session:
return {
"error": (
f"도구 '{tool_name}'의 최대 호출 횟수"
f"({tool.max_calls_per_session})를 초과했습니다"
)
}
# 위험도 높은 도구는 확인 필요
if tool.requires_confirmation:
confirmed = await self._request_confirmation(
tool_name, arguments
)
if not confirmed:
return {"error": "사용자가 도구 실행을 거부했습니다"}
# 타임아웃 적용 실행
try:
import asyncio
result = await asyncio.wait_for(
tool.function(**arguments),
timeout=tool.timeout_seconds,
)
self.call_counts[tool_name] += 1
return {"result": result}
except asyncio.TimeoutError:
return {
"error": f"도구 '{tool_name}' 실행 시간 초과 "
f"({tool.timeout_seconds}초)"
}
except Exception as e:
return {"error": f"도구 실행 실패: {str(e)}"}
# 도구 등록 예시
orchestrator = ToolOrchestrator()
orchestrator.register(ToolDefinition(
name="read_file",
description="파일 내용을 읽습니다",
function=read_file_async,
schema={"path": {"type": "string"}},
risk_level="low",
))
orchestrator.register(ToolDefinition(
name="execute_code",
description="코드를 실행합니다",
function=execute_code_async,
schema={"code": {"type": "string"}, "language": {"type": "string"}},
requires_confirmation=True,
timeout_seconds=60.0,
risk_level="high",
))
orchestrator.register(ToolDefinition(
name="web_search",
description="웹 검색을 수행합니다",
function=web_search_async,
schema={"query": {"type": "string"}},
max_calls_per_session=10,
risk_level="medium",
))코드 실행, 파일 쓰기, 외부 API 호출 같은 부작용이 있는 도구는 반드시 requires_confirmation을 설정하거나 샌드박스 환경에서 실행해야 합니다. 에이전트가 의도치 않게 시스템에 해를 끼치는 것을 방지하는 것은 오케스트레이션 하네스의 핵심 책임입니다.
복잡한 작업은 하나의 에이전트가 모두 처리하기보다, 전문화된 서브에이전트에게 위임하는 것이 효과적입니다. 이를 멀티에이전트 오케스트레이션(Multi-Agent Orchestration)이라 합니다.
@dataclass
class SubAgentConfig:
"""서브에이전트 설정"""
name: str
system_prompt: str
model: str
tools: list[str]
max_iterations: int = 10
timeout_seconds: float = 120.0
class SubAgentManager:
"""서브에이전트 생성 및 관리"""
def __init__(self, model_factory, tool_registry):
self.model_factory = model_factory
self.tool_registry = tool_registry
self.agents: dict[str, SubAgentConfig] = {}
def register(self, config: SubAgentConfig):
self.agents[config.name] = config
async def delegate(
self,
agent_name: str,
task: str,
context: dict | None = None,
) -> dict:
config = self.agents.get(agent_name)
if not config:
raise ValueError(f"등록되지 않은 에이전트: {agent_name}")
model = self.model_factory.create(config.model)
tools = [
self.tool_registry.get(t) for t in config.tools
]
orchestrator = AgentOrchestrator(
model=model,
tools=tools,
config=config,
)
import asyncio
try:
result = await asyncio.wait_for(
orchestrator.run(task),
timeout=config.timeout_seconds,
)
return {
"agent": agent_name,
"status": "success",
"result": result,
}
except asyncio.TimeoutError:
return {
"agent": agent_name,
"status": "timeout",
"error": f"{config.timeout_seconds}초 초과",
}
# 서브에이전트 등록
manager = SubAgentManager(model_factory, tool_registry)
manager.register(SubAgentConfig(
name="code_analyzer",
system_prompt="코드를 분석하고 개선점을 제안하는 전문가입니다.",
model="claude-sonnet-4-20250514",
tools=["read_file", "search_code"],
max_iterations=5,
))
manager.register(SubAgentConfig(
name="test_writer",
system_prompt="테스트 코드를 작성하는 전문가입니다.",
model="claude-sonnet-4-20250514",
tools=["read_file", "write_file", "execute_code"],
max_iterations=10,
))장시간 실행되는 에이전트 작업은 중간에 실패할 수 있습니다. 체크포인팅(Checkpointing)은 에이전트의 상태를 주기적으로 저장하여, 실패 시 처음부터 다시 시작하지 않고 마지막 체크포인트에서 재개할 수 있게 합니다.
import json
import time
from pathlib import Path
from dataclasses import asdict
class CheckpointManager:
"""에이전트 상태 체크포인팅"""
def __init__(self, checkpoint_dir: str = ".checkpoints"):
self.checkpoint_dir = Path(checkpoint_dir)
self.checkpoint_dir.mkdir(exist_ok=True)
def save(self, session_id: str, context: AgentContext):
"""현재 상태 저장"""
checkpoint = {
"session_id": session_id,
"timestamp": time.time(),
"state": context.state.value,
"iteration": context.iteration,
"task": context.task,
"plan": context.plan,
"history": context.history,
"tool_results": context.tool_results,
}
path = self.checkpoint_dir / f"{session_id}.json"
path.write_text(
json.dumps(checkpoint, ensure_ascii=False, indent=2)
)
def load(self, session_id: str) -> AgentContext | None:
"""저장된 상태 복원"""
path = self.checkpoint_dir / f"{session_id}.json"
if not path.exists():
return None
data = json.loads(path.read_text())
ctx = AgentContext(task=data["task"])
ctx.state = AgentState(data["state"])
ctx.iteration = data["iteration"]
ctx.plan = data["plan"]
ctx.history = data["history"]
ctx.tool_results = data["tool_results"]
return ctx
def cleanup(self, session_id: str):
"""완료된 세션의 체크포인트 삭제"""
path = self.checkpoint_dir / f"{session_id}.json"
if path.exists():
path.unlink()에이전트 실행 중 발생하는 에러에 대한 복구 전략은 에러의 종류에 따라 달라집니다.
from enum import Enum
class ErrorType(Enum):
TRANSIENT = "transient" # 일시적 (네트워크, 레이트 리밋)
TOOL_FAILURE = "tool_failure" # 도구 실행 실패
MODEL_ERROR = "model_error" # 모델 응답 오류 (파싱 실패 등)
LOGIC_ERROR = "logic_error" # 에이전트 루프/탈선
FATAL = "fatal" # 복구 불가
class ErrorRecovery:
"""에러 유형별 복구 전략"""
@staticmethod
async def recover(
error_type: ErrorType,
error: Exception,
context: AgentContext,
orchestrator: AgentOrchestrator,
) -> bool:
"""에러 복구 시도. 성공 시 True 반환."""
if error_type == ErrorType.TRANSIENT:
# 지수 백오프 재시도
return await retry_with_backoff(
orchestrator._step, context,
max_retries=3,
)
elif error_type == ErrorType.TOOL_FAILURE:
# 대체 도구로 전환하거나 도구 없이 진행
context.history.append({
"type": "error_recovery",
"message": f"도구 실패: {error}. 대체 방법을 시도합니다.",
})
return True # 다음 반복에서 모델이 대안을 찾도록
elif error_type == ErrorType.MODEL_ERROR:
# 프롬프트를 단순화하여 재시도
context.metadata["simplified_prompt"] = True
return True
elif error_type == ErrorType.LOGIC_ERROR:
# 계획 단계로 되돌아가기
context.state = AgentState.PLANNING
context.history.append({
"type": "error_recovery",
"message": "논리 에러 감지. 계획을 재수립합니다.",
})
return True
elif error_type == ErrorType.FATAL:
return False
return False에러 복구에서 가장 위험한 패턴은 무한 재시도입니다. 반드시 최대 재시도 횟수와 전체 실행 시간 제한을 설정하세요. 에이전트가 같은 에러를 반복적으로 만나고 있다면, 재시도보다는 계획을 재수립하는 것이 효과적입니다.
오케스트레이션의 상당 부분은 시스템 프롬프트를 통해 이루어집니다. 잘 설계된 시스템 프롬프트는 에이전트의 행동을 코드 없이도 효과적으로 제어합니다.
def build_orchestration_prompt(
context: AgentContext,
available_tools: list[str],
) -> str:
"""동적 시스템 프롬프트 구성"""
prompt_parts = [
"# 역할",
"당신은 소프트웨어 엔지니어링 작업을 수행하는 AI 에이전트입니다.",
"",
"# 작업 규칙",
"1. 작업을 시작하기 전에 반드시 관련 파일을 먼저 읽으세요.",
"2. 코드를 수정하기 전에 기존 코드의 스타일과 패턴을 파악하세요.",
"3. 한 번에 하나의 파일만 수정하세요.",
"4. 수정 후 반드시 테스트를 실행하세요.",
"5. 확신이 없는 경우 사용자에게 확인을 요청하세요.",
"",
"# 사용 가능한 도구",
]
for tool in available_tools:
prompt_parts.append(f"- {tool}")
# 현재 진행 상황 주입
if context.iteration > 0:
prompt_parts.extend([
"",
f"# 현재 진행 상황 (반복 {context.iteration}/{context.max_iterations})",
f"상태: {context.state.value}",
])
if context.plan:
prompt_parts.append("계획:")
for i, step in enumerate(context.plan, 1):
prompt_parts.append(f" {i}. {step}")
return "\n".join(prompt_parts)8장에서는 하네스를 포함한 AI 시스템을 프로덕션에 안전하게 배포하는 배포 하네스를 다룹니다. 카나리 배포, 섀도우 테스팅, A/B 테스트, 롤백 전략 등을 살펴봅니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
카나리 배포, 섀도우 테스팅, A/B 테스트, 블루-그린 배포, 롤백 전략 등 AI 시스템을 프로덕션에 안전하게 배포하는 전략을 다룹니다.
프롬프트 인젝션 방어, 유해 콘텐츠 필터링, Guardrails AI와 NeMo Guardrails 프레임워크, 다계층 방어 전략을 통해 AI 시스템의 안전을 보장하는 방법을 다룹니다.
토큰 사용량, 지연시간, 비용 추적, 드리프트 감지, 품질 모니터링, 알림 설계, 피드백 루프 등 AI 시스템의 관측 가능성 파이프라인을 다룹니다.