본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout
© 2026 Kreath
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 7장: 상태 관리와 장애 복구
2026년 4월 2일·AI / ML·

7장: 상태 관리와 장애 복구

멀티에이전트 시스템의 체크포인팅, 이벤트 소싱, 재시도 전략, 그리고 분산 환경에서의 일관성 보장과 장애 복구 패턴을 다룹니다.

14분1,413자6개 섹션
ai-에이전트상태-관리장애-복구체크포인팅
공유
agent-orchestration7 / 11
1234567891011
이전6장: 에이전트 컨트롤 플레인 설계다음8장: 에이전트 플릿 관리와 스케일링

상태 관리가 어려운 이유

멀티에이전트 시스템의 상태 관리는 일반적인 분산 시스템보다 더 복잡합니다. 에이전트의 "상태"에는 대화 이력, 도구 호출 결과, 중간 추론 과정, 다른 에이전트와의 통신 기록이 모두 포함되기 때문입니다. 이 상태가 유실되면 에이전트는 맥락을 잃고, 전체 작업을 처음부터 다시 시작해야 합니다.

상태 관리의 핵심 과제는 세 가지입니다.

일관성(Consistency): 여러 에이전트가 공유하는 상태가 항상 동기화되어야 합니다. 에이전트 A가 데이터를 수정했는데 에이전트 B가 이전 버전을 읽으면 결과가 틀어집니다.

내구성(Durability): 에이전트나 시스템이 크래시되어도 진행 상태가 보존되어야 합니다. 10단계 중 8단계까지 완료된 작업을 1단계부터 다시 하는 것은 낭비입니다.

가시성(Visibility): 현재 전체 작업의 진행 상태를 실시간으로 파악할 수 있어야 합니다. "지금 어떤 에이전트가 무슨 단계에 있는가"를 알 수 없으면 디버깅도 모니터링도 불가능합니다.

체크포인팅(Checkpointing)

LangGraph는 체크포인팅을 핵심 기능으로 제공합니다. 그래프의 각 노드(에이전트) 실행 후 전체 상태를 저장하여, 중단 시점부터 재개할 수 있습니다.

LangGraph 체크포인팅

python
from langgraph.graph import StateGraph
from langgraph.checkpoint.postgres import PostgresSaver
 
# PostgreSQL 기반 체크포인터
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost:5432/agents"
)
 
# 상태 정의
class WorkflowState(TypedDict):
    messages: list[dict]
    current_step: str
    completed_steps: list[str]
    intermediate_results: dict
    retry_count: int
 
# 그래프 정의
workflow = StateGraph(WorkflowState)
workflow.add_node("research", research_agent)
workflow.add_node("analyze", analysis_agent)
workflow.add_node("write", writing_agent)
workflow.add_node("review", review_agent)
 
workflow.add_edge("research", "analyze")
workflow.add_edge("analyze", "write")
workflow.add_conditional_edges(
    "review",
    lambda state: "write" if state["needs_revision"] else "__end__"
)
 
# 체크포인터 연결
app = workflow.compile(checkpointer=checkpointer)
 
# 실행 - 각 노드 후 자동 체크포인트
config = {"configurable": {"thread_id": "task-001"}}
result = await app.ainvoke(initial_state, config)
 
# 크래시 후 재개 - 마지막 체크포인트에서 이어서 실행
result = await app.ainvoke(None, config)

커스텀 체크포인팅

프레임워크에 의존하지 않는 범용 체크포인팅 시스템입니다.

python
import json
from datetime import datetime
 
