본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout

내비게이션

  • Tech
  • Projects
  • Books
  • About
  • Tags

카테고리

  • AI / ML
  • 웹 개발
  • 프로그래밍
  • 개발 도구

연결

  • GitHub
  • Email
  • RSS
© 2026 Kreath Archive. All rights reserved.Built with Next.js + MDX
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 7장: 오케스트레이션 하네스 — 워크플로우 제어
2026년 3월 21일·AI / ML·

7장: 오케스트레이션 하네스 — 워크플로우 제어

에이전트 라이프사이클 관리, 도구 오케스트레이션, 서브에이전트 관리, 상태 관리, 에러 복구 등 복잡한 AI 워크플로우를 조율하는 방법을 다룹니다.

16분1,129자9개 섹션
aitestingevaluationmlops
공유
harness-engineering7 / 10
12345678910
이전6장: 가드레일 하네스 — 안전 장치 설계와 구현다음8장: 배포 하네스 — 안전한 모델 릴리즈

단순한 챗봇은 "질문 - 응답"의 1회성 상호작용으로 충분합니다. 하지만 프로덕션 에이전트는 훨씬 복잡합니다. 파일을 읽고, 코드를 실행하고, 외부 API를 호출하며, 때로는 다른 에이전트에게 작업을 위임합니다. 이 모든 과정을 안정적으로 조율하는 것이 오케스트레이션 하네스의 역할입니다.

이 장에서 다루는 내용

  • 에이전트 라이프사이클(Lifecycle) 관리
  • 도구 오케스트레이션과 실행 제어
  • 서브에이전트 관리와 위임 패턴
  • 상태 관리와 체크포인팅
  • 에러 복구 전략
  • 시스템 프롬프트 기반 조율

에이전트 라이프사이클

에이전트의 실행은 단순한 함수 호출이 아닌, 여러 단계를 거치는 라이프사이클(Lifecycle)을 가집니다.

agent_lifecycle.py
python
from enum import Enum
from dataclasses import dataclass, field
from typing import Any
 
 
class AgentState(Enum):
    INITIALIZED = "initialized"
    PLANNING = "planning"
    EXECUTING = "executing"
    TOOL_CALLING = "tool_calling"
    DELEGATING = "delegating"
    REFLECTING = "reflecting"
    COMPLETED = "completed"
    FAILED = "failed"
 
 
@dataclass
class AgentContext:
    """에이전트 실행 컨텍스트"""
    task: str
    state: AgentState = AgentState.INITIALIZED
    plan: list[str] = field(default_factory=list)
    history: list[dict] = field(default_factory=list)
    tool_results: list[dict] = field(default_factory=list)
    iteration: int = 0
    max_iterations: int = 20
    metadata: dict = field(default_factory=dict)
 
 
class AgentOrchestrator:
    """에이전트 라이프사이클 오케스트레이터"""
 
    def __init__(self, model, tools, config):
        self.model = model
        self.tools = tools
        self.config = config
        self.hooks: dict[str, list[callable]] = {}
 
    def on_state_change(self, state: AgentState, hook: callable):
        """상태 전이 훅 등록"""
        self.hooks.setdefault(state.value, []).append(hook)
 
    async def _notify_hooks(self, state: AgentState, ctx: AgentContext):
        for hook in self.hooks.get(state.value, []):
            await hook(ctx)
 
    async def run(self, task: str) -> dict:
        ctx = AgentContext(task=task)
 
        while ctx.state not in (AgentState.COMPLETED, AgentState.FAILED):
            if ctx.iteration >= ctx.max_iterations:
                ctx.state = AgentState.FAILED
                ctx.metadata["failure_reason"] = "최대 반복 횟수 초과"
                break
 
            ctx.iteration += 1
            await self._step(ctx)
            await self._notify_hooks(ctx.state, ctx)
 
        return {
            "state": ctx.state.value,
            "history": ctx.history,
            "iterations": ctx.iteration,
            "metadata": ctx.metadata,
        }
 
    async def _step(self, ctx: AgentContext):
        """한 단계 실행"""
        response = await self.model.generate(
            system_prompt=self._build_system_prompt(ctx),
            messages=self._build_messages(ctx),
            tools=self._get_tool_schemas(),
        )
 
        if response.has_tool_calls:
            ctx.state = AgentState.TOOL_CALLING
            results = await self._execute_tools(response.tool_calls)
            ctx.tool_results.extend(results)
            ctx.state = AgentState.EXECUTING
        elif response.indicates_completion:
            ctx.state = AgentState.COMPLETED
        else:
            ctx.state = AgentState.EXECUTING
 
        ctx.history.append({
            "iteration": ctx.iteration,
            "state": ctx.state.value,
            "response": response.content,
        })
