본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout
TechProjectsBooksAbout
© 2026 Kreath. All rights reserved.
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 5장: 에러 복구와 재시도 전략
2026년 3월 9일·AI / ML·

5장: 에러 복구와 재시도 전략

Agentic Workflow의 에러 분류 체계, 지수 백오프, 서킷 브레이커, 모델 폴백, Saga 패턴 기반 보상 트랜잭션, 데드레터 큐 등 복원력 패턴을 다룹니다.

17분1,180자11개 섹션
workflowaiautomation
공유
agentic-workflow5 / 10
12345678910
이전4장: 장기 실행 워크플로우 관리다음6장: 상태 관리와 체크포인팅

이 장에서 배울 내용

  • 에러 유형별 분류 체계와 대응 전략
  • 지수 백오프와 지터를 활용한 재시도 구현
  • 서킷 브레이커와 모델 폴백 패턴
  • Saga 패턴을 활용한 보상 트랜잭션
  • 데드레터 큐와 부분 실패 처리

에러 분류 체계

Agentic Workflow에서 발생하는 에러는 성격에 따라 적절한 대응 전략이 달라집니다. 에러를 체계적으로 분류하는 것이 복원력 있는 시스템의 첫걸음입니다.

3가지 에러 유형

일시적 에러(Transient Error)는 네트워크 타임아웃, API 속도 제한(Rate Limit), 일시적 서비스 장애 등 시간이 지나면 자연스럽게 해결되는 에러입니다. 재시도가 가장 효과적인 대응입니다.

영구적 에러(Permanent Error)는 잘못된 API 키, 존재하지 않는 리소스, 권한 부족 등 재시도해도 해결되지 않는 에러입니다. 대체 경로나 사람의 개입이 필요합니다.

논리적 에러(Logical Error)는 에이전트의 판단 오류, 잘못된 도구 선택, 유효하지 않은 출력 등 시스템이 아닌 로직 수준의 문제입니다. 입력 검증 강화나 프롬프트 수정이 필요합니다.

error_classifier.py
python
from enum import Enum
 
class ErrorType(Enum):
    TRANSIENT = "transient"
    PERMANENT = "permanent"
    LOGICAL = "logical"
 
class ErrorClassifier:
    """에러를 유형별로 분류"""
 
    TRANSIENT_PATTERNS = [
        "timeout", "rate_limit", "503", "502",
        "connection_reset", "temporary_failure",
    ]
 
    PERMANENT_PATTERNS = [
        "401", "403", "404", "invalid_api_key",
        "resource_not_found", "permission_denied",
    ]
 
    def classify(self, error: Exception) -> ErrorType:
        error_str = str(error).lower()
 
        for pattern in self.TRANSIENT_PATTERNS:
            if pattern in error_str:
                return ErrorType.TRANSIENT
 
        for pattern in self.PERMANENT_PATTERNS:
            if pattern in error_str:
                return ErrorType.PERMANENT
 
        # LLM 출력 검증 실패는 논리적 에러
        if isinstance(error, OutputValidationError):
            return ErrorType.LOGICAL
 
        # 분류 불가능한 경우 일시적으로 간주하고 재시도
        return ErrorType.TRANSIENT

지수 백오프와 지터

일시적 에러에 대한 재시도는 지수 백오프(Exponential Backoff)와 지터(Jitter)를 조합하여 구현합니다. 일정한 간격으로 재시도하면 동시에 많은 요청이 몰리는 썬더링 허드(Thundering Herd) 문제가 발생할 수 있습니다.

구현

retry_with_backoff.py
python
import asyncio
import random
from dataclasses import dataclass
 
@dataclass
class RetryConfig:
    max_attempts: int = 5
    base_delay: float = 1.0     # 초기 대기 시간 (초)
    max_delay: float = 60.0     # 최대 대기 시간 (초)
    exponential_base: float = 2.0
    jitter: bool = True
 
async def retry_with_backoff(
    func,
    config: RetryConfig = RetryConfig(),
    classifier: ErrorClassifier = ErrorClassifier(),
):
    """지수 백오프 + 지터 재시도"""
    last_error = None
 
    for attempt in range(config.max_attempts):
        try:
            return await func()
        except Exception as e:
            last_error = e
            error_type = classifier.classify(e)
 
            # 영구적 에러는 즉시 중단
            if error_type == ErrorType.PERMANENT:
                raise PermanentError(f"영구적 에러, 재시도 불가: {e}") from e
 
            # 논리적 에러도 단순 재시도로 해결 불가
            if error_type == ErrorType.LOGICAL:
                raise LogicalError(f"논리적 에러, 로직 수정 필요: {e}") from e
 
            # 마지막 시도였으면 예외 전파
            if attempt == config.max_attempts - 1:
                raise
 
            # 지수 백오프 계산
            delay = min(
                config.base_delay * (config.exponential_base ** attempt),
                config.max_delay,
            )
 
            # 지터 적용 (0 ~ delay 사이의 랜덤 값)
            if config.jitter:
                delay = random.uniform(0, delay)
 
            logger.warning(
                f"재시도 {attempt + 1}/{config.max_attempts}, "
                f"{delay:.1f}초 후 재시도: {e}"
            )
            await asyncio.sleep(delay)
 
    raise last_error
