본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout

내비게이션

  • Tech
  • Projects
  • Books
  • About
  • Tags

카테고리

  • AI / ML
  • 웹 개발
  • 프로그래밍
  • 개발 도구

연결

  • GitHub
  • Email
  • RSS
© 2026 Kreath Archive. All rights reserved.Built with Next.js + MDX
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 4장: 장기 실행 워크플로우 관리
2026년 3월 7일·AI / ML·

4장: 장기 실행 워크플로우 관리

시간/일 단위 워크플로우의 듀러블 실행, 체크포인팅, 일시 정지와 재개, 상태 직렬화, 타임아웃 관리, 분산 실행 전략을 정리합니다.

16분764자10개 섹션
workflowaiautomation
공유
agentic-workflow4 / 10
12345678910
이전3장: Human-in-the-Loop 설계다음5장: 에러 복구와 재시도 전략

이 장에서 배울 내용

  • 장기 실행 워크플로우의 특성과 설계 고려사항
  • 듀러블 실행(Durable Execution)의 원리와 구현
  • 체크포인팅을 통한 상태 영속화 전략
  • 일시 정지/재개 메커니즘과 타임아웃 관리
  • 분산 환경에서의 워크플로우 실행

장기 실행 워크플로우의 특성

Agentic Workflow는 단순히 몇 초 만에 완료되는 API 호출과 다릅니다. 실제 비즈니스 프로세스에서는 다음과 같은 이유로 워크플로우가 수 시간에서 수일까지 실행될 수 있습니다.

  • 사람의 승인을 기다리는 대기 시간
  • 외부 시스템의 비동기 처리 완료 대기
  • 배치 처리 윈도우에 맞춘 스케줄링
  • 여러 팀/부서에 걸친 순차적 처리

이러한 장기 실행 워크플로우에서 가장 큰 위험은 중간 상태의 유실입니다. 서버 재시작, 네트워크 장애, 프로세스 크래시가 발생했을 때, 처음부터 다시 시작해야 한다면 비용과 시간 모두 막대한 낭비가 됩니다.

듀러블 실행(Durable Execution)

**듀러블 실행(Durable Execution)**은 워크플로우의 실행 상태를 영속적으로 저장하여, 어떤 장애가 발생해도 정확히 중단된 지점에서 재개할 수 있는 실행 모델입니다.

핵심 원리

듀러블 실행의 핵심은 **이벤트 소싱(Event Sourcing)**입니다. 워크플로우의 모든 단계를 이벤트로 기록하고, 복구 시 이벤트를 재생(replay)하여 정확한 상태를 복원합니다.

Temporal의 듀러블 실행

Temporal은 듀러블 실행을 프레임워크 수준에서 제공합니다. 개발자는 일반적인 코드를 작성하면 되고, Temporal 런타임이 자동으로 실행 상태를 영속화합니다.

temporal_durable.py
python
from temporalio import workflow, activity
from datetime import timedelta
 
@workflow.defn
class OnboardingWorkflow:
    @workflow.run
    async def run(self, application: Application) -> OnboardingResult:
        # 단계 1: 서류 검증 (자동)
        verification = await workflow.execute_activity(
            verify_documents,
            application.documents,
            start_to_close_timeout=timedelta(minutes=5),
        )
 
        if not verification.passed:
            # 추가 서류 요청 후 대기 (최대 72시간)
            await workflow.execute_activity(
                request_additional_docs,
                application.customer_id,
                start_to_close_timeout=timedelta(minutes=1),
            )
 
            # 고객의 서류 제출을 대기 (시그널)
            additional_docs = await workflow.wait_condition(
                lambda: self.additional_docs_received,
                timeout=timedelta(hours=72),
            )
 
            if not additional_docs:
                return OnboardingResult(status="timeout", reason="서류 미제출")
 
        # 단계 2: 심사 승인 대기 (사람)
        approval_result = await workflow.execute_activity(
            request_approval,
            ApprovalRequest(
                customer_id=application.customer_id,
                verification=verification,
            ),
            start_to_close_timeout=timedelta(hours=24),
        )
 
        if not approval_result.approved:
            return OnboardingResult(status="rejected", reason=approval_result.reason)
 
        # 단계 3: 계정 생성 및 알림 (자동)
        account = await workflow.execute_activity(
            create_account,
            application,
            start_to_close_timeout=timedelta(minutes=2),
        )
 
        await workflow.execute_activity(
            send_welcome_email,
            account,
            start_to_close_timeout=timedelta(seconds=30),
        )
 
        return OnboardingResult(status="completed", account_id=account.id)
 
    # 시그널 핸들러: 외부에서 서류 제출 이벤트 수신
    @workflow.signal
    async def on_docs_submitted(self, docs: list[Document]):
        self.additional_docs_received = True
        self.submitted_docs = docs
