Agentic Workflow의 에러 분류 체계, 지수 백오프, 서킷 브레이커, 모델 폴백, Saga 패턴 기반 보상 트랜잭션, 데드레터 큐 등 복원력 패턴을 다룹니다.
Agentic Workflow에서 발생하는 에러는 성격에 따라 적절한 대응 전략이 달라집니다. 에러를 체계적으로 분류하는 것이 복원력 있는 시스템의 첫걸음입니다.
**일시적 에러(Transient Error)**는 네트워크 타임아웃, API 속도 제한(Rate Limit), 일시적 서비스 장애 등 시간이 지나면 자연스럽게 해결되는 에러입니다. 재시도가 가장 효과적인 대응입니다.
**영구적 에러(Permanent Error)**는 잘못된 API 키, 존재하지 않는 리소스, 권한 부족 등 재시도해도 해결되지 않는 에러입니다. 대체 경로나 사람의 개입이 필요합니다.
**논리적 에러(Logical Error)**는 에이전트의 판단 오류, 잘못된 도구 선택, 유효하지 않은 출력 등 시스템이 아닌 로직 수준의 문제입니다. 입력 검증 강화나 프롬프트 수정이 필요합니다.
from enum import Enum
class ErrorType(Enum):
TRANSIENT = "transient"
PERMANENT = "permanent"
LOGICAL = "logical"
class ErrorClassifier:
"""에러를 유형별로 분류"""
TRANSIENT_PATTERNS = [
"timeout", "rate_limit", "503", "502",
"connection_reset", "temporary_failure",
]
PERMANENT_PATTERNS = [
"401", "403", "404", "invalid_api_key",
"resource_not_found", "permission_denied",
]
def classify(self, error: Exception) -> ErrorType:
error_str = str(error).lower()
for pattern in self.TRANSIENT_PATTERNS:
if pattern in error_str:
return ErrorType.TRANSIENT
for pattern in self.PERMANENT_PATTERNS:
if pattern in error_str:
return ErrorType.PERMANENT
# LLM 출력 검증 실패는 논리적 에러
if isinstance(error, OutputValidationError):
return ErrorType.LOGICAL
# 분류 불가능한 경우 일시적으로 간주하고 재시도
return ErrorType.TRANSIENT일시적 에러에 대한 재시도는 **지수 백오프(Exponential Backoff)**와 **지터(Jitter)**를 조합하여 구현합니다. 일정한 간격으로 재시도하면 동시에 많은 요청이 몰리는 썬더링 허드(Thundering Herd) 문제가 발생할 수 있습니다.
import asyncio
import random
from dataclasses import dataclass
@dataclass
class RetryConfig:
max_attempts: int = 5
base_delay: float = 1.0 # 초기 대기 시간 (초)
max_delay: float = 60.0 # 최대 대기 시간 (초)
exponential_base: float = 2.0
jitter: bool = True
async def retry_with_backoff(
func,
config: RetryConfig = RetryConfig(),
classifier: ErrorClassifier = ErrorClassifier(),
):
"""지수 백오프 + 지터 재시도"""
last_error = None
for attempt in range(config.max_attempts):
try:
return await func()
except Exception as e:
last_error = e
error_type = classifier.classify(e)
# 영구적 에러는 즉시 중단
if error_type == ErrorType.PERMANENT:
raise PermanentError(f"영구적 에러, 재시도 불가: {e}") from e
# 논리적 에러도 단순 재시도로 해결 불가
if error_type == ErrorType.LOGICAL:
raise LogicalError(f"논리적 에러, 로직 수정 필요: {e}") from e
# 마지막 시도였으면 예외 전파
if attempt == config.max_attempts - 1:
raise
# 지수 백오프 계산
delay = min(
config.base_delay * (config.exponential_base ** attempt),
config.max_delay,
)
# 지터 적용 (0 ~ delay 사이의 랜덤 값)
if config.jitter:
delay = random.uniform(0, delay)
logger.warning(
f"재시도 {attempt + 1}/{config.max_attempts}, "
f"{delay:.1f}초 후 재시도: {e}"
)
await asyncio.sleep(delay)
raise last_error권장 설정은 기본 대기 시간 1~2초, 지수 기반 2배, 최대 시도 5~7회입니다. 지터를 적용하면 동시 재시도로 인한 부하 집중을 효과적으로 분산할 수 있습니다.
재시도 시 반드시 **멱등성(Idempotency)**을 보장해야 합니다. 예를 들어, 결제 API를 호출한 후 응답을 받지 못해 재시도하는 경우, 멱등성 키(Idempotency Key) 없이 호출하면 이중 결제가 발생할 수 있습니다.
import uuid
class IdempotentToolExecutor:
def __init__(self, tool: Tool, result_cache: ResultCache):
self.tool = tool
self.cache = result_cache
async def execute(self, params: dict, idempotency_key: str = None) -> ToolResult:
key = idempotency_key or str(uuid.uuid4())
# 이미 실행된 결과가 있으면 캐시에서 반환
cached = await self.cache.get(key)
if cached is not None:
return cached
# 실행 후 결과를 캐시에 저장
result = await self.tool.execute(params)
await self.cache.set(key, result, ttl=timedelta(hours=24))
return result**서킷 브레이커(Circuit Breaker)**는 반복적으로 실패하는 외부 서비스에 대한 호출을 일시적으로 차단하여 시스템 전체의 안정성을 보호하는 패턴입니다.
import time
from enum import Enum
from dataclasses import dataclass, field
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
@dataclass
class CircuitBreaker:
failure_threshold: int = 5
recovery_timeout: float = 30.0 # 초
half_open_max_calls: int = 3
state: CircuitState = field(default=CircuitState.CLOSED, init=False)
failure_count: int = field(default=0, init=False)
last_failure_time: float = field(default=0, init=False)
half_open_calls: int = field(default=0, init=False)
async def call(self, func, *args, **kwargs):
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.half_open_calls = 0
else:
raise CircuitOpenError(
f"서킷 브레이커 열림 상태, "
f"{self.recovery_timeout - (time.time() - self.last_failure_time):.0f}초 후 재시도"
)
if self.state == CircuitState.HALF_OPEN:
if self.half_open_calls >= self.half_open_max_calls:
raise CircuitOpenError("Half-Open 테스트 한도 초과")
self.half_open_calls += 1
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.CLOSED
self.failure_count = 0
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPENLLM 호출이 실패하거나 품질이 낮은 경우, 다른 모델로 **폴백(Fallback)**하는 전략입니다. 비용-성능 트레이드오프를 고려하여 폴백 체인을 구성합니다.
@dataclass
class ModelConfig:
name: str
provider: str
cost_per_1k_tokens: float
max_tokens: int
timeout: float
class ModelFallbackChain:
"""모델 폴백 체인"""
def __init__(self, models: list[ModelConfig]):
self.models = models # 우선순위 순서
self.circuit_breakers = {
m.name: CircuitBreaker(failure_threshold=3, recovery_timeout=60)
for m in models
}
async def invoke(self, messages: list[dict]) -> LLMResponse:
errors = []
for model in self.models:
breaker = self.circuit_breakers[model.name]
try:
response = await breaker.call(
self._call_model, model, messages
)
return response
except CircuitOpenError:
logger.info(f"{model.name} 서킷 브레이커 열림, 다음 모델로 폴백")
continue
except Exception as e:
errors.append((model.name, e))
logger.warning(f"{model.name} 실패: {e}, 다음 모델로 폴백")
continue
# 모든 모델 실패
raise AllModelsFailed(errors)
# 폴백 체인 구성 예시
fallback_chain = ModelFallbackChain([
ModelConfig("claude-opus", "anthropic", 0.015, 200000, 60.0),
ModelConfig("claude-sonnet", "anthropic", 0.003, 200000, 30.0),
ModelConfig("gpt-4o", "openai", 0.005, 128000, 30.0),
ModelConfig("claude-haiku", "anthropic", 0.00025, 200000, 15.0),
])폴백 체인에서 하위 모델은 성능이 낮을 수 있으므로, 폴백 시 프롬프트를 단순화하거나 few-shot 예시를 추가하는 등 모델 능력에 맞게 조정하는 것이 좋습니다.
여러 단계로 구성된 워크플로우에서 중간 단계가 실패하면, 이미 완료된 이전 단계의 결과를 되돌려야 할 수 있습니다. Saga 패턴은 각 단계에 대응하는 **보상 액션(Compensating Action)**을 정의하여 이를 처리합니다.
from dataclasses import dataclass
from typing import Callable, Any
@dataclass
class SagaStep:
name: str
action: Callable
compensation: Callable # 보상 액션
class SagaOrchestrator:
def __init__(self, steps: list[SagaStep]):
self.steps = steps
self.completed_steps: list[tuple[SagaStep, Any]] = []
async def execute(self, context: dict) -> dict:
"""Saga 실행: 실패 시 보상 트랜잭션 수행"""
try:
for step in self.steps:
logger.info(f"Saga 단계 실행: {step.name}")
result = await step.action(context)
self.completed_steps.append((step, result))
context[f"{step.name}_result"] = result
return context
except Exception as e:
logger.error(f"Saga 실패 at {step.name}: {e}")
await self._compensate(context)
raise SagaFailure(
failed_step=step.name,
error=e,
compensated_steps=[s.name for s, _ in self.completed_steps],
)
async def _compensate(self, context: dict) -> None:
"""역순으로 보상 액션 실행"""
for step, result in reversed(self.completed_steps):
try:
logger.info(f"보상 실행: {step.name}")
await step.compensation(context, result)
except Exception as comp_error:
# 보상 실패는 별도 기록 후 수동 처리 큐에 추가
logger.critical(
f"보상 실패 at {step.name}: {comp_error}"
)
await dead_letter_queue.enqueue(
FailedCompensation(step=step.name, error=comp_error)
)
# 주문 처리 Saga 예시
order_saga = SagaOrchestrator([
SagaStep(
name="create_order",
action=create_order,
compensation=cancel_order,
),
SagaStep(
name="process_payment",
action=process_payment,
compensation=refund_payment,
),
SagaStep(
name="deduct_inventory",
action=deduct_inventory,
compensation=restore_inventory,
),
SagaStep(
name="request_shipping",
action=request_shipping,
compensation=cancel_shipping,
),
])보상 액션 자체가 실패할 수 있다는 점을 반드시 고려해야 합니다. 보상 실패는 데드레터 큐에 기록하고 수동으로 처리해야 합니다. 보상 액션도 멱등성을 보장해야 중복 실행 시 문제가 발생하지 않습니다.
병렬로 실행되는 태스크 중 일부만 실패하는 경우, 전체를 실패로 처리할지 부분적으로 결과를 활용할지 결정해야 합니다.
from enum import Enum
class PartialFailureStrategy(Enum):
FAIL_FAST = "fail_fast" # 하나라도 실패하면 전체 중단
BEST_EFFORT = "best_effort" # 성공한 결과만 사용
QUORUM = "quorum" # N개 이상 성공하면 진행
@dataclass
class ParallelExecutor:
strategy: PartialFailureStrategy
quorum_count: int = None # QUORUM 전략일 때 필요한 최소 성공 수
async def execute(self, tasks: list[Task]) -> ParallelResult:
results = await asyncio.gather(
*[self._execute_task(t) for t in tasks],
return_exceptions=True,
)
successes = [(t, r) for t, r in zip(tasks, results) if not isinstance(r, Exception)]
failures = [(t, r) for t, r in zip(tasks, results) if isinstance(r, Exception)]
match self.strategy:
case PartialFailureStrategy.FAIL_FAST:
if failures:
raise PartialFailure(successes=successes, failures=failures)
return ParallelResult(results=successes)
case PartialFailureStrategy.BEST_EFFORT:
if not successes:
raise AllTasksFailed(failures=failures)
return ParallelResult(results=successes, partial=bool(failures))
case PartialFailureStrategy.QUORUM:
if len(successes) >= self.quorum_count:
return ParallelResult(results=successes, partial=bool(failures))
raise QuorumNotMet(
required=self.quorum_count,
achieved=len(successes),
)모든 복구 전략이 실패한 메시지는 **데드레터 큐(Dead Letter Queue, DLQ)**로 이동합니다. DLQ는 처리 불가능한 작업을 안전하게 보관하면서 수동 조사와 처리를 가능하게 합니다.
@dataclass
class DeadLetterEntry:
id: str
original_task: dict
error_history: list[dict] # 모든 시도와 에러 기록
workflow_id: str
created_at: datetime
retry_count: int
status: str # pending, investigating, resolved, discarded
class DeadLetterQueue:
async def enqueue(self, entry: DeadLetterEntry) -> None:
"""DLQ에 항목 추가 및 알림"""
await self.store.save(entry)
await self.alerter.send(
channel="ops",
severity="high",
message=f"DLQ 항목 추가: {entry.original_task.get('name', 'unknown')}",
context={"workflow_id": entry.workflow_id, "error": entry.error_history[-1]},
)
async def retry(self, entry_id: str) -> None:
"""DLQ 항목 수동 재시도"""
entry = await self.store.get(entry_id)
entry.status = "retrying"
await self.store.update(entry)
await self.task_runner.submit(entry.original_task)지금까지 다룬 패턴들을 하나의 복원력 계층으로 통합합니다.
이 장에서는 Agentic Workflow의 에러 복구와 재시도 전략을 살펴보았습니다.
6장에서는 워크플로우의 상태 관리와 체크포인팅을 심층적으로 다룹니다. 이벤트 소싱, 체크포인트 저장소 선택, 멱등성 보장, 상태 버전 마이그레이션, 분산 상태 일관성 등 상태 관리의 모든 측면을 살펴보겠습니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
Agentic Workflow의 상태 모델, 이벤트 소싱, 체크포인트 저장소 선택, 멱등성 보장, 상태 복원과 버전 마이그레이션, 분산 상태 일관성 전략을 다룹니다.
시간/일 단위 워크플로우의 듀러블 실행, 체크포인팅, 일시 정지와 재개, 상태 직렬화, 타임아웃 관리, 분산 실행 전략을 정리합니다.
에이전트 행동 추적, 불변 감사 로그 설계, 규제 요구사항 대응, 설명 가능성, 재현 가능성, OpenTelemetry 통합, 보존 정책을 다룹니다.