Agentic Workflow의 상태 모델, 이벤트 소싱, 체크포인트 저장소 선택, 멱등성 보장, 상태 복원과 버전 마이그레이션, 분산 상태 일관성 전략을 다룹니다.
Agentic Workflow의 상태(State)는 워크플로우의 현재 진행 상황을 표현하는 모든 데이터를 의미합니다. 상태 모델을 올바르게 설계하면 디버깅, 복구, 감사가 용이해집니다.
from pydantic import BaseModel
from datetime import datetime
from enum import Enum
class WorkflowPhase(Enum):
INITIALIZED = "initialized"
RUNNING = "running"
PAUSED = "paused"
WAITING_HUMAN = "waiting_human"
WAITING_EXTERNAL = "waiting_external"
COMPLETED = "completed"
FAILED = "failed"
COMPENSATING = "compensating"
class WorkflowState(BaseModel):
"""워크플로우 상태 모델"""
# 식별 정보
workflow_id: str
workflow_type: str
version: int
# 실행 상태
phase: WorkflowPhase
current_node: str
started_at: datetime
updated_at: datetime
# 비즈니스 데이터
input_data: dict
intermediate_results: dict
output_data: dict | None = None
# 에이전트 컨텍스트
agent_memory: dict # 에이전트의 추론 기록
tool_call_history: list[dict] # 도구 호출 이력
# 메타데이터
retry_count: int = 0
error_log: list[dict] = []
checkpoints: list[str] = [] # 체크포인트 ID 목록원칙 1 — 최소 상태 원칙: 상태에는 워크플로우를 복원하는 데 필요한 최소한의 정보만 포함합니다. 파생 가능한 데이터는 상태에 저장하지 않고 필요할 때 재계산합니다.
원칙 2 — 불변성(Immutability): 상태 전이 시 기존 상태를 수정하지 않고 새로운 상태 객체를 생성합니다. 이를 통해 상태 이력을 완전히 추적할 수 있습니다.
원칙 3 — 직렬화 가능성: 상태의 모든 필드는 JSON이나 바이너리로 직렬화할 수 있어야 합니다. 파일 핸들, 데이터베이스 커넥션 등 직렬화 불가능한 리소스는 상태에 포함하지 않습니다.
from dataclasses import dataclass
from typing import Any
@dataclass(frozen=True)
class ImmutableState:
"""불변 상태 객체"""
data: dict
version: int
timestamp: float
def transition(self, updates: dict) -> "ImmutableState":
"""새로운 상태 객체를 생성하여 반환"""
new_data = {**self.data, **updates}
return ImmutableState(
data=new_data,
version=self.version + 1,
timestamp=time.time(),
)**이벤트 소싱(Event Sourcing)**은 상태를 직접 저장하는 대신, 상태를 변경한 모든 이벤트를 순서대로 저장하는 패턴입니다. 현재 상태는 이벤트를 처음부터 재생(replay)하여 도출합니다.
from pydantic import BaseModel
from datetime import datetime
class WorkflowEvent(BaseModel):
event_id: str
workflow_id: str
event_type: str
timestamp: datetime
data: dict
metadata: dict = {}
# 이벤트 유형별 정의
class WorkflowStarted(WorkflowEvent):
event_type: str = "workflow_started"
# data: input_data, workflow_type
class NodeEntered(WorkflowEvent):
event_type: str = "node_entered"
# data: node_name
class NodeCompleted(WorkflowEvent):
event_type: str = "node_completed"
# data: node_name, output
class ToolCalled(WorkflowEvent):
event_type: str = "tool_called"
# data: tool_name, input, output, duration_ms
class HumanApprovalRequested(WorkflowEvent):
event_type: str = "human_approval_requested"
# data: gate_name, context
class StateCheckpointed(WorkflowEvent):
event_type: str = "state_checkpointed"
# data: checkpoint_id, state_hashclass EventStore:
"""이벤트 저장소"""
async def append(self, event: WorkflowEvent) -> None:
"""이벤트를 추가 전용으로 저장"""
await self.db.execute(
"""
INSERT INTO workflow_events
(event_id, workflow_id, event_type, timestamp, data, metadata)
VALUES ($1, $2, $3, $4, $5, $6)
""",
event.event_id, event.workflow_id, event.event_type,
event.timestamp, event.data, event.metadata,
)
async def get_events(
self, workflow_id: str, after_version: int = 0
) -> list[WorkflowEvent]:
"""워크플로우의 이벤트 이력 조회"""
rows = await self.db.fetch(
"""
SELECT * FROM workflow_events
WHERE workflow_id = $1 AND version > $2
ORDER BY version ASC
""",
workflow_id, after_version,
)
return [WorkflowEvent(**row) for row in rows]
class StateProjector:
"""이벤트를 기반으로 현재 상태를 도출"""
def project(self, events: list[WorkflowEvent]) -> WorkflowState:
state = WorkflowState.initial()
for event in events:
state = self._apply_event(state, event)
return state
def _apply_event(self, state: WorkflowState, event: WorkflowEvent) -> WorkflowState:
match event.event_type:
case "workflow_started":
return state.transition({
"phase": WorkflowPhase.RUNNING,
"input_data": event.data,
})
case "node_completed":
return state.transition({
"current_node": event.data["next_node"],
"intermediate_results": {
**state.intermediate_results,
event.data["node_name"]: event.data["output"],
},
})
case "human_approval_requested":
return state.transition({
"phase": WorkflowPhase.WAITING_HUMAN,
})
# ... 기타 이벤트 처리이벤트 소싱의 가장 큰 장점은 완전한 감사 추적(Audit Trail)입니다. 워크플로우의 모든 변화가 이벤트로 기록되므로, 특정 시점의 상태를 정확히 재현할 수 있습니다. 이는 7장에서 다룰 감사 로깅의 기반이 됩니다.
이벤트를 처음부터 매번 재생하는 것은 이벤트가 많아지면 비효율적입니다. **체크포인트(Checkpoint)**는 특정 시점의 상태 스냅샷을 저장하여 복원 시간을 단축합니다.
| 저장소 | 장점 | 단점 | 적합한 사용 사례 |
|---|---|---|---|
| PostgreSQL | 트랜잭션 안전성, 쿼리 유연성 | 대용량 상태에 비효율 | 일반 워크플로우 |
| Redis | 빠른 읽기/쓰기, TTL 지원 | 영속성 보장 약함 | 단기 워크플로우 |
| S3/Blob | 대용량 상태 저장 유리, 저비용 | 지연 시간 높음 | 대용량 컨텍스트 워크플로우 |
| SQLite | 설정 간단, 로컬 개발 적합 | 동시성 제한 | 개발/테스트 환경 |
class PostgresCheckpointStore:
def __init__(self, pool: asyncpg.Pool):
self.pool = pool
async def save(self, checkpoint: Checkpoint) -> None:
"""체크포인트 저장 (upsert)"""
await self.pool.execute(
"""
INSERT INTO checkpoints
(workflow_id, version, node, state_data, created_at)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (workflow_id, version)
DO UPDATE SET state_data = $4, created_at = $5
""",
checkpoint.workflow_id,
checkpoint.version,
checkpoint.node,
json.dumps(checkpoint.state_data),
checkpoint.created_at,
)
async def load_latest(self, workflow_id: str) -> Checkpoint | None:
"""최신 체크포인트 로드"""
row = await self.pool.fetchrow(
"""
SELECT * FROM checkpoints
WHERE workflow_id = $1
ORDER BY version DESC
LIMIT 1
""",
workflow_id,
)
return Checkpoint(**row) if row else None
async def restore(self, workflow_id: str) -> WorkflowState:
"""체크포인트 + 이후 이벤트로 상태 복원"""
checkpoint = await self.load_latest(workflow_id)
if checkpoint:
# 체크포인트 이후의 이벤트만 재생
events = await self.event_store.get_events(
workflow_id, after_version=checkpoint.version
)
state = checkpoint.to_state()
else:
# 체크포인트 없으면 전체 이벤트 재생
events = await self.event_store.get_events(workflow_id)
state = WorkflowState.initial()
for event in events:
state = self.projector.apply_event(state, event)
return state체크포인트를 너무 자주 저장하면 I/O 오버헤드가 증가하고, 너무 드물면 복원 시간이 길어집니다.
LangGraph를 사용하는 경우, 기본적으로 모든 노드 실행 후 자동으로 체크포인트가 생성됩니다. 커스텀 체크포인트 전략이 필요한 경우에만 별도로 구현하면 됩니다.
워크플로우가 체크포인트에서 복원되어 재실행될 때, 동일한 작업이 중복 수행되지 않도록 **멱등성(Idempotency)**을 보장해야 합니다.
import hashlib
class IdempotencyManager:
def __init__(self, store: IdempotencyStore):
self.store = store
def generate_key(self, workflow_id: str, node: str, input_hash: str) -> str:
"""결정론적 멱등성 키 생성"""
raw = f"{workflow_id}:{node}:{input_hash}"
return hashlib.sha256(raw.encode()).hexdigest()
async def execute_once(
self,
key: str,
func,
*args,
**kwargs,
):
"""동일 키로 한 번만 실행"""
# 이미 실행된 결과가 있는지 확인
cached = await self.store.get(key)
if cached is not None:
logger.info(f"멱등성 키 {key[:8]}... 캐시 히트, 이전 결과 반환")
return cached.result
# 실행 중 락 획득 (중복 실행 방지)
async with self.store.lock(key, timeout=60):
# 더블 체크 (락 대기 중 다른 워커가 완료했을 수 있음)
cached = await self.store.get(key)
if cached is not None:
return cached.result
result = await func(*args, **kwargs)
await self.store.set(key, IdempotencyRecord(
result=result,
executed_at=datetime.utcnow(),
ttl=timedelta(days=7),
))
return result워크플로우 코드가 업데이트되면 상태 스키마도 변경될 수 있습니다. 이미 실행 중인 워크플로우의 상태를 새 스키마에 맞게 **마이그레이션(Migration)**해야 합니다.
class StateMigrator:
"""상태 스키마 버전 마이그레이션"""
def __init__(self):
self.migrations: dict[tuple[int, int], Callable] = {}
def register(self, from_version: int, to_version: int):
"""마이그레이션 함수 등록"""
def decorator(func):
self.migrations[(from_version, to_version)] = func
return func
return decorator
def migrate(self, state: dict, from_version: int, to_version: int) -> dict:
"""상태를 목표 버전으로 순차 마이그레이션"""
current = from_version
current_state = state
while current < to_version:
next_version = current + 1
migration = self.migrations.get((current, next_version))
if migration is None:
raise MigrationNotFound(
f"v{current} -> v{next_version} 마이그레이션이 정의되지 않음"
)
current_state = migration(current_state)
current = next_version
current_state["__version__"] = to_version
return current_state
migrator = StateMigrator()
@migrator.register(1, 2)
def v1_to_v2(state: dict) -> dict:
"""v1 -> v2: priority 필드를 숫자에서 문자열로 변경"""
priority_map = {1: "low", 2: "medium", 3: "high", 4: "urgent"}
state["priority"] = priority_map.get(state.get("priority", 2), "medium")
return state
@migrator.register(2, 3)
def v2_to_v3(state: dict) -> dict:
"""v2 -> v3: customer_info 구조 변경"""
if "customer_name" in state:
state["customer"] = {
"name": state.pop("customer_name"),
"email": state.pop("customer_email", None),
}
return state마이그레이션은 항상 순방향으로만 적용합니다. 롤백이 필요한 경우를 대비하여 마이그레이션 전 원본 상태를 백업해 두는 것이 안전합니다. Temporal에서는 워크플로우 버전 관리(Workflow Versioning) 기능을 사용하여 이 문제를 체계적으로 처리할 수 있습니다.
여러 워커가 동일한 워크플로우 상태에 접근하는 분산 환경에서는 일관성을 보장하는 전략이 필요합니다.
class OptimisticStateStore:
async def update(self, workflow_id: str, state: dict, expected_version: int) -> bool:
"""낙관적 동시성 제어로 상태 업데이트"""
result = await self.db.execute(
"""
UPDATE workflow_states
SET state_data = $1, version = version + 1, updated_at = NOW()
WHERE workflow_id = $2 AND version = $3
""",
json.dumps(state), workflow_id, expected_version,
)
if result == "UPDATE 0":
# 다른 워커가 먼저 업데이트함 -> 충돌
raise OptimisticLockError(
f"워크플로우 {workflow_id}의 버전 충돌: "
f"기대 버전 {expected_version}, 현재 버전 불일치"
)
return TrueTemporal이나 LangGraph 서버를 사용하면 분산 상태 일관성이 프레임워크 수준에서 관리됩니다. 직접 상태 저장소를 구현하는 경우에만 위와 같은 동시성 제어가 필요합니다.
이 장에서는 Agentic Workflow의 상태 관리와 체크포인팅을 심층적으로 다루었습니다.
7장에서는 에이전트의 모든 결정과 행동을 기록하는 감사 로깅과 컴플라이언스를 다룹니다. 불변 감사 로그, 규제 요구사항 대응, 설명 가능성, 재현 가능성, OpenTelemetry 통합 등을 살펴보겠습니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
에이전트 행동 추적, 불변 감사 로그 설계, 규제 요구사항 대응, 설명 가능성, 재현 가능성, OpenTelemetry 통합, 보존 정책을 다룹니다.
Agentic Workflow의 에러 분류 체계, 지수 백오프, 서킷 브레이커, 모델 폴백, Saga 패턴 기반 보상 트랜잭션, 데드레터 큐 등 복원력 패턴을 다룹니다.
ERP/CRM/ITSM 연동, MCP 기반 도구 통합, API 게이트웨이, 이벤트 드리븐 통합, 레거시 시스템 어댑터, 트랜잭션 경계 설계를 다룹니다.