본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout

내비게이션

  • Tech
  • Projects
  • Books
  • About
  • Tags

카테고리

  • AI / ML
  • 웹 개발
  • 프로그래밍
  • 개발 도구

연결

  • GitHub
  • Email
  • RSS
© 2026 Kreath Archive. All rights reserved.Built with Next.js + MDX
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 6장: 상태 관리와 체크포인팅
2026년 3월 11일·AI / ML·

6장: 상태 관리와 체크포인팅

Agentic Workflow의 상태 모델, 이벤트 소싱, 체크포인트 저장소 선택, 멱등성 보장, 상태 복원과 버전 마이그레이션, 분산 상태 일관성 전략을 다룹니다.

16분947자9개 섹션
workflowaiautomation
공유
agentic-workflow6 / 10
12345678910
이전5장: 에러 복구와 재시도 전략다음7장: 감사 로깅과 컴플라이언스

이 장에서 배울 내용

  • 워크플로우 상태 모델의 설계 원칙
  • 이벤트 소싱 기반 상태 관리
  • 체크포인트 저장소별 특성과 선택 기준
  • 멱등성 보장 전략
  • 상태 버전 마이그레이션과 분산 상태 일관성

워크플로우 상태 모델

Agentic Workflow의 상태(State)는 워크플로우의 현재 진행 상황을 표현하는 모든 데이터를 의미합니다. 상태 모델을 올바르게 설계하면 디버깅, 복구, 감사가 용이해집니다.

상태의 구성 요소

state_model.py
python
from pydantic import BaseModel
from datetime import datetime
from enum import Enum
 
class WorkflowPhase(Enum):
    INITIALIZED = "initialized"
    RUNNING = "running"
    PAUSED = "paused"
    WAITING_HUMAN = "waiting_human"
    WAITING_EXTERNAL = "waiting_external"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATING = "compensating"
 
class WorkflowState(BaseModel):
    """워크플로우 상태 모델"""
 
    # 식별 정보
    workflow_id: str
    workflow_type: str
    version: int
 
    # 실행 상태
    phase: WorkflowPhase
    current_node: str
    started_at: datetime
    updated_at: datetime
 
    # 비즈니스 데이터
    input_data: dict
    intermediate_results: dict
    output_data: dict | None = None
 
    # 에이전트 컨텍스트
    agent_memory: dict  # 에이전트의 추론 기록
    tool_call_history: list[dict]  # 도구 호출 이력
 
    # 메타데이터
    retry_count: int = 0
    error_log: list[dict] = []
    checkpoints: list[str] = []  # 체크포인트 ID 목록

상태 설계 원칙

원칙 1 — 최소 상태 원칙: 상태에는 워크플로우를 복원하는 데 필요한 최소한의 정보만 포함합니다. 파생 가능한 데이터는 상태에 저장하지 않고 필요할 때 재계산합니다.

원칙 2 — 불변성(Immutability): 상태 전이 시 기존 상태를 수정하지 않고 새로운 상태 객체를 생성합니다. 이를 통해 상태 이력을 완전히 추적할 수 있습니다.

원칙 3 — 직렬화 가능성: 상태의 모든 필드는 JSON이나 바이너리로 직렬화할 수 있어야 합니다. 파일 핸들, 데이터베이스 커넥션 등 직렬화 불가능한 리소스는 상태에 포함하지 않습니다.

immutable_state.py
python
from dataclasses import dataclass
from typing import Any
 
@dataclass(frozen=True)
class ImmutableState:
    """불변 상태 객체"""
    data: dict
    version: int
    timestamp: float
 
    def transition(self, updates: dict) -> "ImmutableState":
        """새로운 상태 객체를 생성하여 반환"""
        new_data = {**self.data, **updates}
        return ImmutableState(
            data=new_data,
            version=self.version + 1,
            timestamp=time.time(),
        )

이벤트 소싱

**이벤트 소싱(Event Sourcing)**은 상태를 직접 저장하는 대신, 상태를 변경한 모든 이벤트를 순서대로 저장하는 패턴입니다. 현재 상태는 이벤트를 처음부터 재생(replay)하여 도출합니다.

이벤트 정의

events.py
python
from pydantic import BaseModel
from datetime import datetime
 
class WorkflowEvent(BaseModel):
    event_id: str
    workflow_id: str
    event_type: str
    timestamp: datetime
    data: dict
    metadata: dict = {}
 
# 이벤트 유형별 정의
class WorkflowStarted(WorkflowEvent):
    event_type: str = "workflow_started"
    # data: input_data, workflow_type
 