class CheckpointManager:
    def __init__(self, storage):
        self.storage = storage
 
    async def save(
        self,
        workflow_id: str,
        step_id: str,
        state: dict,
        metadata: dict | None = None
    ):
        """체크포인트 저장"""
        checkpoint = {
            "workflow_id": workflow_id,
            "step_id": step_id,
            "state": state,
            "metadata": metadata or {},
            "timestamp": datetime.now().isoformat(),
            "version": await self._next_version(workflow_id)
        }
        await self.storage.save(
            f"checkpoints/{workflow_id}/{checkpoint['version']}",
            checkpoint
        )
 
    async def restore(
        self, workflow_id: str,
        version: int | None = None
    ) -> dict | None:
        """체크포인트 복원 (버전 미지정 시 최신)"""
        if version is None:
            version = await self._latest_version(workflow_id)
        if version is None:
            return None
        return await self.storage.load(
            f"checkpoints/{workflow_id}/{version}"
        )
 
    async def list_checkpoints(
        self, workflow_id: str
    ) -> list[dict]:
        """워크플로우의 체크포인트 이력 조회"""
        return await self.storage.list(
            f"checkpoints/{workflow_id}/"
        )
 
    async def cleanup(
        self, workflow_id: str, keep_last: int = 5
    ):
        """오래된 체크포인트 정리"""
        all_checkpoints = await self.list_checkpoints(workflow_id)
        if len(all_checkpoints) > keep_last:
            to_delete = all_checkpoints[:-keep_last]
            for cp in to_delete:
                await self.storage.delete(cp["key"])
 
# 사용 예시
checkpoint_mgr = CheckpointManager(storage)
 
class CheckpointedWorkflow:
    def __init__(self, workflow_id: str, agents: list):
        self.workflow_id = workflow_id
        self.agents = agents
 
    async def execute(self, initial_state: dict) -> dict:
        # 기존 체크포인트 확인
        checkpoint = await checkpoint_mgr.restore(
            self.workflow_id
        )
 
        if checkpoint:
            state = checkpoint["state"]
            start_step = checkpoint["step_id"]
            print(f"Resuming from step: {start_step}")
        else:
            state = initial_state
            start_step = None
 
        # 에이전트 순차 실행 (체크포인트 이후부터)
        started = start_step is None
        for agent in self.agents:
            if not started:
                if agent.name == start_step:
                    started = True
                continue
 
            result = await agent.run(state)
            state = {**state, **result}
 
            # 체크포인트 저장
            await checkpoint_mgr.save(
                self.workflow_id,
                agent.name,
                state,
                {"agent": agent.name, "status": "completed"}
            )
 
        return state
Info

체크포인팅의 빈도는 트레이드오프입니다. 모든 LLM 호출마다 저장하면 안전하지만 스토리지 비용과 지연시간이 증가합니다. 일반적으로 에이전트 단위(노드 단위)로 체크포인팅하는 것이 적절합니다.

이벤트 소싱(Event Sourcing)

체크포인팅이 "현재 상태의 스냅샷"을 저장한다면, 이벤트 소싱은 "상태를 만들어낸 모든 이벤트"를 저장합니다. 디버깅과 감사에 강력하지만 구현 복잡도가 높습니다.

이벤트 저장소

python
from dataclasses import dataclass, field
from enum import Enum
 
class EventType(str, Enum):
    WORKFLOW_STARTED = "workflow.started"
    AGENT_INVOKED = "agent.invoked"
    AGENT_COMPLETED = "agent.completed"
    AGENT_FAILED = "agent.failed"
    TOOL_CALLED = "tool.called"
    TOOL_RETURNED = "tool.returned"
    HANDOFF_INITIATED = "handoff.initiated"
    HANDOFF_COMPLETED = "handoff.completed"
    STATE_UPDATED = "state.updated"
 
@dataclass
class AgentEvent:
    event_id: str
    workflow_id: str
    event_type: EventType
    agent_id: str | None
    timestamp: datetime
    data: dict
    metadata: dict = field(default_factory=dict)
 