Info

Temporal 워크플로우에서 workflow.wait_condition이나 시그널 대기 중에 서버가 재시작되어도 상태가 유실되지 않습니다. Temporal 런타임이 이벤트 히스토리를 기반으로 워크플로우를 정확한 대기 지점으로 복원합니다.

LangGraph의 체크포인팅

LangGraph는 그래프의 각 노드 실행 후 상태를 체크포인트로 저장합니다. 다양한 백엔드(SQLite, PostgreSQL, Redis 등)를 체크포인트 저장소로 사용할 수 있습니다.

langgraph_checkpoint.py
python
from langgraph.graph import StateGraph
from langgraph.checkpoint.postgres import PostgresSaver
 
# PostgreSQL 체크포인터 설정
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost:5432/workflows"
)
 
# 그래프 컴파일 시 체크포인터 연결
graph = StateGraph(WorkflowState)
# ... 노드와 엣지 추가 ...
workflow = graph.compile(checkpointer=checkpointer)
 
# 실행 시 thread_id로 워크플로우 인스턴스를 식별
config = {"configurable": {"thread_id": "onboarding-12345"}}
 
# 첫 실행
result = await workflow.ainvoke(initial_state, config)
 
# 중단된 워크플로우 재개 (동일한 thread_id 사용)
result = await workflow.ainvoke(
    {"approval_status": "approved"},  # 새로운 입력
    config,
)

상태 직렬화

장기 실행 워크플로우의 상태는 직렬화되어 저장소에 기록됩니다. 상태에 포함되는 데이터의 크기와 복잡성을 관리하는 것이 중요합니다.

직렬화 전략

state_serialization.py
python
from pydantic import BaseModel
from datetime import datetime
import json
 
class WorkflowSnapshot(BaseModel):
    """워크플로우 상태 스냅샷"""
    workflow_id: str
    version: int
    current_node: str
    state_data: dict  # 직렬화 가능한 데이터만
    created_at: datetime
    metadata: dict
 
    class Config:
        json_encoders = {
            datetime: lambda v: v.isoformat(),
        }
 
class StateManager:
    def __init__(self, store: StateStore):
        self.store = store
 
    async def save_checkpoint(
        self,
        workflow_id: str,
        node: str,
        state: dict,
    ) -> None:
        """상태 체크포인트 저장"""
        # 대용량 데이터는 참조로 저장
        processed_state = self._externalize_large_values(state)
 
        snapshot = WorkflowSnapshot(
            workflow_id=workflow_id,
            version=await self.store.next_version(workflow_id),
            current_node=node,
            state_data=processed_state,
            created_at=datetime.utcnow(),
            metadata={"serializer": "json", "compressed": False},
        )
 
        await self.store.save(snapshot)
 
    def _externalize_large_values(self, state: dict) -> dict:
        """100KB 이상의 값은 외부 저장소로 이동"""
        result = {}
        for key, value in state.items():
            serialized = json.dumps(value, default=str)
            if len(serialized) > 100_000:
                ref = self.store.save_blob(serialized)
                result[key] = {"__ref__": ref}
            else:
                result[key] = value
        return result
Warning

워크플로우 상태에 LLM 응답 전문이나 대용량 문서를 직접 저장하면 체크포인트 크기가 급격히 증가합니다. 대용량 데이터는 별도 저장소(S3, Blob Storage)에 저장하고 참조만 상태에 포함하는 것이 좋습니다.

