본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout

내비게이션

  • Tech
  • Projects
  • Books
  • About
  • Tags

카테고리

  • AI / ML
  • 웹 개발
  • 프로그래밍
  • 개발 도구

연결

  • GitHub
  • Email
  • RSS
© 2026 Kreath Archive. All rights reserved.Built with Next.js + MDX
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 5장: 에러 복구와 재시도 전략
2026년 3월 9일·AI / ML·

5장: 에러 복구와 재시도 전략

Agentic Workflow의 에러 분류 체계, 지수 백오프, 서킷 브레이커, 모델 폴백, Saga 패턴 기반 보상 트랜잭션, 데드레터 큐 등 복원력 패턴을 다룹니다.

17분1,180자11개 섹션
workflowaiautomation
공유
agentic-workflow5 / 10
12345678910
이전4장: 장기 실행 워크플로우 관리다음6장: 상태 관리와 체크포인팅

이 장에서 배울 내용

  • 에러 유형별 분류 체계와 대응 전략
  • 지수 백오프와 지터를 활용한 재시도 구현
  • 서킷 브레이커와 모델 폴백 패턴
  • Saga 패턴을 활용한 보상 트랜잭션
  • 데드레터 큐와 부분 실패 처리

에러 분류 체계

Agentic Workflow에서 발생하는 에러는 성격에 따라 적절한 대응 전략이 달라집니다. 에러를 체계적으로 분류하는 것이 복원력 있는 시스템의 첫걸음입니다.

3가지 에러 유형

**일시적 에러(Transient Error)**는 네트워크 타임아웃, API 속도 제한(Rate Limit), 일시적 서비스 장애 등 시간이 지나면 자연스럽게 해결되는 에러입니다. 재시도가 가장 효과적인 대응입니다.

**영구적 에러(Permanent Error)**는 잘못된 API 키, 존재하지 않는 리소스, 권한 부족 등 재시도해도 해결되지 않는 에러입니다. 대체 경로나 사람의 개입이 필요합니다.

**논리적 에러(Logical Error)**는 에이전트의 판단 오류, 잘못된 도구 선택, 유효하지 않은 출력 등 시스템이 아닌 로직 수준의 문제입니다. 입력 검증 강화나 프롬프트 수정이 필요합니다.

error_classifier.py
python
from enum import Enum
 
class ErrorType(Enum):
    TRANSIENT = "transient"
    PERMANENT = "permanent"
    LOGICAL = "logical"
 
class ErrorClassifier:
    """에러를 유형별로 분류"""
 
    TRANSIENT_PATTERNS = [
        "timeout", "rate_limit", "503", "502",
        "connection_reset", "temporary_failure",
    ]
 
    PERMANENT_PATTERNS = [
        "401", "403", "404", "invalid_api_key",
        "resource_not_found", "permission_denied",
    ]
 
    def classify(self, error: Exception) -> ErrorType:
        error_str = str(error).lower()
 
        for pattern in self.TRANSIENT_PATTERNS:
            if pattern in error_str:
                return ErrorType.TRANSIENT
 
        for pattern in self.PERMANENT_PATTERNS:
            if pattern in error_str:
                return ErrorType.PERMANENT
 
        # LLM 출력 검증 실패는 논리적 에러
        if isinstance(error, OutputValidationError):
            return ErrorType.LOGICAL
 
        # 분류 불가능한 경우 일시적으로 간주하고 재시도
        return ErrorType.TRANSIENT

지수 백오프와 지터

일시적 에러에 대한 재시도는 **지수 백오프(Exponential Backoff)**와 **지터(Jitter)**를 조합하여 구현합니다. 일정한 간격으로 재시도하면 동시에 많은 요청이 몰리는 썬더링 허드(Thundering Herd) 문제가 발생할 수 있습니다.

구현

retry_with_backoff.py
python
import asyncio
import random
from dataclasses import dataclass
 
@dataclass
class RetryConfig:
    max_attempts: int = 5
    base_delay: float = 1.0     # 초기 대기 시간 (초)
    max_delay: float = 60.0     # 최대 대기 시간 (초)
    exponential_base: float = 2.0
    jitter: bool = True
 