class EventStore:
    def __init__(self, storage):
        self.storage = storage
 
    async def append(self, event: AgentEvent):
        """이벤트 추가 (불변, append-only)"""
        await self.storage.append(
            f"events/{event.workflow_id}",
            event
        )
 
    async def get_events(
        self,
        workflow_id: str,
        after: datetime | None = None,
        event_types: list[EventType] | None = None
    ) -> list[AgentEvent]:
        """이벤트 조회"""
        events = await self.storage.load_all(
            f"events/{workflow_id}"
        )
        if after:
            events = [e for e in events if e.timestamp > after]
        if event_types:
            events = [e for e in events if e.event_type in event_types]
        return events
 
    async def rebuild_state(self, workflow_id: str) -> dict:
        """이벤트 리플레이로 현재 상태 재구성"""
        events = await self.get_events(workflow_id)
        state = {}
 
        for event in events:
            if event.event_type == EventType.WORKFLOW_STARTED:
                state = event.data.get("initial_state", {})
            elif event.event_type == EventType.STATE_UPDATED:
                state.update(event.data.get("updates", {}))
            elif event.event_type == EventType.AGENT_COMPLETED:
                state["last_completed_agent"] = event.agent_id
                state.update(event.data.get("result", {}))
 
        return state
 
# 이벤트 소싱 기반 워크플로우
class EventSourcedWorkflow:
    def __init__(
        self,
        workflow_id: str,
        event_store: EventStore,
        agents: list
    ):
        self.workflow_id = workflow_id
        self.events = event_store
        self.agents = agents
 
    async def execute(self, initial_state: dict) -> dict:
        await self.events.append(AgentEvent(
            event_id=generate_id(),
            workflow_id=self.workflow_id,
            event_type=EventType.WORKFLOW_STARTED,
            agent_id=None,
            timestamp=datetime.now(),
            data={"initial_state": initial_state}
        ))
 
        state = initial_state
 
        for agent in self.agents:
            # 에이전트 호출 이벤트
            await self.events.append(AgentEvent(
                event_id=generate_id(),
                workflow_id=self.workflow_id,
                event_type=EventType.AGENT_INVOKED,
                agent_id=agent.name,
                timestamp=datetime.now(),
                data={"input_state_keys": list(state.keys())}
            ))
 
            try:
                result = await agent.run(state)
 
                # 완료 이벤트
                await self.events.append(AgentEvent(
                    event_id=generate_id(),
                    workflow_id=self.workflow_id,
                    event_type=EventType.AGENT_COMPLETED,
                    agent_id=agent.name,
                    timestamp=datetime.now(),
                    data={"result": result}
                ))
 
                state = {**state, **result}
 
            except Exception as e:
                # 실패 이벤트
                await self.events.append(AgentEvent(
                    event_id=generate_id(),
                    workflow_id=self.workflow_id,
                    event_type=EventType.AGENT_FAILED,
                    agent_id=agent.name,
                    timestamp=datetime.now(),
                    data={"error": str(e)}
                ))
                raise
 
        return state

재시도 전략

에이전트 실행은 LLM API 장애, 도구 오류, 타임아웃 등 다양한 이유로 실패할 수 있습니다. 적절한 재시도 전략이 필수입니다.

계층적 재시도

python
from enum import Enum
 
class RetryLevel(str, Enum):
    TOOL = "tool"      # 개별 도구 호출 재시도
    AGENT = "agent"    # 에이전트 전체 재실행
    WORKFLOW = "workflow"  # 워크플로우 단계 재시도
 
class RetryPolicy:
    def __init__(
        self,
        max_retries: int = 3,
        initial_delay: float = 1.0,
        max_delay: float = 60.0,
        backoff_multiplier: float = 2.0,
        retryable_errors: list[type] | None = None
    ):
        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.backoff_multiplier = backoff_multiplier
        self.retryable_errors = retryable_errors or [
            TimeoutError, ConnectionError, RateLimitError
        ]
 
    def should_retry(self, error: Exception, attempt: int) -> bool:
        if attempt >= self.max_retries:
            return False
        return any(
            isinstance(error, t) for t in self.retryable_errors
        )
 
    def get_delay(self, attempt: int) -> float:
        delay = self.initial_delay * (
            self.backoff_multiplier ** attempt
        )
        return min(delay, self.max_delay)
 