일시 정지와 재개

장기 실행 워크플로우는 다양한 이유로 일시 정지(Pause)되고, 나중에 재개(Resume)됩니다.

일시 정지 트리거

트리거설명예시
사람 승인 대기HITL 게이트에서 승인 필요대규모 환불 승인
외부 이벤트 대기외부 시스템의 콜백 대기결제 확인, 서류 제출
스케줄 기반 대기특정 시각까지 대기업무 시간 내 처리
수동 일시 정지관리자가 의도적으로 중단문제 조사, 정책 변경

재개 메커니즘

pause_resume.py
python
class WorkflowController:
    def __init__(self, runtime: WorkflowRuntime):
        self.runtime = runtime
 
    async def pause(self, workflow_id: str, reason: str) -> None:
        """워크플로우 수동 일시 정지"""
        await self.runtime.signal(
            workflow_id,
            signal_name="pause",
            payload={"reason": reason, "paused_by": get_current_user()},
        )
 
    async def resume(self, workflow_id: str, input_data: dict = None) -> None:
        """워크플로우 재개"""
        await self.runtime.signal(
            workflow_id,
            signal_name="resume",
            payload=input_data or {},
        )
 
    async def get_status(self, workflow_id: str) -> WorkflowStatus:
        """워크플로우 현재 상태 조회"""
        state = await self.runtime.get_state(workflow_id)
        return WorkflowStatus(
            id=workflow_id,
            current_node=state.current_node,
            is_paused=state.is_paused,
            paused_since=state.paused_at,
            waiting_for=state.pending_signals,
            progress=self._calculate_progress(state),
        )

타임아웃 관리

장기 실행 워크플로우에서 타임아웃은 다층적으로 관리해야 합니다.

타임아웃 계층

timeout_management.py
python
from datetime import timedelta
 
@dataclass
class TimeoutConfig:
    """다층 타임아웃 설정"""
    workflow_timeout: timedelta = timedelta(days=7)
    stage_timeouts: dict[str, timedelta] = None
    activity_timeout: timedelta = timedelta(minutes=5)
    llm_call_timeout: timedelta = timedelta(seconds=60)
    api_call_timeout: timedelta = timedelta(seconds=30)
 
    def __post_init__(self):
        if self.stage_timeouts is None:
            self.stage_timeouts = {}
 
    def get_stage_timeout(self, stage: str) -> timedelta:
        return self.stage_timeouts.get(stage, timedelta(hours=4))
 
# 온보딩 워크플로우 타임아웃 설정
onboarding_timeouts = TimeoutConfig(
    workflow_timeout=timedelta(days=14),
    stage_timeouts={
        "document_verification": timedelta(hours=1),
        "human_approval": timedelta(hours=24),
        "customer_document_wait": timedelta(days=5),
        "account_creation": timedelta(minutes=10),
    },
    activity_timeout=timedelta(minutes=5),
)

타임아웃 발생 시 처리

타임아웃이 발생했을 때의 처리 전략도 사전에 정의해야 합니다.

  • 재시도: 일시적 장애로 추정되면 재시도합니다 (개별 액티비티 수준)
  • 에스컬레이션: 사람의 응답이 지연되면 상위 관리자에게 에스컬레이션합니다
  • 대체 경로: 주 경로가 타임아웃되면 대체 경로로 전환합니다
  • 정리 후 종료: 복구 불가능한 경우 보상 트랜잭션을 실행하고 워크플로우를 종료합니다

진행 상태 추적

장기 실행 워크플로우에서 현재 어디까지 진행되었는지 추적하는 것은 운영자와 이해관계자 모두에게 중요합니다.

progress_tracker.py
python
@dataclass
class ProgressUpdate:
    workflow_id: str
    current_stage: str
    total_stages: int
    completed_stages: int
    estimated_completion: datetime | None
    status_message: str
    details: dict
 