async def retry_with_backoff(
    func,
    config: RetryConfig = RetryConfig(),
    classifier: ErrorClassifier = ErrorClassifier(),
):
    """지수 백오프 + 지터 재시도"""
    last_error = None
 
    for attempt in range(config.max_attempts):
        try:
            return await func()
        except Exception as e:
            last_error = e
            error_type = classifier.classify(e)
 
            # 영구적 에러는 즉시 중단
            if error_type == ErrorType.PERMANENT:
                raise PermanentError(f"영구적 에러, 재시도 불가: {e}") from e
 
            # 논리적 에러도 단순 재시도로 해결 불가
            if error_type == ErrorType.LOGICAL:
                raise LogicalError(f"논리적 에러, 로직 수정 필요: {e}") from e
 
            # 마지막 시도였으면 예외 전파
            if attempt == config.max_attempts - 1:
                raise
 
            # 지수 백오프 계산
            delay = min(
                config.base_delay * (config.exponential_base ** attempt),
                config.max_delay,
            )
 
            # 지터 적용 (0 ~ delay 사이의 랜덤 값)
            if config.jitter:
                delay = random.uniform(0, delay)
 
            logger.warning(
                f"재시도 {attempt + 1}/{config.max_attempts}, "
                f"{delay:.1f}초 후 재시도: {e}"
            )
            await asyncio.sleep(delay)
 
    raise last_error
Info

권장 설정은 기본 대기 시간 1~2초, 지수 기반 2배, 최대 시도 5~7회입니다. 지터를 적용하면 동시 재시도로 인한 부하 집중을 효과적으로 분산할 수 있습니다.

재시도 시 주의사항

재시도 시 반드시 **멱등성(Idempotency)**을 보장해야 합니다. 예를 들어, 결제 API를 호출한 후 응답을 받지 못해 재시도하는 경우, 멱등성 키(Idempotency Key) 없이 호출하면 이중 결제가 발생할 수 있습니다.

idempotent_tool_call.py
python
import uuid
 
class IdempotentToolExecutor:
    def __init__(self, tool: Tool, result_cache: ResultCache):
        self.tool = tool
        self.cache = result_cache
 
    async def execute(self, params: dict, idempotency_key: str = None) -> ToolResult:
        key = idempotency_key or str(uuid.uuid4())
 
        # 이미 실행된 결과가 있으면 캐시에서 반환
        cached = await self.cache.get(key)
        if cached is not None:
            return cached
 
        # 실행 후 결과를 캐시에 저장
        result = await self.tool.execute(params)
        await self.cache.set(key, result, ttl=timedelta(hours=24))
 
        return result

서킷 브레이커

**서킷 브레이커(Circuit Breaker)**는 반복적으로 실패하는 외부 서비스에 대한 호출을 일시적으로 차단하여 시스템 전체의 안정성을 보호하는 패턴입니다.

3가지 상태

  • Closed(닫힘): 정상 상태. 모든 요청이 통과합니다. 실패가 누적되면 Open으로 전환합니다.
  • Open(열림): 차단 상태. 모든 요청을 즉시 실패시킵니다. 일정 시간이 지나면 Half-Open으로 전환합니다.
  • Half-Open(반열림): 테스트 상태. 제한된 요청만 통과시켜 서비스 복구 여부를 확인합니다.
circuit_breaker.py
python
import time
from enum import Enum
from dataclasses import dataclass, field
 
class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"
 
@dataclass
class CircuitBreaker:
    failure_threshold: int = 5
    recovery_timeout: float = 30.0  # 초
    half_open_max_calls: int = 3
 
    state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    failure_count: int = field(default=0, init=False)
    last_failure_time: float = field(default=0, init=False)
    half_open_calls: int = field(default=0, init=False)
 
    async def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
            else:
                raise CircuitOpenError(
                    f"서킷 브레이커 열림 상태, "
                    f"{self.recovery_timeout - (time.time() - self.last_failure_time):.0f}초 후 재시도"
                )
 
        if self.state == CircuitState.HALF_OPEN:
            if self.half_open_calls >= self.half_open_max_calls:
                raise CircuitOpenError("Half-Open 테스트 한도 초과")
            self.half_open_calls += 1
 
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise
 
    def _on_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
        self.failure_count = 0
 
    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

모델 폴백

LLM 호출이 실패하거나 품질이 낮은 경우, 다른 모델로 **폴백(Fallback)**하는 전략입니다. 비용-성능 트레이드오프를 고려하여 폴백 체인을 구성합니다.

model_fallback.py
python
@dataclass
class ModelConfig:
    name: str
    provider: str
    cost_per_1k_tokens: float
    max_tokens: int
    timeout: float
 
class ModelFallbackChain:
    """모델 폴백 체인"""
 
    def __init__(self, models: list[ModelConfig]):
        self.models = models  # 우선순위 순서
        self.circuit_breakers = {
            m.name: CircuitBreaker(failure_threshold=3, recovery_timeout=60)
            for m in models
        }
 
    async def invoke(self, messages: list[dict]) -> LLMResponse:
        errors = []
 
        for model in self.models:
            breaker = self.circuit_breakers[model.name]
 
            try:
                response = await breaker.call(
                    self._call_model, model, messages
                )
                return response
            except CircuitOpenError:
                logger.info(f"{model.name} 서킷 브레이커 열림, 다음 모델로 폴백")
                continue
            except Exception as e:
                errors.append((model.name, e))
                logger.warning(f"{model.name} 실패: {e}, 다음 모델로 폴백")
                continue
 
        # 모든 모델 실패
        raise AllModelsFailed(errors)
 