Info

권장 설정은 기본 대기 시간 1~2초, 지수 기반 2배, 최대 시도 5~7회입니다. 지터를 적용하면 동시 재시도로 인한 부하 집중을 효과적으로 분산할 수 있습니다.

재시도 시 주의사항

재시도 시 반드시 멱등성(Idempotency)을 보장해야 합니다. 예를 들어, 결제 API를 호출한 후 응답을 받지 못해 재시도하는 경우, 멱등성 키(Idempotency Key) 없이 호출하면 이중 결제가 발생할 수 있습니다.

idempotent_tool_call.py
python
import uuid
 
class IdempotentToolExecutor:
    def __init__(self, tool: Tool, result_cache: ResultCache):
        self.tool = tool
        self.cache = result_cache
 
    async def execute(self, params: dict, idempotency_key: str = None) -> ToolResult:
        key = idempotency_key or str(uuid.uuid4())
 
        # 이미 실행된 결과가 있으면 캐시에서 반환
        cached = await self.cache.get(key)
        if cached is not None:
            return cached
 
        # 실행 후 결과를 캐시에 저장
        result = await self.tool.execute(params)
        await self.cache.set(key, result, ttl=timedelta(hours=24))
 
        return result

서킷 브레이커

서킷 브레이커(Circuit Breaker)는 반복적으로 실패하는 외부 서비스에 대한 호출을 일시적으로 차단하여 시스템 전체의 안정성을 보호하는 패턴입니다.

3가지 상태

  • Closed(닫힘): 정상 상태. 모든 요청이 통과합니다. 실패가 누적되면 Open으로 전환합니다.
  • Open(열림): 차단 상태. 모든 요청을 즉시 실패시킵니다. 일정 시간이 지나면 Half-Open으로 전환합니다.
  • Half-Open(반열림): 테스트 상태. 제한된 요청만 통과시켜 서비스 복구 여부를 확인합니다.
circuit_breaker.py
python
import time
from enum import Enum
from dataclasses import dataclass, field
 
class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"
 
@dataclass
class CircuitBreaker:
    failure_threshold: int = 5
    recovery_timeout: float = 30.0  # 초
    half_open_max_calls: int = 3
 
    state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    failure_count: int = field(default=0, init=False)
    last_failure_time: float = field(default=0, init=False)
    half_open_calls: int = field(default=0, init=False)
 
    async def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
            else:
                raise CircuitOpenError(
                    f"서킷 브레이커 열림 상태, "
                    f"{self.recovery_timeout - (time.time() - self.last_failure_time):.0f}초 후 재시도"
                )
 
        if self.state == CircuitState.HALF_OPEN:
            if self.half_open_calls >= self.half_open_max_calls:
                raise CircuitOpenError("Half-Open 테스트 한도 초과")
            self.half_open_calls += 1
 
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise
 
    def _on_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
        self.failure_count = 0
 
    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

모델 폴백

LLM 호출이 실패하거나 품질이 낮은 경우, 다른 모델로 폴백(Fallback)하는 전략입니다. 비용-성능 트레이드오프를 고려하여 폴백 체인을 구성합니다.

model_fallback.py
python
@dataclass
class ModelConfig:
    name: str
    provider: str
    cost_per_1k_tokens: float
    max_tokens: int
    timeout: float
 
class ModelFallbackChain:
    """모델 폴백 체인"""
 
    def __init__(self, models: list[ModelConfig]):
        self.models = models  # 우선순위 순서
        self.circuit_breakers = {
            m.name: CircuitBreaker(failure_threshold=3, recovery_timeout=60)
            for m in models
        }
 
    async def invoke(self, messages: list[dict]) -> LLMResponse:
        errors = []
 
        for model in self.models:
            breaker = self.circuit_breakers[model.name]
 
            try:
                response = await breaker.call(
                    self._call_model, model, messages
                )
                return response
            except CircuitOpenError:
                logger.info(f"{model.name} 서킷 브레이커 열림, 다음 모델로 폴백")
                continue
            except Exception as e:
                errors.append((model.name, e))
                logger.warning(f"{model.name} 실패: {e}, 다음 모델로 폴백")
                continue
 
        # 모든 모델 실패
        raise AllModelsFailed(errors)
 