class ResilientAgentExecutor:
    def __init__(self, policies: dict[RetryLevel, RetryPolicy]):
        self.policies = policies
 
    async def execute_with_retry(
        self,
        agent: Agent,
        task: str,
        context: dict
    ) -> dict:
        agent_policy = self.policies.get(
            RetryLevel.AGENT,
            RetryPolicy(max_retries=2)
        )
 
        for attempt in range(agent_policy.max_retries + 1):
            try:
                return await self._execute_agent(
                    agent, task, context
                )
            except Exception as e:
                if not agent_policy.should_retry(e, attempt):
                    raise
                delay = agent_policy.get_delay(attempt)
                await asyncio.sleep(delay)
 
    async def _execute_agent(
        self, agent: Agent, task: str, context: dict
    ) -> dict:
        """에이전트 실행 (도구 호출 레벨 재시도 포함)"""
        tool_policy = self.policies.get(
            RetryLevel.TOOL,
            RetryPolicy(max_retries=3)
        )
 
        # 에이전트의 도구 호출에 재시도 래퍼 적용
        wrapped_tools = [
            self._wrap_tool(tool, tool_policy)
            for tool in agent.tools
        ]
        agent_copy = agent.with_tools(wrapped_tools)
        return await agent_copy.run(task)
 
    def _wrap_tool(self, tool, policy: RetryPolicy):
        original_fn = tool.handler
 
        async def retrying_handler(*args, **kwargs):
            for attempt in range(policy.max_retries + 1):
                try:
                    return await original_fn(*args, **kwargs)
                except Exception as e:
                    if not policy.should_retry(e, attempt):
                        raise
                    await asyncio.sleep(policy.get_delay(attempt))
 
        tool.handler = retrying_handler
        return tool
Warning

LLM의 비결정적 특성 때문에, 에이전트 재시도 시 이전과 완전히 다른 결과가 나올 수 있습니다. 도구 호출 레벨의 재시도는 안전하지만, 에이전트 전체 재실행은 이전 단계의 결과와 불일치를 유발할 수 있습니다. 체크포인트에서 복원 후 재시도하는 것이 더 안전합니다.

보상 트랜잭션(Compensating Transaction)

에이전트가 외부 시스템에 변경을 가한 후 실패하면, 이미 적용된 변경을 되돌려야 합니다. 데이터베이스의 롤백과 유사하지만, 분산 환경에서는 더 복잡합니다.

python
@dataclass
class CompensationAction:
    description: str
    execute: callable
    applied: bool = False
 
class SagaOrchestrator:
    """Saga 패턴으로 보상 트랜잭션 관리"""
 
    def __init__(self):
        self.compensations: list[CompensationAction] = []
 
    async def execute_step(
        self,
        action: callable,
        compensation: callable,
        description: str
    ):
        """단계 실행 및 보상 행동 등록"""
        try:
            result = await action()
            self.compensations.append(CompensationAction(
                description=f"Undo: {description}",
                execute=compensation,
                applied=True
            ))
            return result
        except Exception as e:
            # 실패 시 이전 단계들을 역순으로 보상
            await self.compensate()
            raise SagaFailedError(
                f"Step '{description}' failed: {e}"
            ) from e
 
    async def compensate(self):
        """모든 적용된 단계를 역순으로 보상"""
        for action in reversed(self.compensations):
            if action.applied:
                try:
                    await action.execute()
                    action.applied = False
                except Exception as e:
                    # 보상 실패는 로그만 남기고 계속 진행
                    log.error(
                        f"Compensation failed: {action.description}: {e}"
                    )
 