# 폴백 체인 구성 예시
fallback_chain = ModelFallbackChain([
    ModelConfig("claude-opus", "anthropic", 0.015, 200000, 60.0),
    ModelConfig("claude-sonnet", "anthropic", 0.003, 200000, 30.0),
    ModelConfig("gpt-4o", "openai", 0.005, 128000, 30.0),
    ModelConfig("claude-haiku", "anthropic", 0.00025, 200000, 15.0),
])
Tip

폴백 체인에서 하위 모델은 성능이 낮을 수 있으므로, 폴백 시 프롬프트를 단순화하거나 few-shot 예시를 추가하는 등 모델 능력에 맞게 조정하는 것이 좋습니다.

보상 트랜잭션 — Saga 패턴

여러 단계로 구성된 워크플로우에서 중간 단계가 실패하면, 이미 완료된 이전 단계의 결과를 되돌려야 할 수 있습니다. Saga 패턴은 각 단계에 대응하는 **보상 액션(Compensating Action)**을 정의하여 이를 처리합니다.

Saga 구현

saga_pattern.py
python
from dataclasses import dataclass
from typing import Callable, Any
 
@dataclass
class SagaStep:
    name: str
    action: Callable
    compensation: Callable  # 보상 액션
 
class SagaOrchestrator:
    def __init__(self, steps: list[SagaStep]):
        self.steps = steps
        self.completed_steps: list[tuple[SagaStep, Any]] = []
 
    async def execute(self, context: dict) -> dict:
        """Saga 실행: 실패 시 보상 트랜잭션 수행"""
        try:
            for step in self.steps:
                logger.info(f"Saga 단계 실행: {step.name}")
                result = await step.action(context)
                self.completed_steps.append((step, result))
                context[f"{step.name}_result"] = result
 
            return context
 
        except Exception as e:
            logger.error(f"Saga 실패 at {step.name}: {e}")
            await self._compensate(context)
            raise SagaFailure(
                failed_step=step.name,
                error=e,
                compensated_steps=[s.name for s, _ in self.completed_steps],
            )
 
    async def _compensate(self, context: dict) -> None:
        """역순으로 보상 액션 실행"""
        for step, result in reversed(self.completed_steps):
            try:
                logger.info(f"보상 실행: {step.name}")
                await step.compensation(context, result)
            except Exception as comp_error:
                # 보상 실패는 별도 기록 후 수동 처리 큐에 추가
                logger.critical(
                    f"보상 실패 at {step.name}: {comp_error}"
                )
                await dead_letter_queue.enqueue(
                    FailedCompensation(step=step.name, error=comp_error)
                )
 
# 주문 처리 Saga 예시
order_saga = SagaOrchestrator([
    SagaStep(
        name="create_order",
        action=create_order,
        compensation=cancel_order,
    ),
    SagaStep(
        name="process_payment",
        action=process_payment,
        compensation=refund_payment,
    ),
    SagaStep(
        name="deduct_inventory",
        action=deduct_inventory,
        compensation=restore_inventory,
    ),
    SagaStep(
        name="request_shipping",
        action=request_shipping,
        compensation=cancel_shipping,
    ),
])
Warning

보상 액션 자체가 실패할 수 있다는 점을 반드시 고려해야 합니다. 보상 실패는 데드레터 큐에 기록하고 수동으로 처리해야 합니다. 보상 액션도 멱등성을 보장해야 중복 실행 시 문제가 발생하지 않습니다.

부분 실패 처리

병렬로 실행되는 태스크 중 일부만 실패하는 경우, 전체를 실패로 처리할지 부분적으로 결과를 활용할지 결정해야 합니다.

partial_failure.py
python
from enum import Enum
 
class PartialFailureStrategy(Enum):
    FAIL_FAST = "fail_fast"           # 하나라도 실패하면 전체 중단
    BEST_EFFORT = "best_effort"       # 성공한 결과만 사용
    QUORUM = "quorum"                 # N개 이상 성공하면 진행
 