class NodeEntered(WorkflowEvent):
    event_type: str = "node_entered"
    # data: node_name
 
class NodeCompleted(WorkflowEvent):
    event_type: str = "node_completed"
    # data: node_name, output
 
class ToolCalled(WorkflowEvent):
    event_type: str = "tool_called"
    # data: tool_name, input, output, duration_ms
 
class HumanApprovalRequested(WorkflowEvent):
    event_type: str = "human_approval_requested"
    # data: gate_name, context
 
class StateCheckpointed(WorkflowEvent):
    event_type: str = "state_checkpointed"
    # data: checkpoint_id, state_hash

이벤트 저장소와 상태 복원

event_store.py
python
class EventStore:
    """이벤트 저장소"""
 
    async def append(self, event: WorkflowEvent) -> None:
        """이벤트를 추가 전용으로 저장"""
        await self.db.execute(
            """
            INSERT INTO workflow_events
                (event_id, workflow_id, event_type, timestamp, data, metadata)
            VALUES ($1, $2, $3, $4, $5, $6)
            """,
            event.event_id, event.workflow_id, event.event_type,
            event.timestamp, event.data, event.metadata,
        )
 
    async def get_events(
        self, workflow_id: str, after_version: int = 0
    ) -> list[WorkflowEvent]:
        """워크플로우의 이벤트 이력 조회"""
        rows = await self.db.fetch(
            """
            SELECT * FROM workflow_events
            WHERE workflow_id = $1 AND version > $2
            ORDER BY version ASC
            """,
            workflow_id, after_version,
        )
        return [WorkflowEvent(**row) for row in rows]
 
 
class StateProjector:
    """이벤트를 기반으로 현재 상태를 도출"""
 
    def project(self, events: list[WorkflowEvent]) -> WorkflowState:
        state = WorkflowState.initial()
 
        for event in events:
            state = self._apply_event(state, event)
 
        return state
 
    def _apply_event(self, state: WorkflowState, event: WorkflowEvent) -> WorkflowState:
        match event.event_type:
            case "workflow_started":
                return state.transition({
                    "phase": WorkflowPhase.RUNNING,
                    "input_data": event.data,
                })
            case "node_completed":
                return state.transition({
                    "current_node": event.data["next_node"],
                    "intermediate_results": {
                        **state.intermediate_results,
                        event.data["node_name"]: event.data["output"],
                    },
                })
            case "human_approval_requested":
                return state.transition({
                    "phase": WorkflowPhase.WAITING_HUMAN,
                })
            # ... 기타 이벤트 처리
Info

이벤트 소싱의 가장 큰 장점은 완전한 감사 추적(Audit Trail)입니다. 워크플로우의 모든 변화가 이벤트로 기록되므로, 특정 시점의 상태를 정확히 재현할 수 있습니다. 이는 7장에서 다룰 감사 로깅의 기반이 됩니다.

체크포인트 저장소

이벤트를 처음부터 매번 재생하는 것은 이벤트가 많아지면 비효율적입니다. **체크포인트(Checkpoint)**는 특정 시점의 상태 스냅샷을 저장하여 복원 시간을 단축합니다.

저장소별 특성 비교

저장소장점단점적합한 사용 사례
PostgreSQL트랜잭션 안전성, 쿼리 유연성대용량 상태에 비효율일반 워크플로우
Redis빠른 읽기/쓰기, TTL 지원영속성 보장 약함단기 워크플로우
S3/Blob대용량 상태 저장 유리, 저비용지연 시간 높음대용량 컨텍스트 워크플로우
SQLite설정 간단, 로컬 개발 적합동시성 제한개발/테스트 환경

PostgreSQL 체크포인트 구현

pg_checkpoint.py
python
class PostgresCheckpointStore:
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool
 
    async def save(self, checkpoint: Checkpoint) -> None:
        """체크포인트 저장 (upsert)"""
        await self.pool.execute(
            """
            INSERT INTO checkpoints
                (workflow_id, version, node, state_data, created_at)
            VALUES ($1, $2, $3, $4, $5)
            ON CONFLICT (workflow_id, version)
            DO UPDATE SET state_data = $4, created_at = $5
            """,
            checkpoint.workflow_id,
            checkpoint.version,
            checkpoint.node,
            json.dumps(checkpoint.state_data),
            checkpoint.created_at,
        )
 
    async def load_latest(self, workflow_id: str) -> Checkpoint | None:
        """최신 체크포인트 로드"""
        row = await self.pool.fetchrow(
            """
            SELECT * FROM checkpoints
            WHERE workflow_id = $1
            ORDER BY version DESC
            LIMIT 1
            """,
            workflow_id,
        )
        return Checkpoint(**row) if row else None
 
    async def restore(self, workflow_id: str) -> WorkflowState:
        """체크포인트 + 이후 이벤트로 상태 복원"""
        checkpoint = await self.load_latest(workflow_id)
 
        if checkpoint:
            # 체크포인트 이후의 이벤트만 재생
            events = await self.event_store.get_events(
                workflow_id, after_version=checkpoint.version
            )
            state = checkpoint.to_state()
        else:
            # 체크포인트 없으면 전체 이벤트 재생
            events = await self.event_store.get_events(workflow_id)
            state = WorkflowState.initial()
 
        for event in events:
            state = self.projector.apply_event(state, event)
 
        return state