Info

라이프사이클 훅은 모니터링, 로깅, 비용 추적 등을 에이전트 로직과 분리하여 구현할 수 있게 해줍니다. 예를 들어, TOOL_CALLING 상태에 진입할 때마다 도구 사용량을 기록하는 훅을 등록할 수 있습니다.

도구 오케스트레이션

에이전트의 핵심 능력은 도구를 사용하는 것입니다. 하지만 도구 사용에는 신중한 제어가 필요합니다. 어떤 도구를 사용할 수 있는지, 각 도구의 실행 제한은 무엇인지, 도구 실행이 실패하면 어떻게 할 것인지를 관리해야 합니다.

tool_orchestrator.py
python
from dataclasses import dataclass
from typing import Callable, Awaitable, Any
 
 
@dataclass
class ToolDefinition:
    """도구 정의"""
    name: str
    description: str
    function: Callable[..., Awaitable[Any]]
    schema: dict
    requires_confirmation: bool = False
    max_calls_per_session: int | None = None
    timeout_seconds: float = 30.0
    risk_level: str = "low"  # low, medium, high
 
 
class ToolOrchestrator:
    """도구 실행 관리자"""
 
    def __init__(self):
        self.tools: dict[str, ToolDefinition] = {}
        self.call_counts: dict[str, int] = {}
 
    def register(self, tool: ToolDefinition):
        self.tools[tool.name] = tool
        self.call_counts[tool.name] = 0
 
    async def execute(
        self,
        tool_name: str,
        arguments: dict,
    ) -> dict:
        tool = self.tools.get(tool_name)
        if not tool:
            return {"error": f"알 수 없는 도구: {tool_name}"}
 
        # 호출 제한 확인
        if tool.max_calls_per_session is not None:
            if self.call_counts[tool_name] >= tool.max_calls_per_session:
                return {
                    "error": (
                        f"도구 '{tool_name}'의 최대 호출 횟수"
                        f"({tool.max_calls_per_session})를 초과했습니다"
                    )
                }
 
        # 위험도 높은 도구는 확인 필요
        if tool.requires_confirmation:
            confirmed = await self._request_confirmation(
                tool_name, arguments
            )
            if not confirmed:
                return {"error": "사용자가 도구 실행을 거부했습니다"}
 
        # 타임아웃 적용 실행
        try:
            import asyncio
            result = await asyncio.wait_for(
                tool.function(**arguments),
                timeout=tool.timeout_seconds,
            )
            self.call_counts[tool_name] += 1
            return {"result": result}
        except asyncio.TimeoutError:
            return {
                "error": f"도구 '{tool_name}' 실행 시간 초과 "
                        f"({tool.timeout_seconds}초)"
            }
        except Exception as e:
            return {"error": f"도구 실행 실패: {str(e)}"}
 
 
# 도구 등록 예시
orchestrator = ToolOrchestrator()
 
orchestrator.register(ToolDefinition(
    name="read_file",
    description="파일 내용을 읽습니다",
    function=read_file_async,
    schema={"path": {"type": "string"}},
    risk_level="low",
))
 