@dataclass
class ParallelExecutor:
    strategy: PartialFailureStrategy
    quorum_count: int = None  # QUORUM 전략일 때 필요한 최소 성공 수
 
    async def execute(self, tasks: list[Task]) -> ParallelResult:
        results = await asyncio.gather(
            *[self._execute_task(t) for t in tasks],
            return_exceptions=True,
        )
 
        successes = [(t, r) for t, r in zip(tasks, results) if not isinstance(r, Exception)]
        failures = [(t, r) for t, r in zip(tasks, results) if isinstance(r, Exception)]
 
        match self.strategy:
            case PartialFailureStrategy.FAIL_FAST:
                if failures:
                    raise PartialFailure(successes=successes, failures=failures)
                return ParallelResult(results=successes)
 
            case PartialFailureStrategy.BEST_EFFORT:
                if not successes:
                    raise AllTasksFailed(failures=failures)
                return ParallelResult(results=successes, partial=bool(failures))
 
            case PartialFailureStrategy.QUORUM:
                if len(successes) >= self.quorum_count:
                    return ParallelResult(results=successes, partial=bool(failures))
                raise QuorumNotMet(
                    required=self.quorum_count,
                    achieved=len(successes),
                )

데드레터 큐

모든 복구 전략이 실패한 메시지는 **데드레터 큐(Dead Letter Queue, DLQ)**로 이동합니다. DLQ는 처리 불가능한 작업을 안전하게 보관하면서 수동 조사와 처리를 가능하게 합니다.

dead_letter_queue.py
python
@dataclass
class DeadLetterEntry:
    id: str
    original_task: dict
    error_history: list[dict]  # 모든 시도와 에러 기록
    workflow_id: str
    created_at: datetime
    retry_count: int
    status: str  # pending, investigating, resolved, discarded
 
class DeadLetterQueue:
    async def enqueue(self, entry: DeadLetterEntry) -> None:
        """DLQ에 항목 추가 및 알림"""
        await self.store.save(entry)
        await self.alerter.send(
            channel="ops",
            severity="high",
            message=f"DLQ 항목 추가: {entry.original_task.get('name', 'unknown')}",
            context={"workflow_id": entry.workflow_id, "error": entry.error_history[-1]},
        )
 
    async def retry(self, entry_id: str) -> None:
        """DLQ 항목 수동 재시도"""
        entry = await self.store.get(entry_id)
        entry.status = "retrying"
        await self.store.update(entry)
        await self.task_runner.submit(entry.original_task)

복원력 계층 통합

지금까지 다룬 패턴들을 하나의 복원력 계층으로 통합합니다.


정리

이 장에서는 Agentic Workflow의 에러 복구와 재시도 전략을 살펴보았습니다.

  • 에러를 일시적/영구적/논리적으로 분류하여 적절한 대응 전략을 적용합니다
  • 지수 백오프 + 지터로 재시도 간격을 조절하고, 멱등성을 보장합니다
  • 서킷 브레이커로 반복 실패하는 외부 서비스에 대한 부하를 차단합니다
  • 모델 폴백 체인으로 LLM 호출의 가용성을 높입니다
  • Saga 패턴으로 다단계 워크플로우의 일관성을 보장합니다
  • 데드레터 큐로 최종 실패한 작업을 안전하게 보관합니다

다음 장 예고

6장에서는 워크플로우의 상태 관리와 체크포인팅을 심층적으로 다룹니다. 이벤트 소싱, 체크포인트 저장소 선택, 멱등성 보장, 상태 버전 마이그레이션, 분산 상태 일관성 등 상태 관리의 모든 측면을 살펴보겠습니다.

이 글이 도움이 되셨나요?

관련 주제 더 보기

#workflow#ai#automation

관련 글

AI / ML

6장: 상태 관리와 체크포인팅

Agentic Workflow의 상태 모델, 이벤트 소싱, 체크포인트 저장소 선택, 멱등성 보장, 상태 복원과 버전 마이그레이션, 분산 상태 일관성 전략을 다룹니다.

2026년 3월 11일·16분
AI / ML

4장: 장기 실행 워크플로우 관리

시간/일 단위 워크플로우의 듀러블 실행, 체크포인팅, 일시 정지와 재개, 상태 직렬화, 타임아웃 관리, 분산 실행 전략을 정리합니다.

2026년 3월 7일·16분
AI / ML

7장: 감사 로깅과 컴플라이언스

에이전트 행동 추적, 불변 감사 로그 설계, 규제 요구사항 대응, 설명 가능성, 재현 가능성, OpenTelemetry 통합, 보존 정책을 다룹니다.

2026년 3월 13일·16분
이전 글4장: 장기 실행 워크플로우 관리
다음 글6장: 상태 관리와 체크포인팅

댓글

목차

약 17분 남음
  • 이 장에서 배울 내용
  • 에러 분류 체계
    • 3가지 에러 유형
  • 지수 백오프와 지터
    • 구현
    • 재시도 시 주의사항
  • 서킷 브레이커
    • 3가지 상태
  • 모델 폴백
  • 보상 트랜잭션 — Saga 패턴
    • Saga 구현
  • 부분 실패 처리
  • 데드레터 큐
  • 복원력 계층 통합
  • 정리
  • 다음 장 예고