체크포인트 빈도 전략

체크포인트를 너무 자주 저장하면 I/O 오버헤드가 증가하고, 너무 드물면 복원 시간이 길어집니다.

  • 노드 완료 시마다: 가장 안전하지만 노드가 많으면 비효율적
  • N번째 이벤트마다: 이벤트 100개마다 체크포인트 생성
  • 시간 기반: 5분마다 체크포인트 생성
  • 중요 지점만: HITL 게이트, 외부 API 호출 전후, 분기 지점
Tip

LangGraph를 사용하는 경우, 기본적으로 모든 노드 실행 후 자동으로 체크포인트가 생성됩니다. 커스텀 체크포인트 전략이 필요한 경우에만 별도로 구현하면 됩니다.

멱등성 보장

워크플로우가 체크포인트에서 복원되어 재실행될 때, 동일한 작업이 중복 수행되지 않도록 **멱등성(Idempotency)**을 보장해야 합니다.

멱등성 키 전략

idempotency.py
python
import hashlib
 
class IdempotencyManager:
    def __init__(self, store: IdempotencyStore):
        self.store = store
 
    def generate_key(self, workflow_id: str, node: str, input_hash: str) -> str:
        """결정론적 멱등성 키 생성"""
        raw = f"{workflow_id}:{node}:{input_hash}"
        return hashlib.sha256(raw.encode()).hexdigest()
 
    async def execute_once(
        self,
        key: str,
        func,
        *args,
        **kwargs,
    ):
        """동일 키로 한 번만 실행"""
        # 이미 실행된 결과가 있는지 확인
        cached = await self.store.get(key)
        if cached is not None:
            logger.info(f"멱등성 키 {key[:8]}... 캐시 히트, 이전 결과 반환")
            return cached.result
 
        # 실행 중 락 획득 (중복 실행 방지)
        async with self.store.lock(key, timeout=60):
            # 더블 체크 (락 대기 중 다른 워커가 완료했을 수 있음)
            cached = await self.store.get(key)
            if cached is not None:
                return cached.result
 
            result = await func(*args, **kwargs)
            await self.store.set(key, IdempotencyRecord(
                result=result,
                executed_at=datetime.utcnow(),
                ttl=timedelta(days=7),
            ))
            return result

상태 버전 마이그레이션

워크플로우 코드가 업데이트되면 상태 스키마도 변경될 수 있습니다. 이미 실행 중인 워크플로우의 상태를 새 스키마에 맞게 **마이그레이션(Migration)**해야 합니다.

state_migration.py
python
class StateMigrator:
    """상태 스키마 버전 마이그레이션"""
 
    def __init__(self):
        self.migrations: dict[tuple[int, int], Callable] = {}
 
    def register(self, from_version: int, to_version: int):
        """마이그레이션 함수 등록"""
        def decorator(func):
            self.migrations[(from_version, to_version)] = func
            return func
        return decorator
 
    def migrate(self, state: dict, from_version: int, to_version: int) -> dict:
        """상태를 목표 버전으로 순차 마이그레이션"""
        current = from_version
        current_state = state
 
        while current < to_version:
            next_version = current + 1
            migration = self.migrations.get((current, next_version))
 
            if migration is None:
                raise MigrationNotFound(
                    f"v{current} -> v{next_version} 마이그레이션이 정의되지 않음"
                )
 
            current_state = migration(current_state)
            current = next_version
 
        current_state["__version__"] = to_version
        return current_state
 
migrator = StateMigrator()
 
@migrator.register(1, 2)
def v1_to_v2(state: dict) -> dict:
    """v1 -> v2: priority 필드를 숫자에서 문자열로 변경"""
    priority_map = {1: "low", 2: "medium", 3: "high", 4: "urgent"}
    state["priority"] = priority_map.get(state.get("priority", 2), "medium")
    return state
 