# 폴백 체인 구성 예시
fallback_chain = ModelFallbackChain([
    ModelConfig("claude-opus", "anthropic", 0.015, 200000, 60.0),
    ModelConfig("claude-sonnet", "anthropic", 0.003, 200000, 30.0),
    ModelConfig("gpt-4o", "openai", 0.005, 128000, 30.0),
    ModelConfig("claude-haiku", "anthropic", 0.00025, 200000, 15.0),
])
Tip

폴백 체인에서 하위 모델은 성능이 낮을 수 있으므로, 폴백 시 프롬프트를 단순화하거나 few-shot 예시를 추가하는 등 모델 능력에 맞게 조정하는 것이 좋습니다.

보상 트랜잭션 — Saga 패턴

여러 단계로 구성된 워크플로우에서 중간 단계가 실패하면, 이미 완료된 이전 단계의 결과를 되돌려야 할 수 있습니다. Saga 패턴은 각 단계에 대응하는 보상 액션(Compensating Action)을 정의하여 이를 처리합니다.

Saga 구현

saga_pattern.py
python
from dataclasses import dataclass
from typing import Callable, Any
 
@dataclass
class SagaStep:
    name: str
    action: Callable
    compensation: Callable  # 보상 액션
 
class SagaOrchestrator:
    def __init__(self, steps: list[SagaStep]):
        self.steps = steps
        self.completed_steps: list[tuple[SagaStep, Any]] = []
 
    async def execute(self, context: dict) -> dict:
        """Saga 실행: 실패 시 보상 트랜잭션 수행"""
        try:
            for step in self.steps:
                logger.info(f"Saga 단계 실행: {step.name}")
                result = await step.action(context)
                self.completed_steps.append((step, result))
                context[f"{step.name}_result"] = result
 
            return context
 
        except Exception as e:
            logger.error(f"Saga 실패 at {step.name}: {e}")
            await self._compensate(context)
            raise SagaFailure(
                failed_step=step.name,
                error=e,
                compensated_steps=[s.name for s, _ in self.completed_steps],
            )
 
    async def _compensate(self, context: dict) -> None:
        """역순으로 보상 액션 실행"""
        for step, result in reversed(self.completed_steps):
            try:
                logger.info(f"보상 실행: {step.name}")
                await step.compensation(context, result)
            except Exception as comp_error:
                # 보상 실패는 별도 기록 후 수동 처리 큐에 추가
                logger.critical(
                    f"보상 실패 at {step.name}: {comp_error}"
                )
                await dead_letter_queue.enqueue(
                    FailedCompensation(step=step.name, error=comp_error)
                )
 
# 주문 처리 Saga 예시
order_saga = SagaOrchestrator([
    SagaStep(
        name="create_order",
        action=create_order,
        compensation=cancel_order,
    ),
    SagaStep(
        name="process_payment",
        action=process_payment,
        compensation=refund_payment,
    ),
    SagaStep(
        name="deduct_inventory",
        action=deduct_inventory,
        compensation=restore_inventory,
    ),
    SagaStep(
        name="request_shipping",
        action=request_shipping,
        compensation=cancel_shipping,
    ),
])
Warning

보상 액션 자체가 실패할 수 있다는 점을 반드시 고려해야 합니다. 보상 실패는 데드레터 큐에 기록하고 수동으로 처리해야 합니다. 보상 액션도 멱등성을 보장해야 중복 실행 시 문제가 발생하지 않습니다.

부분 실패 처리

병렬로 실행되는 태스크 중 일부만 실패하는 경우, 전체를 실패로 처리할지 부분적으로 결과를 활용할지 결정해야 합니다.

partial_failure.py
python
from enum import Enum
 
class PartialFailureStrategy(Enum):
    FAIL_FAST = "fail_fast"           # 하나라도 실패하면 전체 중단
    BEST_EFFORT = "best_effort"       # 성공한 결과만 사용
    QUORUM = "quorum"                 # N개 이상 성공하면 진행
 