orchestrator.register(ToolDefinition(
    name="execute_code",
    description="코드를 실행합니다",
    function=execute_code_async,
    schema={"code": {"type": "string"}, "language": {"type": "string"}},
    requires_confirmation=True,
    timeout_seconds=60.0,
    risk_level="high",
))
 
orchestrator.register(ToolDefinition(
    name="web_search",
    description="웹 검색을 수행합니다",
    function=web_search_async,
    schema={"query": {"type": "string"}},
    max_calls_per_session=10,
    risk_level="medium",
))
Warning

코드 실행, 파일 쓰기, 외부 API 호출 같은 부작용이 있는 도구는 반드시 requires_confirmation을 설정하거나 샌드박스 환경에서 실행해야 합니다. 에이전트가 의도치 않게 시스템에 해를 끼치는 것을 방지하는 것은 오케스트레이션 하네스의 핵심 책임입니다.

서브에이전트 관리

복잡한 작업은 하나의 에이전트가 모두 처리하기보다, 전문화된 서브에이전트에게 위임하는 것이 효과적입니다. 이를 멀티에이전트 오케스트레이션(Multi-Agent Orchestration)이라 합니다.

sub_agent_manager.py
python
@dataclass
class SubAgentConfig:
    """서브에이전트 설정"""
    name: str
    system_prompt: str
    model: str
    tools: list[str]
    max_iterations: int = 10
    timeout_seconds: float = 120.0
 
 
class SubAgentManager:
    """서브에이전트 생성 및 관리"""
 
    def __init__(self, model_factory, tool_registry):
        self.model_factory = model_factory
        self.tool_registry = tool_registry
        self.agents: dict[str, SubAgentConfig] = {}
 
    def register(self, config: SubAgentConfig):
        self.agents[config.name] = config
 
    async def delegate(
        self,
        agent_name: str,
        task: str,
        context: dict | None = None,
    ) -> dict:
        config = self.agents.get(agent_name)
        if not config:
            raise ValueError(f"등록되지 않은 에이전트: {agent_name}")
 
        model = self.model_factory.create(config.model)
        tools = [
            self.tool_registry.get(t) for t in config.tools
        ]
 
        orchestrator = AgentOrchestrator(
            model=model,
            tools=tools,
            config=config,
        )
 
        import asyncio
        try:
            result = await asyncio.wait_for(
                orchestrator.run(task),
                timeout=config.timeout_seconds,
            )
            return {
                "agent": agent_name,
                "status": "success",
                "result": result,
            }
        except asyncio.TimeoutError:
            return {
                "agent": agent_name,
                "status": "timeout",
                "error": f"{config.timeout_seconds}초 초과",
            }
 
 
# 서브에이전트 등록
manager = SubAgentManager(model_factory, tool_registry)
 
manager.register(SubAgentConfig(
    name="code_analyzer",
    system_prompt="코드를 분석하고 개선점을 제안하는 전문가입니다.",
    model="claude-sonnet-4-20250514",
    tools=["read_file", "search_code"],
    max_iterations=5,
))
 
manager.register(SubAgentConfig(
    name="test_writer",
    system_prompt="테스트 코드를 작성하는 전문가입니다.",
    model="claude-sonnet-4-20250514",
    tools=["read_file", "write_file", "execute_code"],
    max_iterations=10,
))

상태 관리와 체크포인팅

장시간 실행되는 에이전트 작업은 중간에 실패할 수 있습니다. 체크포인팅(Checkpointing)은 에이전트의 상태를 주기적으로 저장하여, 실패 시 처음부터 다시 시작하지 않고 마지막 체크포인트에서 재개할 수 있게 합니다.