@migrator.register(2, 3)
def v2_to_v3(state: dict) -> dict:
    """v2 -> v3: customer_info 구조 변경"""
    if "customer_name" in state:
        state["customer"] = {
            "name": state.pop("customer_name"),
            "email": state.pop("customer_email", None),
        }
    return state
Warning

마이그레이션은 항상 순방향으로만 적용합니다. 롤백이 필요한 경우를 대비하여 마이그레이션 전 원본 상태를 백업해 두는 것이 안전합니다. Temporal에서는 워크플로우 버전 관리(Workflow Versioning) 기능을 사용하여 이 문제를 체계적으로 처리할 수 있습니다.

분산 상태 일관성

여러 워커가 동일한 워크플로우 상태에 접근하는 분산 환경에서는 일관성을 보장하는 전략이 필요합니다.

낙관적 동시성 제어

optimistic_concurrency.py
python
class OptimisticStateStore:
    async def update(self, workflow_id: str, state: dict, expected_version: int) -> bool:
        """낙관적 동시성 제어로 상태 업데이트"""
        result = await self.db.execute(
            """
            UPDATE workflow_states
            SET state_data = $1, version = version + 1, updated_at = NOW()
            WHERE workflow_id = $2 AND version = $3
            """,
            json.dumps(state), workflow_id, expected_version,
        )
 
        if result == "UPDATE 0":
            # 다른 워커가 먼저 업데이트함 -> 충돌
            raise OptimisticLockError(
                f"워크플로우 {workflow_id}의 버전 충돌: "
                f"기대 버전 {expected_version}, 현재 버전 불일치"
            )
        return True

Temporal이나 LangGraph 서버를 사용하면 분산 상태 일관성이 프레임워크 수준에서 관리됩니다. 직접 상태 저장소를 구현하는 경우에만 위와 같은 동시성 제어가 필요합니다.


정리

이 장에서는 Agentic Workflow의 상태 관리와 체크포인팅을 심층적으로 다루었습니다.

  • 상태 모델은 최소 상태, 불변성, 직렬화 가능성 원칙으로 설계합니다
  • 이벤트 소싱으로 모든 상태 변화를 추적하고, 체크포인트로 복원 시간을 단축합니다
  • 저장소는 워크플로우 특성에 맞게 선택합니다 (PostgreSQL, Redis, S3 등)
  • 멱등성 키로 중복 실행을 방지하고, 버전 마이그레이션으로 스키마 변경을 관리합니다
  • 분산 환경에서는 낙관적 동시성 제어로 일관성을 보장합니다

다음 장 예고

7장에서는 에이전트의 모든 결정과 행동을 기록하는 감사 로깅과 컴플라이언스를 다룹니다. 불변 감사 로그, 규제 요구사항 대응, 설명 가능성, 재현 가능성, OpenTelemetry 통합 등을 살펴보겠습니다.

이 글이 도움이 되셨나요?

관련 주제 더 보기

#workflow#ai#automation

관련 글

AI / ML

7장: 감사 로깅과 컴플라이언스

에이전트 행동 추적, 불변 감사 로그 설계, 규제 요구사항 대응, 설명 가능성, 재현 가능성, OpenTelemetry 통합, 보존 정책을 다룹니다.

2026년 3월 13일·16분
AI / ML

5장: 에러 복구와 재시도 전략

Agentic Workflow의 에러 분류 체계, 지수 백오프, 서킷 브레이커, 모델 폴백, Saga 패턴 기반 보상 트랜잭션, 데드레터 큐 등 복원력 패턴을 다룹니다.

2026년 3월 9일·17분
AI / ML

8장: 엔터프라이즈 시스템 통합

ERP/CRM/ITSM 연동, MCP 기반 도구 통합, API 게이트웨이, 이벤트 드리븐 통합, 레거시 시스템 어댑터, 트랜잭션 경계 설계를 다룹니다.

2026년 3월 15일·16분
이전 글5장: 에러 복구와 재시도 전략
다음 글7장: 감사 로깅과 컴플라이언스

댓글

목차

약 16분 남음
  • 이 장에서 배울 내용
  • 워크플로우 상태 모델
    • 상태의 구성 요소
    • 상태 설계 원칙
  • 이벤트 소싱
    • 이벤트 정의
    • 이벤트 저장소와 상태 복원
  • 체크포인트 저장소
    • 저장소별 특성 비교
    • PostgreSQL 체크포인트 구현
    • 체크포인트 빈도 전략
  • 멱등성 보장
    • 멱등성 키 전략
  • 상태 버전 마이그레이션
  • 분산 상태 일관성
    • 낙관적 동시성 제어
  • 정리
  • 다음 장 예고