# 사용 예시: 주문 처리 에이전트 워크플로우
async def process_order(order_id: str):
    saga = SagaOrchestrator()
 
    # 1. 재고 차감
    await saga.execute_step(
        action=lambda: inventory_service.reserve(order_id, items),
        compensation=lambda: inventory_service.release(order_id, items),
        description="재고 예약"
    )
 
    # 2. 결제 처리
    await saga.execute_step(
        action=lambda: payment_service.charge(order_id, amount),
        compensation=lambda: payment_service.refund(order_id, amount),
        description="결제 처리"
    )
 
    # 3. 배송 요청 (여기서 실패하면 1, 2가 보상됨)
    await saga.execute_step(
        action=lambda: shipping_service.create(order_id, address),
        compensation=lambda: shipping_service.cancel(order_id),
        description="배송 요청"
    )

데드 레터 큐(Dead Letter Queue)

재시도 후에도 처리할 수 없는 작업을 별도로 수집하여 분석하고, 수동 개입이나 특별 처리를 할 수 있게 합니다.

python
class DeadLetterQueue:
    def __init__(self, storage):
        self.storage = storage
 
    async def enqueue(
        self,
        workflow_id: str,
        agent_id: str,
        error: Exception,
        state: dict,
        retry_count: int
    ):
        entry = {
            "workflow_id": workflow_id,
            "agent_id": agent_id,
            "error_type": type(error).__name__,
            "error_message": str(error),
            "state_snapshot": state,
            "retry_count": retry_count,
            "enqueued_at": datetime.now().isoformat(),
            "status": "pending"
        }
        await self.storage.save(
            f"dlq/{workflow_id}", entry
        )
        await self._notify_operators(entry)
 
    async def list_pending(self) -> list[dict]:
        return await self.storage.find_all(
            prefix="dlq/", status="pending"
        )
 
    async def retry(self, workflow_id: str):
        """수동 재시도"""
        entry = await self.storage.load(f"dlq/{workflow_id}")
        entry["status"] = "retrying"
        await self.storage.save(f"dlq/{workflow_id}", entry)
 
        # 체크포인트에서 복원하여 재시도
        checkpoint = await checkpoint_mgr.restore(workflow_id)
        if checkpoint:
            await workflow_engine.resume(workflow_id, checkpoint)

다음 장에서는 에이전트 플릿의 관리와 스케일링을 다룹니다. 에이전트 수가 수십에서 수백으로 늘어날 때의 운영 전략, 자동 스케일링, 비용 최적화를 살펴봅니다.

이 글이 도움이 되셨나요?

관련 글

AI / ML

6장: 에이전트 컨트롤 플레인 설계

에이전트 등록과 발견, 설정 관리, 생명주기 관리, 정책 엔진까지 프로덕션 멀티에이전트 시스템의 중추인 컨트롤 플레인 설계를 다룹니다.

2026년 3월 31일·15분
AI / ML

8장: 에이전트 플릿 관리와 스케일링

수십에서 수백 개의 에이전트를 운영하는 플릿 관리 전략, 자동 스케일링, 로드 밸런싱, 비용 최적화를 다룹니다.

2026년 4월 5일·13분
AI / ML

5장: 에이전트 협업과 합의 메커니즘

투표, 토론, 비판적 검토, 다수결 합의 등 여러 에이전트가 하나의 결론에 도달하기 위한 협업 패턴과 합의 프로토콜을 다룹니다.

2026년 3월 30일·16분
이전 글6장: 에이전트 컨트롤 플레인 설계
다음 글8장: 에이전트 플릿 관리와 스케일링

댓글

목차

약 14분 남음
  • 상태 관리가 어려운 이유
  • 체크포인팅(Checkpointing)
    • LangGraph 체크포인팅
    • 커스텀 체크포인팅
  • 이벤트 소싱(Event Sourcing)
    • 이벤트 저장소
  • 재시도 전략
    • 계층적 재시도
  • 보상 트랜잭션(Compensating Transaction)
  • 데드 레터 큐(Dead Letter Queue)