checkpointing.py
python
import json
import time
from pathlib import Path
from dataclasses import asdict
 
 
class CheckpointManager:
    """에이전트 상태 체크포인팅"""
 
    def __init__(self, checkpoint_dir: str = ".checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(exist_ok=True)
 
    def save(self, session_id: str, context: AgentContext):
        """현재 상태 저장"""
        checkpoint = {
            "session_id": session_id,
            "timestamp": time.time(),
            "state": context.state.value,
            "iteration": context.iteration,
            "task": context.task,
            "plan": context.plan,
            "history": context.history,
            "tool_results": context.tool_results,
        }
 
        path = self.checkpoint_dir / f"{session_id}.json"
        path.write_text(
            json.dumps(checkpoint, ensure_ascii=False, indent=2)
        )
 
    def load(self, session_id: str) -> AgentContext | None:
        """저장된 상태 복원"""
        path = self.checkpoint_dir / f"{session_id}.json"
        if not path.exists():
            return None
 
        data = json.loads(path.read_text())
        ctx = AgentContext(task=data["task"])
        ctx.state = AgentState(data["state"])
        ctx.iteration = data["iteration"]
        ctx.plan = data["plan"]
        ctx.history = data["history"]
        ctx.tool_results = data["tool_results"]
        return ctx
 
    def cleanup(self, session_id: str):
        """완료된 세션의 체크포인트 삭제"""
        path = self.checkpoint_dir / f"{session_id}.json"
        if path.exists():
            path.unlink()

에러 복구 전략

에이전트 실행 중 발생하는 에러에 대한 복구 전략은 에러의 종류에 따라 달라집니다.

error_recovery.py
python
from enum import Enum
 
 
class ErrorType(Enum):
    TRANSIENT = "transient"      # 일시적 (네트워크, 레이트 리밋)
    TOOL_FAILURE = "tool_failure" # 도구 실행 실패
    MODEL_ERROR = "model_error"   # 모델 응답 오류 (파싱 실패 등)
    LOGIC_ERROR = "logic_error"   # 에이전트 루프/탈선
    FATAL = "fatal"               # 복구 불가
 
 
class ErrorRecovery:
    """에러 유형별 복구 전략"""
 
    @staticmethod
    async def recover(
        error_type: ErrorType,
        error: Exception,
        context: AgentContext,
        orchestrator: AgentOrchestrator,
    ) -> bool:
        """에러 복구 시도. 성공 시 True 반환."""
 
        if error_type == ErrorType.TRANSIENT:
            # 지수 백오프 재시도
            return await retry_with_backoff(
                orchestrator._step, context,
                max_retries=3,
            )
 
        elif error_type == ErrorType.TOOL_FAILURE:
            # 대체 도구로 전환하거나 도구 없이 진행
            context.history.append({
                "type": "error_recovery",
                "message": f"도구 실패: {error}. 대체 방법을 시도합니다.",
            })
            return True  # 다음 반복에서 모델이 대안을 찾도록
 
        elif error_type == ErrorType.MODEL_ERROR:
            # 프롬프트를 단순화하여 재시도
            context.metadata["simplified_prompt"] = True
            return True
 
        elif error_type == ErrorType.LOGIC_ERROR:
            # 계획 단계로 되돌아가기
            context.state = AgentState.PLANNING
            context.history.append({
                "type": "error_recovery",
                "message": "논리 에러 감지. 계획을 재수립합니다.",
            })
            return True
 
        elif error_type == ErrorType.FATAL:
            return False
 
        return False
Tip

에러 복구에서 가장 위험한 패턴은 무한 재시도입니다. 반드시 최대 재시도 횟수와 전체 실행 시간 제한을 설정하세요. 에이전트가 같은 에러를 반복적으로 만나고 있다면, 재시도보다는 계획을 재수립하는 것이 효과적입니다.

시스템 프롬프트 기반 조율

오케스트레이션의 상당 부분은 시스템 프롬프트를 통해 이루어집니다. 잘 설계된 시스템 프롬프트는 에이전트의 행동을 코드 없이도 효과적으로 제어합니다.

system_prompt_orchestration.py
python
def build_orchestration_prompt(
    context: AgentContext,
    available_tools: list[str],
) -> str:
    """동적 시스템 프롬프트 구성"""
 
    prompt_parts = [
        "# 역할",
        "당신은 소프트웨어 엔지니어링 작업을 수행하는 AI 에이전트입니다.",
        "",
        "# 작업 규칙",
        "1. 작업을 시작하기 전에 반드시 관련 파일을 먼저 읽으세요.",
        "2. 코드를 수정하기 전에 기존 코드의 스타일과 패턴을 파악하세요.",
        "3. 한 번에 하나의 파일만 수정하세요.",
        "4. 수정 후 반드시 테스트를 실행하세요.",
        "5. 확신이 없는 경우 사용자에게 확인을 요청하세요.",
        "",
        "# 사용 가능한 도구",
    ]
 
    for tool in available_tools:
        prompt_parts.append(f"- {tool}")
 
    # 현재 진행 상황 주입
    if context.iteration > 0:
        prompt_parts.extend([
            "",
            f"# 현재 진행 상황 (반복 {context.iteration}/{context.max_iterations})",
            f"상태: {context.state.value}",
        ])
 
        if context.plan:
            prompt_parts.append("계획:")
            for i, step in enumerate(context.plan, 1):
                prompt_parts.append(f"  {i}. {step}")
 
    return "\n".join(prompt_parts)

핵심 요약

  • 에이전트 라이프사이클: 초기화, 계획, 실행, 도구 호출, 위임, 반성, 완료/실패의 상태 전이를 관리합니다.
  • 도구 오케스트레이션: 호출 제한, 타임아웃, 위험도 기반 확인, 실행 환경 격리를 적용합니다.
  • 서브에이전트 관리: 전문화된 에이전트에게 작업을 위임하고, 타임아웃과 결과 수집을 관리합니다.
  • 체크포인팅: 장시간 작업의 상태를 주기적으로 저장하여 실패 시 재개를 가능하게 합니다.
  • 에러 복구: 에러 유형(일시적, 도구, 모델, 논리, 치명적)에 따라 차별화된 복구 전략을 적용합니다.
  • 시스템 프롬프트 조율: 에이전트 상태에 따라 동적으로 프롬프트를 구성하여 행동을 제어합니다.

다음 장 예고

8장에서는 하네스를 포함한 AI 시스템을 프로덕션에 안전하게 배포하는 배포 하네스를 다룹니다. 카나리 배포, 섀도우 테스팅, A/B 테스트, 롤백 전략 등을 살펴봅니다.

이 글이 도움이 되셨나요?

관련 주제 더 보기

#ai#testing#evaluation#mlops

관련 글

AI / ML

8장: 배포 하네스 — 안전한 모델 릴리즈

카나리 배포, 섀도우 테스팅, A/B 테스트, 블루-그린 배포, 롤백 전략 등 AI 시스템을 프로덕션에 안전하게 배포하는 전략을 다룹니다.

2026년 3월 23일·17분
AI / ML

6장: 가드레일 하네스 — 안전 장치 설계와 구현

프롬프트 인젝션 방어, 유해 콘텐츠 필터링, Guardrails AI와 NeMo Guardrails 프레임워크, 다계층 방어 전략을 통해 AI 시스템의 안전을 보장하는 방법을 다룹니다.

2026년 3월 19일·17분
AI / ML

9장: 모니터링 하네스 — 프로덕션 관측과 피드백 루프

토큰 사용량, 지연시간, 비용 추적, 드리프트 감지, 품질 모니터링, 알림 설계, 피드백 루프 등 AI 시스템의 관측 가능성 파이프라인을 다룹니다.

2026년 3월 25일·19분
이전 글6장: 가드레일 하네스 — 안전 장치 설계와 구현
다음 글8장: 배포 하네스 — 안전한 모델 릴리즈

댓글

목차

약 16분 남음
  • 이 장에서 다루는 내용
  • 에이전트 라이프사이클
  • 도구 오케스트레이션
  • 서브에이전트 관리
  • 상태 관리와 체크포인팅
  • 에러 복구 전략
  • 시스템 프롬프트 기반 조율
  • 핵심 요약
  • 다음 장 예고