멀티에이전트 시스템의 체크포인팅, 이벤트 소싱, 재시도 전략, 그리고 분산 환경에서의 일관성 보장과 장애 복구 패턴을 다룹니다.
멀티에이전트 시스템의 상태 관리는 일반적인 분산 시스템보다 더 복잡합니다. 에이전트의 "상태"에는 대화 이력, 도구 호출 결과, 중간 추론 과정, 다른 에이전트와의 통신 기록이 모두 포함되기 때문입니다. 이 상태가 유실되면 에이전트는 맥락을 잃고, 전체 작업을 처음부터 다시 시작해야 합니다.
상태 관리의 핵심 과제는 세 가지입니다.
일관성(Consistency): 여러 에이전트가 공유하는 상태가 항상 동기화되어야 합니다. 에이전트 A가 데이터를 수정했는데 에이전트 B가 이전 버전을 읽으면 결과가 틀어집니다.
내구성(Durability): 에이전트나 시스템이 크래시되어도 진행 상태가 보존되어야 합니다. 10단계 중 8단계까지 완료된 작업을 1단계부터 다시 하는 것은 낭비입니다.
가시성(Visibility): 현재 전체 작업의 진행 상태를 실시간으로 파악할 수 있어야 합니다. "지금 어떤 에이전트가 무슨 단계에 있는가"를 알 수 없으면 디버깅도 모니터링도 불가능합니다.
LangGraph는 체크포인팅을 핵심 기능으로 제공합니다. 그래프의 각 노드(에이전트) 실행 후 전체 상태를 저장하여, 중단 시점부터 재개할 수 있습니다.
from langgraph.graph import StateGraph
from langgraph.checkpoint.postgres import PostgresSaver
# PostgreSQL 기반 체크포인터
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost:5432/agents"
)
# 상태 정의
class WorkflowState(TypedDict):
messages: list[dict]
current_step: str
completed_steps: list[str]
intermediate_results: dict
retry_count: int
# 그래프 정의
workflow = StateGraph(WorkflowState)
workflow.add_node("research", research_agent)
workflow.add_node("analyze", analysis_agent)
workflow.add_node("write", writing_agent)
workflow.add_node("review", review_agent)
workflow.add_edge("research", "analyze")
workflow.add_edge("analyze", "write")
workflow.add_conditional_edges(
"review",
lambda state: "write" if state["needs_revision"] else "__end__"
)
# 체크포인터 연결
app = workflow.compile(checkpointer=checkpointer)
# 실행 - 각 노드 후 자동 체크포인트
config = {"configurable": {"thread_id": "task-001"}}
result = await app.ainvoke(initial_state, config)
# 크래시 후 재개 - 마지막 체크포인트에서 이어서 실행
result = await app.ainvoke(None, config)프레임워크에 의존하지 않는 범용 체크포인팅 시스템입니다.
import json
from datetime import datetime
class CheckpointManager:
def __init__(self, storage):
self.storage = storage
async def save(
self,
workflow_id: str,
step_id: str,
state: dict,
metadata: dict | None = None
):
"""체크포인트 저장"""
checkpoint = {
"workflow_id": workflow_id,
"step_id": step_id,
"state": state,
"metadata": metadata or {},
"timestamp": datetime.now().isoformat(),
"version": await self._next_version(workflow_id)
}
await self.storage.save(
f"checkpoints/{workflow_id}/{checkpoint['version']}",
checkpoint
)
async def restore(
self, workflow_id: str,
version: int | None = None
) -> dict | None:
"""체크포인트 복원 (버전 미지정 시 최신)"""
if version is None:
version = await self._latest_version(workflow_id)
if version is None:
return None
return await self.storage.load(
f"checkpoints/{workflow_id}/{version}"
)
async def list_checkpoints(
self, workflow_id: str
) -> list[dict]:
"""워크플로우의 체크포인트 이력 조회"""
return await self.storage.list(
f"checkpoints/{workflow_id}/"
)
async def cleanup(
self, workflow_id: str, keep_last: int = 5
):
"""오래된 체크포인트 정리"""
all_checkpoints = await self.list_checkpoints(workflow_id)
if len(all_checkpoints) > keep_last:
to_delete = all_checkpoints[:-keep_last]
for cp in to_delete:
await self.storage.delete(cp["key"])
# 사용 예시
checkpoint_mgr = CheckpointManager(storage)
class CheckpointedWorkflow:
def __init__(self, workflow_id: str, agents: list):
self.workflow_id = workflow_id
self.agents = agents
async def execute(self, initial_state: dict) -> dict:
# 기존 체크포인트 확인
checkpoint = await checkpoint_mgr.restore(
self.workflow_id
)
if checkpoint:
state = checkpoint["state"]
start_step = checkpoint["step_id"]
print(f"Resuming from step: {start_step}")
else:
state = initial_state
start_step = None
# 에이전트 순차 실행 (체크포인트 이후부터)
started = start_step is None
for agent in self.agents:
if not started:
if agent.name == start_step:
started = True
continue
result = await agent.run(state)
state = {**state, **result}
# 체크포인트 저장
await checkpoint_mgr.save(
self.workflow_id,
agent.name,
state,
{"agent": agent.name, "status": "completed"}
)
return state체크포인팅의 빈도는 트레이드오프입니다. 모든 LLM 호출마다 저장하면 안전하지만 스토리지 비용과 지연시간이 증가합니다. 일반적으로 에이전트 단위(노드 단위)로 체크포인팅하는 것이 적절합니다.
체크포인팅이 "현재 상태의 스냅샷"을 저장한다면, 이벤트 소싱은 "상태를 만들어낸 모든 이벤트"를 저장합니다. 디버깅과 감사에 강력하지만 구현 복잡도가 높습니다.
from dataclasses import dataclass, field
from enum import Enum
class EventType(str, Enum):
WORKFLOW_STARTED = "workflow.started"
AGENT_INVOKED = "agent.invoked"
AGENT_COMPLETED = "agent.completed"
AGENT_FAILED = "agent.failed"
TOOL_CALLED = "tool.called"
TOOL_RETURNED = "tool.returned"
HANDOFF_INITIATED = "handoff.initiated"
HANDOFF_COMPLETED = "handoff.completed"
STATE_UPDATED = "state.updated"
@dataclass
class AgentEvent:
event_id: str
workflow_id: str
event_type: EventType
agent_id: str | None
timestamp: datetime
data: dict
metadata: dict = field(default_factory=dict)
class EventStore:
def __init__(self, storage):
self.storage = storage
async def append(self, event: AgentEvent):
"""이벤트 추가 (불변, append-only)"""
await self.storage.append(
f"events/{event.workflow_id}",
event
)
async def get_events(
self,
workflow_id: str,
after: datetime | None = None,
event_types: list[EventType] | None = None
) -> list[AgentEvent]:
"""이벤트 조회"""
events = await self.storage.load_all(
f"events/{workflow_id}"
)
if after:
events = [e for e in events if e.timestamp > after]
if event_types:
events = [e for e in events if e.event_type in event_types]
return events
async def rebuild_state(self, workflow_id: str) -> dict:
"""이벤트 리플레이로 현재 상태 재구성"""
events = await self.get_events(workflow_id)
state = {}
for event in events:
if event.event_type == EventType.WORKFLOW_STARTED:
state = event.data.get("initial_state", {})
elif event.event_type == EventType.STATE_UPDATED:
state.update(event.data.get("updates", {}))
elif event.event_type == EventType.AGENT_COMPLETED:
state["last_completed_agent"] = event.agent_id
state.update(event.data.get("result", {}))
return state
# 이벤트 소싱 기반 워크플로우
class EventSourcedWorkflow:
def __init__(
self,
workflow_id: str,
event_store: EventStore,
agents: list
):
self.workflow_id = workflow_id
self.events = event_store
self.agents = agents
async def execute(self, initial_state: dict) -> dict:
await self.events.append(AgentEvent(
event_id=generate_id(),
workflow_id=self.workflow_id,
event_type=EventType.WORKFLOW_STARTED,
agent_id=None,
timestamp=datetime.now(),
data={"initial_state": initial_state}
))
state = initial_state
for agent in self.agents:
# 에이전트 호출 이벤트
await self.events.append(AgentEvent(
event_id=generate_id(),
workflow_id=self.workflow_id,
event_type=EventType.AGENT_INVOKED,
agent_id=agent.name,
timestamp=datetime.now(),
data={"input_state_keys": list(state.keys())}
))
try:
result = await agent.run(state)
# 완료 이벤트
await self.events.append(AgentEvent(
event_id=generate_id(),
workflow_id=self.workflow_id,
event_type=EventType.AGENT_COMPLETED,
agent_id=agent.name,
timestamp=datetime.now(),
data={"result": result}
))
state = {**state, **result}
except Exception as e:
# 실패 이벤트
await self.events.append(AgentEvent(
event_id=generate_id(),
workflow_id=self.workflow_id,
event_type=EventType.AGENT_FAILED,
agent_id=agent.name,
timestamp=datetime.now(),
data={"error": str(e)}
))
raise
return state에이전트 실행은 LLM API 장애, 도구 오류, 타임아웃 등 다양한 이유로 실패할 수 있습니다. 적절한 재시도 전략이 필수입니다.
from enum import Enum
class RetryLevel(str, Enum):
TOOL = "tool" # 개별 도구 호출 재시도
AGENT = "agent" # 에이전트 전체 재실행
WORKFLOW = "workflow" # 워크플로우 단계 재시도
class RetryPolicy:
def __init__(
self,
max_retries: int = 3,
initial_delay: float = 1.0,
max_delay: float = 60.0,
backoff_multiplier: float = 2.0,
retryable_errors: list[type] | None = None
):
self.max_retries = max_retries
self.initial_delay = initial_delay
self.max_delay = max_delay
self.backoff_multiplier = backoff_multiplier
self.retryable_errors = retryable_errors or [
TimeoutError, ConnectionError, RateLimitError
]
def should_retry(self, error: Exception, attempt: int) -> bool:
if attempt >= self.max_retries:
return False
return any(
isinstance(error, t) for t in self.retryable_errors
)
def get_delay(self, attempt: int) -> float:
delay = self.initial_delay * (
self.backoff_multiplier ** attempt
)
return min(delay, self.max_delay)
class ResilientAgentExecutor:
def __init__(self, policies: dict[RetryLevel, RetryPolicy]):
self.policies = policies
async def execute_with_retry(
self,
agent: Agent,
task: str,
context: dict
) -> dict:
agent_policy = self.policies.get(
RetryLevel.AGENT,
RetryPolicy(max_retries=2)
)
for attempt in range(agent_policy.max_retries + 1):
try:
return await self._execute_agent(
agent, task, context
)
except Exception as e:
if not agent_policy.should_retry(e, attempt):
raise
delay = agent_policy.get_delay(attempt)
await asyncio.sleep(delay)
async def _execute_agent(
self, agent: Agent, task: str, context: dict
) -> dict:
"""에이전트 실행 (도구 호출 레벨 재시도 포함)"""
tool_policy = self.policies.get(
RetryLevel.TOOL,
RetryPolicy(max_retries=3)
)
# 에이전트의 도구 호출에 재시도 래퍼 적용
wrapped_tools = [
self._wrap_tool(tool, tool_policy)
for tool in agent.tools
]
agent_copy = agent.with_tools(wrapped_tools)
return await agent_copy.run(task)
def _wrap_tool(self, tool, policy: RetryPolicy):
original_fn = tool.handler
async def retrying_handler(*args, **kwargs):
for attempt in range(policy.max_retries + 1):
try:
return await original_fn(*args, **kwargs)
except Exception as e:
if not policy.should_retry(e, attempt):
raise
await asyncio.sleep(policy.get_delay(attempt))
tool.handler = retrying_handler
return toolLLM의 비결정적 특성 때문에, 에이전트 재시도 시 이전과 완전히 다른 결과가 나올 수 있습니다. 도구 호출 레벨의 재시도는 안전하지만, 에이전트 전체 재실행은 이전 단계의 결과와 불일치를 유발할 수 있습니다. 체크포인트에서 복원 후 재시도하는 것이 더 안전합니다.
에이전트가 외부 시스템에 변경을 가한 후 실패하면, 이미 적용된 변경을 되돌려야 합니다. 데이터베이스의 롤백과 유사하지만, 분산 환경에서는 더 복잡합니다.
@dataclass
class CompensationAction:
description: str
execute: callable
applied: bool = False
class SagaOrchestrator:
"""Saga 패턴으로 보상 트랜잭션 관리"""
def __init__(self):
self.compensations: list[CompensationAction] = []
async def execute_step(
self,
action: callable,
compensation: callable,
description: str
):
"""단계 실행 및 보상 행동 등록"""
try:
result = await action()
self.compensations.append(CompensationAction(
description=f"Undo: {description}",
execute=compensation,
applied=True
))
return result
except Exception as e:
# 실패 시 이전 단계들을 역순으로 보상
await self.compensate()
raise SagaFailedError(
f"Step '{description}' failed: {e}"
) from e
async def compensate(self):
"""모든 적용된 단계를 역순으로 보상"""
for action in reversed(self.compensations):
if action.applied:
try:
await action.execute()
action.applied = False
except Exception as e:
# 보상 실패는 로그만 남기고 계속 진행
log.error(
f"Compensation failed: {action.description}: {e}"
)
# 사용 예시: 주문 처리 에이전트 워크플로우
async def process_order(order_id: str):
saga = SagaOrchestrator()
# 1. 재고 차감
await saga.execute_step(
action=lambda: inventory_service.reserve(order_id, items),
compensation=lambda: inventory_service.release(order_id, items),
description="재고 예약"
)
# 2. 결제 처리
await saga.execute_step(
action=lambda: payment_service.charge(order_id, amount),
compensation=lambda: payment_service.refund(order_id, amount),
description="결제 처리"
)
# 3. 배송 요청 (여기서 실패하면 1, 2가 보상됨)
await saga.execute_step(
action=lambda: shipping_service.create(order_id, address),
compensation=lambda: shipping_service.cancel(order_id),
description="배송 요청"
)재시도 후에도 처리할 수 없는 작업을 별도로 수집하여 분석하고, 수동 개입이나 특별 처리를 할 수 있게 합니다.
class DeadLetterQueue:
def __init__(self, storage):
self.storage = storage
async def enqueue(
self,
workflow_id: str,
agent_id: str,
error: Exception,
state: dict,
retry_count: int
):
entry = {
"workflow_id": workflow_id,
"agent_id": agent_id,
"error_type": type(error).__name__,
"error_message": str(error),
"state_snapshot": state,
"retry_count": retry_count,
"enqueued_at": datetime.now().isoformat(),
"status": "pending"
}
await self.storage.save(
f"dlq/{workflow_id}", entry
)
await self._notify_operators(entry)
async def list_pending(self) -> list[dict]:
return await self.storage.find_all(
prefix="dlq/", status="pending"
)
async def retry(self, workflow_id: str):
"""수동 재시도"""
entry = await self.storage.load(f"dlq/{workflow_id}")
entry["status"] = "retrying"
await self.storage.save(f"dlq/{workflow_id}", entry)
# 체크포인트에서 복원하여 재시도
checkpoint = await checkpoint_mgr.restore(workflow_id)
if checkpoint:
await workflow_engine.resume(workflow_id, checkpoint)다음 장에서는 에이전트 플릿의 관리와 스케일링을 다룹니다. 에이전트 수가 수십에서 수백으로 늘어날 때의 운영 전략, 자동 스케일링, 비용 최적화를 살펴봅니다.
이 글이 도움이 되셨나요?
에이전트 등록과 발견, 설정 관리, 생명주기 관리, 정책 엔진까지 프로덕션 멀티에이전트 시스템의 중추인 컨트롤 플레인 설계를 다룹니다.
수십에서 수백 개의 에이전트를 운영하는 플릿 관리 전략, 자동 스케일링, 로드 밸런싱, 비용 최적화를 다룹니다.
투표, 토론, 비판적 검토, 다수결 합의 등 여러 에이전트가 하나의 결론에 도달하기 위한 협업 패턴과 합의 프로토콜을 다룹니다.