@dataclass
class ParallelExecutor:
    strategy: PartialFailureStrategy
    quorum_count: int = None  # QUORUM 전략일 때 필요한 최소 성공 수
 
    async def execute(self, tasks: list[Task]) -> ParallelResult:
        results = await asyncio.gather(
            *[self._execute_task(t) for t in tasks],
            return_exceptions=True,
        )
 
        successes = [(t, r) for t, r in zip(tasks, results) if not isinstance(r, Exception)]
        failures = [(t, r) for t, r in zip(tasks, results) if isinstance(r, Exception)]
 
        match self.strategy:
            case PartialFailureStrategy.FAIL_FAST:
                if failures:
                    raise PartialFailure(successes=successes, failures=failures)
                return ParallelResult(results=successes)
 
            case PartialFailureStrategy.BEST_EFFORT:
                if not successes:
                    raise AllTasksFailed(failures=failures)
                return ParallelResult(results=successes, partial=bool(failures))
 
            case PartialFailureStrategy.QUORUM:
                if len(successes) >= self.quorum_count:
                    return ParallelResult(results=successes, partial=bool(failures))
                raise QuorumNotMet(
                    required=self.quorum_count,
                    achieved=len(successes),
                )

데드레터 큐

모든 복구 전략이 실패한 메시지는 데드레터 큐(Dead Letter Queue, DLQ)로 이동합니다. DLQ는 처리 불가능한 작업을 안전하게 보관하면서 수동 조사와 처리를 가능하게 합니다.

dead_letter_queue.py
python
@dataclass
class DeadLetterEntry:
    id: str
    original_task: dict
    error_history: list[dict]  # 모든 시도와 에러 기록
    workflow_id: str
    created_at: datetime
    retry_count: int
    status: str  # pending, investigating, resolved, discarded
 
class DeadLetterQueue:
    async def enqueue(self, entry: DeadLetterEntry) -> None:
        """DLQ에 항목 추가 및 알림"""
        await self.store.save(entry)
        await self.alerter.send(
            channel="ops",
            severity="high",
            message=f"DLQ 항목 추가: {entry.original_task.get('name', 'unknown')}",
            context={"workflow_id": entry.workflow_id, "error": entry.error_history[-1]},
        )
 
    async def retry(self, entry_id: str) -> None:
        """DLQ 항목 수동 재시도"""
        entry = await self.store.get(entry_id)
        entry.status = "retrying"
        await self.store.update(entry)
        await self.task_runner.submit(entry.original_task)

복원력 계층 통합

지금까지 다룬 패턴들을 하나의 복원력 계층으로 통합합니다.


정리

이 장에서는 Agentic Workflow의 에러 복구와 재시도 전략을 살펴보았습니다.

  • 에러를 일시적/영구적/논리적으로 분류하여 적절한 대응 전략을 적용합니다
  • 지수 백오프 + 지터로 재시도 간격을 조절하고, 멱등성을 보장합니다
  • 서킷 브레이커로 반복 실패하는 외부 서비스에 대한 부하를 차단합니다
  • 모델 폴백 체인으로 LLM 호출의 가용성을 높입니다
  • Saga 패턴으로 다단계 워크플로우의 일관성을 보장합니다
  • 데드레터 큐로 최종 실패한 작업을 안전하게 보관합니다

다음 장 예고

6장에서는 워크플로우의 상태 관리와 체크포인팅을 심층적으로 다룹니다. 이벤트 소싱, 체크포인트 저장소 선택, 멱등성 보장, 상태 버전 마이그레이션, 분산 상태 일관성 등 상태 관리의 모든 측면을 살펴보겠습니다.

이 글이 도움이 되셨나요?

관련 글

AI / ML

6장: 상태 관리와 체크포인팅

Agentic Workflow의 상태 모델, 이벤트 소싱, 체크포인트 저장소 선택, 멱등성 보장, 상태 복원과 버전 마이그레이션, 분산 상태 일관성 전략을 다룹니다.

2026년 3월 11일·16분
AI / ML

4장: 장기 실행 워크플로우 관리

시간/일 단위 워크플로우의 듀러블 실행, 체크포인팅, 일시 정지와 재개, 상태 직렬화, 타임아웃 관리, 분산 실행 전략을 정리합니다.

2026년 3월 7일·16분
AI / ML

7장: 감사 로깅과 컴플라이언스

에이전트 행동 추적, 불변 감사 로그 설계, 규제 요구사항 대응, 설명 가능성, 재현 가능성, OpenTelemetry 통합, 보존 정책을 다룹니다.

2026년 3월 13일·16분
이전 글4장: 장기 실행 워크플로우 관리
다음 글6장: 상태 관리와 체크포인팅

댓글

목차

약 17분 남음
  • 이 장에서 배울 내용
  • 에러 분류 체계
    • 3가지 에러 유형
  • 지수 백오프와 지터
    • 구현
    • 재시도 시 주의사항
  • 서킷 브레이커
    • 3가지 상태
  • 모델 폴백
  • 보상 트랜잭션 — Saga 패턴
    • Saga 구현
  • 부분 실패 처리
  • 데드레터 큐
  • 복원력 계층 통합
  • 정리
  • 다음 장 예고