class ProgressTracker:
    def __init__(self, notifier: Notifier):
        self.notifier = notifier
 
    async def update(self, progress: ProgressUpdate) -> None:
        """진행 상태 업데이트 및 알림"""
        await self.store.save_progress(progress)
 
        # 대시보드 실시간 업데이트
        await self.notifier.push_update(
            channel=f"workflow:{progress.workflow_id}",
            data=progress,
        )
 
        # 주요 단계 전환 시 이해관계자 알림
        if self._is_milestone(progress):
            await self.notifier.notify_stakeholders(
                workflow_id=progress.workflow_id,
                message=f"[{progress.current_stage}] {progress.status_message}",
            )

분산 실행

대규모 워크플로우는 여러 워커(Worker) 노드에 분산되어 실행됩니다. 이때 워크플로우의 일관성을 보장하는 것이 핵심 과제입니다.

분산 실행 아키텍처

Temporal이나 LangGraph의 서버 모드에서는 태스크 큐를 통해 작업이 분배됩니다. 각 워커는 상태 저장소를 통해 워크플로우 상태를 공유하며, 특정 워커가 실패해도 다른 워커가 작업을 이어받을 수 있습니다.

Tip

분산 환경에서는 **멱등성(Idempotency)**이 특히 중요합니다. 네트워크 파티션이나 워커 재시작으로 인해 동일한 액티비티가 두 번 실행될 수 있으므로, 모든 액티비티는 중복 실행되어도 동일한 결과를 보장해야 합니다.


정리

이 장에서는 장기 실행 워크플로우의 관리 기법을 살펴보았습니다.

  • 듀러블 실행은 이벤트 소싱 기반으로 장애 발생 시 정확한 복구를 보장합니다
  • Temporal은 프레임워크 수준의 듀러블 실행을, LangGraph는 체크포인터 기반 상태 영속화를 제공합니다
  • 대용량 상태 데이터는 외부 저장소에 저장하고 참조만 체크포인트에 포함합니다
  • 타임아웃은 워크플로우, 단계, 액티비티, API 호출 수준으로 다층 관리합니다
  • 분산 환경에서는 멱등성 보장이 핵심입니다

다음 장 예고

5장에서는 워크플로우 실행 중 발생하는 다양한 에러에 대응하는 에러 복구와 재시도 전략을 다룹니다. 에러 분류 체계, 지수 백오프, 서킷 브레이커, 모델 폴백, 보상 트랜잭션(Saga 패턴) 등 실전에서 필수적인 복원력 패턴을 살펴보겠습니다.

이 글이 도움이 되셨나요?

관련 주제 더 보기

#workflow#ai#automation

관련 글

AI / ML

5장: 에러 복구와 재시도 전략

Agentic Workflow의 에러 분류 체계, 지수 백오프, 서킷 브레이커, 모델 폴백, Saga 패턴 기반 보상 트랜잭션, 데드레터 큐 등 복원력 패턴을 다룹니다.

2026년 3월 9일·17분
AI / ML

3장: Human-in-the-Loop 설계

Agentic Workflow의 핵심 안전장치인 HITL과 Human-on-the-Loop 패턴, 승인 게이트, 에스컬레이션 정책, 신뢰도 기반 라우팅, 점진적 자율성 확대 전략을 다룹니다.

2026년 3월 5일·17분
AI / ML

6장: 상태 관리와 체크포인팅

Agentic Workflow의 상태 모델, 이벤트 소싱, 체크포인트 저장소 선택, 멱등성 보장, 상태 복원과 버전 마이그레이션, 분산 상태 일관성 전략을 다룹니다.

2026년 3월 11일·16분
이전 글3장: Human-in-the-Loop 설계
다음 글5장: 에러 복구와 재시도 전략

댓글

목차

약 16분 남음
  • 이 장에서 배울 내용
  • 장기 실행 워크플로우의 특성
  • 듀러블 실행(Durable Execution)
    • 핵심 원리
    • Temporal의 듀러블 실행
    • LangGraph의 체크포인팅
  • 상태 직렬화
    • 직렬화 전략
  • 일시 정지와 재개
    • 일시 정지 트리거
    • 재개 메커니즘
  • 타임아웃 관리
    • 타임아웃 계층
    • 타임아웃 발생 시 처리
  • 진행 상태 추적
  • 분산 실행
    • 분산 실행 아키텍처
  • 정리
  • 다음 장 예고