시간/일 단위 워크플로우의 듀러블 실행, 체크포인팅, 일시 정지와 재개, 상태 직렬화, 타임아웃 관리, 분산 실행 전략을 정리합니다.
Agentic Workflow는 단순히 몇 초 만에 완료되는 API 호출과 다릅니다. 실제 비즈니스 프로세스에서는 다음과 같은 이유로 워크플로우가 수 시간에서 수일까지 실행될 수 있습니다.
이러한 장기 실행 워크플로우에서 가장 큰 위험은 중간 상태의 유실입니다. 서버 재시작, 네트워크 장애, 프로세스 크래시가 발생했을 때, 처음부터 다시 시작해야 한다면 비용과 시간 모두 막대한 낭비가 됩니다.
**듀러블 실행(Durable Execution)**은 워크플로우의 실행 상태를 영속적으로 저장하여, 어떤 장애가 발생해도 정확히 중단된 지점에서 재개할 수 있는 실행 모델입니다.
듀러블 실행의 핵심은 **이벤트 소싱(Event Sourcing)**입니다. 워크플로우의 모든 단계를 이벤트로 기록하고, 복구 시 이벤트를 재생(replay)하여 정확한 상태를 복원합니다.
Temporal은 듀러블 실행을 프레임워크 수준에서 제공합니다. 개발자는 일반적인 코드를 작성하면 되고, Temporal 런타임이 자동으로 실행 상태를 영속화합니다.
from temporalio import workflow, activity
from datetime import timedelta
@workflow.defn
class OnboardingWorkflow:
@workflow.run
async def run(self, application: Application) -> OnboardingResult:
# 단계 1: 서류 검증 (자동)
verification = await workflow.execute_activity(
verify_documents,
application.documents,
start_to_close_timeout=timedelta(minutes=5),
)
if not verification.passed:
# 추가 서류 요청 후 대기 (최대 72시간)
await workflow.execute_activity(
request_additional_docs,
application.customer_id,
start_to_close_timeout=timedelta(minutes=1),
)
# 고객의 서류 제출을 대기 (시그널)
additional_docs = await workflow.wait_condition(
lambda: self.additional_docs_received,
timeout=timedelta(hours=72),
)
if not additional_docs:
return OnboardingResult(status="timeout", reason="서류 미제출")
# 단계 2: 심사 승인 대기 (사람)
approval_result = await workflow.execute_activity(
request_approval,
ApprovalRequest(
customer_id=application.customer_id,
verification=verification,
),
start_to_close_timeout=timedelta(hours=24),
)
if not approval_result.approved:
return OnboardingResult(status="rejected", reason=approval_result.reason)
# 단계 3: 계정 생성 및 알림 (자동)
account = await workflow.execute_activity(
create_account,
application,
start_to_close_timeout=timedelta(minutes=2),
)
await workflow.execute_activity(
send_welcome_email,
account,
start_to_close_timeout=timedelta(seconds=30),
)
return OnboardingResult(status="completed", account_id=account.id)
# 시그널 핸들러: 외부에서 서류 제출 이벤트 수신
@workflow.signal
async def on_docs_submitted(self, docs: list[Document]):
self.additional_docs_received = True
self.submitted_docs = docsTemporal 워크플로우에서 workflow.wait_condition이나 시그널 대기 중에 서버가 재시작되어도 상태가 유실되지 않습니다. Temporal 런타임이 이벤트 히스토리를 기반으로 워크플로우를 정확한 대기 지점으로 복원합니다.
LangGraph는 그래프의 각 노드 실행 후 상태를 체크포인트로 저장합니다. 다양한 백엔드(SQLite, PostgreSQL, Redis 등)를 체크포인트 저장소로 사용할 수 있습니다.
from langgraph.graph import StateGraph
from langgraph.checkpoint.postgres import PostgresSaver
# PostgreSQL 체크포인터 설정
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost:5432/workflows"
)
# 그래프 컴파일 시 체크포인터 연결
graph = StateGraph(WorkflowState)
# ... 노드와 엣지 추가 ...
workflow = graph.compile(checkpointer=checkpointer)
# 실행 시 thread_id로 워크플로우 인스턴스를 식별
config = {"configurable": {"thread_id": "onboarding-12345"}}
# 첫 실행
result = await workflow.ainvoke(initial_state, config)
# 중단된 워크플로우 재개 (동일한 thread_id 사용)
result = await workflow.ainvoke(
{"approval_status": "approved"}, # 새로운 입력
config,
)장기 실행 워크플로우의 상태는 직렬화되어 저장소에 기록됩니다. 상태에 포함되는 데이터의 크기와 복잡성을 관리하는 것이 중요합니다.
from pydantic import BaseModel
from datetime import datetime
import json
class WorkflowSnapshot(BaseModel):
"""워크플로우 상태 스냅샷"""
workflow_id: str
version: int
current_node: str
state_data: dict # 직렬화 가능한 데이터만
created_at: datetime
metadata: dict
class Config:
json_encoders = {
datetime: lambda v: v.isoformat(),
}
class StateManager:
def __init__(self, store: StateStore):
self.store = store
async def save_checkpoint(
self,
workflow_id: str,
node: str,
state: dict,
) -> None:
"""상태 체크포인트 저장"""
# 대용량 데이터는 참조로 저장
processed_state = self._externalize_large_values(state)
snapshot = WorkflowSnapshot(
workflow_id=workflow_id,
version=await self.store.next_version(workflow_id),
current_node=node,
state_data=processed_state,
created_at=datetime.utcnow(),
metadata={"serializer": "json", "compressed": False},
)
await self.store.save(snapshot)
def _externalize_large_values(self, state: dict) -> dict:
"""100KB 이상의 값은 외부 저장소로 이동"""
result = {}
for key, value in state.items():
serialized = json.dumps(value, default=str)
if len(serialized) > 100_000:
ref = self.store.save_blob(serialized)
result[key] = {"__ref__": ref}
else:
result[key] = value
return result워크플로우 상태에 LLM 응답 전문이나 대용량 문서를 직접 저장하면 체크포인트 크기가 급격히 증가합니다. 대용량 데이터는 별도 저장소(S3, Blob Storage)에 저장하고 참조만 상태에 포함하는 것이 좋습니다.
장기 실행 워크플로우는 다양한 이유로 일시 정지(Pause)되고, 나중에 재개(Resume)됩니다.
| 트리거 | 설명 | 예시 |
|---|---|---|
| 사람 승인 대기 | HITL 게이트에서 승인 필요 | 대규모 환불 승인 |
| 외부 이벤트 대기 | 외부 시스템의 콜백 대기 | 결제 확인, 서류 제출 |
| 스케줄 기반 대기 | 특정 시각까지 대기 | 업무 시간 내 처리 |
| 수동 일시 정지 | 관리자가 의도적으로 중단 | 문제 조사, 정책 변경 |
class WorkflowController:
def __init__(self, runtime: WorkflowRuntime):
self.runtime = runtime
async def pause(self, workflow_id: str, reason: str) -> None:
"""워크플로우 수동 일시 정지"""
await self.runtime.signal(
workflow_id,
signal_name="pause",
payload={"reason": reason, "paused_by": get_current_user()},
)
async def resume(self, workflow_id: str, input_data: dict = None) -> None:
"""워크플로우 재개"""
await self.runtime.signal(
workflow_id,
signal_name="resume",
payload=input_data or {},
)
async def get_status(self, workflow_id: str) -> WorkflowStatus:
"""워크플로우 현재 상태 조회"""
state = await self.runtime.get_state(workflow_id)
return WorkflowStatus(
id=workflow_id,
current_node=state.current_node,
is_paused=state.is_paused,
paused_since=state.paused_at,
waiting_for=state.pending_signals,
progress=self._calculate_progress(state),
)장기 실행 워크플로우에서 타임아웃은 다층적으로 관리해야 합니다.
from datetime import timedelta
@dataclass
class TimeoutConfig:
"""다층 타임아웃 설정"""
workflow_timeout: timedelta = timedelta(days=7)
stage_timeouts: dict[str, timedelta] = None
activity_timeout: timedelta = timedelta(minutes=5)
llm_call_timeout: timedelta = timedelta(seconds=60)
api_call_timeout: timedelta = timedelta(seconds=30)
def __post_init__(self):
if self.stage_timeouts is None:
self.stage_timeouts = {}
def get_stage_timeout(self, stage: str) -> timedelta:
return self.stage_timeouts.get(stage, timedelta(hours=4))
# 온보딩 워크플로우 타임아웃 설정
onboarding_timeouts = TimeoutConfig(
workflow_timeout=timedelta(days=14),
stage_timeouts={
"document_verification": timedelta(hours=1),
"human_approval": timedelta(hours=24),
"customer_document_wait": timedelta(days=5),
"account_creation": timedelta(minutes=10),
},
activity_timeout=timedelta(minutes=5),
)타임아웃이 발생했을 때의 처리 전략도 사전에 정의해야 합니다.
장기 실행 워크플로우에서 현재 어디까지 진행되었는지 추적하는 것은 운영자와 이해관계자 모두에게 중요합니다.
@dataclass
class ProgressUpdate:
workflow_id: str
current_stage: str
total_stages: int
completed_stages: int
estimated_completion: datetime | None
status_message: str
details: dict
class ProgressTracker:
def __init__(self, notifier: Notifier):
self.notifier = notifier
async def update(self, progress: ProgressUpdate) -> None:
"""진행 상태 업데이트 및 알림"""
await self.store.save_progress(progress)
# 대시보드 실시간 업데이트
await self.notifier.push_update(
channel=f"workflow:{progress.workflow_id}",
data=progress,
)
# 주요 단계 전환 시 이해관계자 알림
if self._is_milestone(progress):
await self.notifier.notify_stakeholders(
workflow_id=progress.workflow_id,
message=f"[{progress.current_stage}] {progress.status_message}",
)대규모 워크플로우는 여러 워커(Worker) 노드에 분산되어 실행됩니다. 이때 워크플로우의 일관성을 보장하는 것이 핵심 과제입니다.
Temporal이나 LangGraph의 서버 모드에서는 태스크 큐를 통해 작업이 분배됩니다. 각 워커는 상태 저장소를 통해 워크플로우 상태를 공유하며, 특정 워커가 실패해도 다른 워커가 작업을 이어받을 수 있습니다.
분산 환경에서는 **멱등성(Idempotency)**이 특히 중요합니다. 네트워크 파티션이나 워커 재시작으로 인해 동일한 액티비티가 두 번 실행될 수 있으므로, 모든 액티비티는 중복 실행되어도 동일한 결과를 보장해야 합니다.
이 장에서는 장기 실행 워크플로우의 관리 기법을 살펴보았습니다.
5장에서는 워크플로우 실행 중 발생하는 다양한 에러에 대응하는 에러 복구와 재시도 전략을 다룹니다. 에러 분류 체계, 지수 백오프, 서킷 브레이커, 모델 폴백, 보상 트랜잭션(Saga 패턴) 등 실전에서 필수적인 복원력 패턴을 살펴보겠습니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
Agentic Workflow의 에러 분류 체계, 지수 백오프, 서킷 브레이커, 모델 폴백, Saga 패턴 기반 보상 트랜잭션, 데드레터 큐 등 복원력 패턴을 다룹니다.
Agentic Workflow의 핵심 안전장치인 HITL과 Human-on-the-Loop 패턴, 승인 게이트, 에스컬레이션 정책, 신뢰도 기반 라우팅, 점진적 자율성 확대 전략을 다룹니다.
Agentic Workflow의 상태 모델, 이벤트 소싱, 체크포인트 저장소 선택, 멱등성 보장, 상태 복원과 버전 마이그레이션, 분산 상태 일관성 전략을 다룹니다.