본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout

내비게이션

  • Tech
  • Projects
  • Books
  • About
  • Tags

카테고리

  • AI / ML
  • 웹 개발
  • 프로그래밍
  • 개발 도구

연결

  • GitHub
  • Email
  • RSS
© 2026 Kreath Archive. All rights reserved.Built with Next.js + MDX
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 6장: LLM 기반 데이터 추출 자동화
2026년 3월 30일·AI / ML·

6장: LLM 기반 데이터 추출 자동화

대량 문서 처리 파이프라인을 구축하고, 배치 처리, 비동기 추출, 품질 검증 루프, 비용 최적화 전략을 학습합니다.

16분1,105자8개 섹션
structured-outputaidata-engineeringllm
공유
structured-output6 / 10
12345678910
이전5장: 비정형 데이터에서 구조화된 정보 추출다음7장: ETL 파이프라인에 LLM 통합

학습 목표

  • 대량 문서를 효율적으로 처리하는 배치 파이프라인을 설계합니다
  • 비동기 처리와 동시성 제어를 통해 처리 속도를 최적화합니다
  • 품질 검증 루프로 추출 결과의 신뢰성을 높입니다
  • 셀렉터-LLM 하이브리드 접근과 비용 최적화 전략을 적용합니다

대량 문서 처리의 과제

5장에서 단일 문서의 데이터 추출 방법을 다루었습니다. 그러나 실무에서는 수백, 수천 건의 문서를 처리해야 하는 상황이 일반적입니다. 이때 다음과 같은 과제가 발생합니다.

  • 처리 속도: 순차 처리로는 수천 건의 문서를 합리적인 시간 내에 처리할 수 없습니다
  • API Rate Limit: LLM API에는 분당 요청 수, 분당 토큰 수 제한이 있습니다
  • 비용 관리: 대량 처리 시 API 호출 비용이 급격히 증가합니다
  • 오류 처리: 일부 문서의 추출 실패가 전체 파이프라인을 중단시키면 안 됩니다
  • 품질 보장: 수동 검토 없이 추출 결과의 정확성을 검증해야 합니다

배치 처리 파이프라인

기본 구조

대량 문서 처리 파이프라인의 기본 구조는 다음과 같습니다.

비동기 배치 프로세서 구현

batch_processor.py
python
import asyncio
import time
from dataclasses import dataclass, field
from typing import TypeVar, Generic
from pydantic import BaseModel
 
T = TypeVar("T", bound=BaseModel)
 
 
@dataclass
class ProcessingResult(Generic[T]):
    """처리 결과를 담는 컨테이너"""
    doc_id: str
    success: bool
    data: T | None = None
    error: str | None = None
    attempts: int = 0
    processing_time: float = 0.0
 
 
@dataclass
class BatchConfig:
    """배치 처리 설정"""
    max_concurrent: int = 10
    max_retries: int = 3
    retry_delay: float = 1.0
    rate_limit_rpm: int = 500
    batch_size: int = 50
 
 
class BatchProcessor(Generic[T]):
    """비동기 배치 문서 프로세서"""
 
    def __init__(
        self,
        config: BatchConfig,
        extract_fn,
        model_class: type[T]
    ):
        self.config = config
        self.extract_fn = extract_fn
        self.model_class = model_class
        self._semaphore = asyncio.Semaphore(config.max_concurrent)
        self._rate_limiter = RateLimiter(config.rate_limit_rpm)
 
    async def process_single(
        self, doc_id: str, content: str
    ) -> ProcessingResult[T]:
        """단일 문서를 처리합니다."""
        start_time = time.time()
 
        for attempt in range(1, self.config.max_retries + 1):
            try:
                async with self._semaphore:
                    await self._rate_limiter.acquire()
                    result = await self.extract_fn(content)
 
                return ProcessingResult(
                    doc_id=doc_id,
                    success=True,
                    data=result,
                    attempts=attempt,
                    processing_time=time.time() - start_time
                )
 
            except Exception as e:
                if attempt < self.config.max_retries:
                    delay = self.config.retry_delay * (2 ** (attempt - 1))
                    await asyncio.sleep(delay)
                else:
                    return ProcessingResult(
                        doc_id=doc_id,
                        success=False,
                        error=str(e),
                        attempts=attempt,
                        processing_time=time.time() - start_time
                    )
 
        # 이 코드에 도달하지 않지만 타입 안전성을 위해 포함
        return ProcessingResult(
            doc_id=doc_id, success=False, error="Unknown error", attempts=0
        )
 
    async def process_batch(
        self, documents: dict[str, str]
    ) -> list[ProcessingResult[T]]:
        """문서 배치를 비동기로 처리합니다."""
        tasks = [
            self.process_single(doc_id, content)
            for doc_id, content in documents.items()
        ]
        return await asyncio.gather(*tasks)
 
 
class RateLimiter:
    """토큰 버킷 기반 Rate Limiter"""
 
    def __init__(self, rpm: int):
        self.rpm = rpm
        self.interval = 60.0 / rpm
        self._last_call = 0.0
        self._lock = asyncio.Lock()
 
    async def acquire(self):
        async with self._lock:
            now = time.time()
            elapsed = now - self._last_call
            if elapsed < self.interval:
                await asyncio.sleep(self.interval - elapsed)
            self._last_call = time.time()

사용 예시

batch_usage.py
python
import asyncio
import instructor
from openai import AsyncOpenAI
from pydantic import BaseModel, Field
 
 
class InvoiceData(BaseModel):
    invoice_number: str
    vendor_name: str
    total_amount: float
    date: str
 
 
async def extract_invoice(content: str) -> InvoiceData:
    client = instructor.from_openai(AsyncOpenAI())
    return await client.chat.completions.create(
        model="gpt-4o-mini",
        response_model=InvoiceData,
        messages=[
            {"role": "system", "content": "송장에서 데이터를 추출하세요."},
            {"role": "user", "content": content}
        ]
    )
 
 
async def main():
    config = BatchConfig(
        max_concurrent=5,
        max_retries=3,
        rate_limit_rpm=100,
    )
 
    processor = BatchProcessor(
        config=config,
        extract_fn=extract_invoice,
        model_class=InvoiceData,
    )
 
    # 문서 목록 (실제로는 파일에서 로드)
    documents = {
        "INV-001": "송장 텍스트 1...",
        "INV-002": "송장 텍스트 2...",
        "INV-003": "송장 텍스트 3...",
    }
 
    results = await processor.process_batch(documents)
 
    success_count = sum(1 for r in results if r.success)
    fail_count = sum(1 for r in results if not r.success)
    print(f"성공: {success_count}, 실패: {fail_count}")
 
    for r in results:
        if r.success:
            print(f"  [{r.doc_id}] {r.data.vendor_name}: {r.data.total_amount}")
        else:
            print(f"  [{r.doc_id}] 오류: {r.error}")
 
 
asyncio.run(main())

품질 검증 루프

LLM 추출 결과는 스키마는 통과하더라도 내용이 정확하지 않을 수 있습니다. 품질 검증 루프를 통해 이를 감지하고 수정합니다.

다중 레이어 검증

quality_validation.py
python
from pydantic import BaseModel, Field, model_validator
from typing import Self
 
 
class ValidatedInvoice(BaseModel):
    """검증 로직이 포함된 송장 데이터"""
    invoice_number: str
    vendor_name: str
    items: list["InvoiceLineItem"]
    subtotal: float = Field(ge=0)
    tax_amount: float = Field(ge=0)
    total_amount: float = Field(ge=0)
 
    @model_validator(mode="after")
    def validate_totals(self) -> Self:
        """합계 금액의 정합성을 검증합니다."""
        calculated_subtotal = sum(item.amount for item in self.items)
        # 소수점 오차 허용 (1원 이내)
        if abs(calculated_subtotal - self.subtotal) > 1.0:
            raise ValueError(
                f"품목 합계({calculated_subtotal})와 "
                f"소계({self.subtotal})가 일치하지 않습니다."
            )
 
        expected_total = self.subtotal + self.tax_amount
        if abs(expected_total - self.total_amount) > 1.0:
            raise ValueError(
                f"소계+세액({expected_total})과 "
                f"총액({self.total_amount})이 일치하지 않습니다."
            )
        return self
 
 
class InvoiceLineItem(BaseModel):
    description: str
    quantity: int = Field(ge=1)
    unit_price: float = Field(ge=0)
    amount: float = Field(ge=0)
 
    @model_validator(mode="after")
    def validate_amount(self) -> Self:
        """수량 x 단가 = 금액 검증"""
        expected = self.quantity * self.unit_price
        if abs(expected - self.amount) > 1.0:
            raise ValueError(
                f"수량({self.quantity}) x 단가({self.unit_price}) = "
                f"{expected}이지만, 금액은 {self.amount}입니다."
            )
        return self

LLM 기반 의미적 검증

스키마 검증만으로는 잡을 수 없는 의미적 오류를 LLM으로 검증합니다.

semantic_validation.py
python
import instructor
from openai import OpenAI
from pydantic import BaseModel, Field
 
 
class ValidationResult(BaseModel):
    """검증 결과"""
    is_valid: bool = Field(description="검증 통과 여부")
    issues: list[str] = Field(
        default_factory=list,
        description="발견된 문제점 목록"
    )
    confidence: float = Field(
        ge=0.0, le=1.0,
        description="검증 결과 신뢰도"
    )
 
 
def validate_extraction(
    original_text: str,
    extracted_data: dict,
    data_description: str
) -> ValidationResult:
    """추출 결과를 원본 텍스트와 대조하여 검증합니다."""
    client = instructor.from_openai(OpenAI())
 
    return client.chat.completions.create(
        model="gpt-4o-mini",
        response_model=ValidationResult,
        messages=[
            {
                "role": "system",
                "content": (
                    "원본 텍스트와 추출된 데이터를 비교하여 "
                    "추출이 정확한지 검증하세요. "
                    "누락된 정보, 잘못된 값, 불일치를 확인하세요."
                )
            },
            {
                "role": "user",
                "content": (
                    f"데이터 유형: {data_description}\n\n"
                    f"원본 텍스트:\n{original_text}\n\n"
                    f"추출된 데이터:\n{extracted_data}"
                )
            }
        ]
    )
Tip

의미적 검증에는 추출에 사용한 모델보다 저렴한 모델(예: gpt-4o-mini)을 사용하면 비용을 절약할 수 있습니다. 검증은 추출보다 단순한 작업이므로, 작은 모델로도 충분한 성능을 발휘합니다.


셀렉터-LLM 하이브리드 접근

모든 추출을 LLM에 의존하면 비용이 높아집니다. 규칙 기반 추출을 먼저 시도하고, 실패하거나 신뢰도가 낮을 때만 LLM을 사용하는 하이브리드 접근이 효율적입니다.

hybrid_extraction.py
python
import re
from pydantic import BaseModel, Field
 
 
class ExtractedDate(BaseModel):
    value: str = Field(description="YYYY-MM-DD 형식의 날짜")
    method: str = Field(description="추출 방식 (rule/llm)")
    confidence: float
 
 
def extract_date_hybrid(text: str) -> ExtractedDate:
    """규칙 기반 추출을 먼저 시도하고, 실패 시 LLM을 사용합니다."""
 
    # 1단계: 규칙 기반 추출 시도
    date_patterns = [
        r"(\d{4})[-./ ](\d{1,2})[-./ ](\d{1,2})",  # 2024-01-15
        r"(\d{4})년\s*(\d{1,2})월\s*(\d{1,2})일",   # 2024년 1월 15일
    ]
 
    for pattern in date_patterns:
        match = re.search(pattern, text)
        if match:
            year, month, day = match.groups()
            date_str = f"{year}-{int(month):02d}-{int(day):02d}"
            return ExtractedDate(
                value=date_str,
                method="rule",
                confidence=0.95
            )
 
    # 2단계: LLM 폴백
    import instructor
    from openai import OpenAI
 
    client = instructor.from_openai(OpenAI())
    result = client.chat.completions.create(
        model="gpt-4o-mini",
        response_model=ExtractedDate,
        messages=[
            {
                "role": "system",
                "content": "텍스트에서 날짜를 찾아 YYYY-MM-DD 형식으로 추출하세요."
            },
            {"role": "user", "content": text}
        ]
    )
    result.method = "llm"
    return result

하이브리드 전략의 비용 효과

추출 방식성공률건당 비용처리 시간
규칙 기반만60-70%거의 0수 ms
LLM만95-99%높음수 초
하이브리드97-99%중간가변

하이브리드 접근에서는 전체 문서의 60-70%를 규칙으로 처리하고, 나머지 30-40%만 LLM을 호출하므로 총 비용이 크게 절감됩니다.


비용 최적화 전략

1. 모델 계층화

작업 복잡도에 따라 다른 모델을 사용합니다.

model_tiering.py
python
from enum import Enum
 
 
class ModelTier(str, Enum):
    FAST = "gpt-4o-mini"      # 단순 추출, 검증
    STANDARD = "gpt-4o"        # 일반 추출
    PREMIUM = "gpt-4o-2026-02" # 복잡한 추출, 멀티모달
 
 
def select_model(document_type: str, complexity: str) -> ModelTier:
    """문서 유형과 복잡도에 따라 모델을 선택합니다."""
    if complexity == "simple":
        return ModelTier.FAST
    elif document_type in ("scanned_pdf", "handwritten"):
        return ModelTier.PREMIUM
    else:
        return ModelTier.STANDARD

2. 결과 캐싱

동일하거나 유사한 문서에 대한 반복 호출을 방지합니다.

extraction_cache.py
python
import hashlib
import json
from pathlib import Path
from pydantic import BaseModel
 
 
class ExtractionCache:
    """추출 결과 캐시"""
 
    def __init__(self, cache_dir: str = ".cache/extractions"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
 
    def _get_key(self, content: str, schema_name: str) -> str:
        """콘텐츠와 스키마 이름으로 캐시 키를 생성합니다."""
        hash_input = f"{schema_name}:{content}"
        return hashlib.sha256(hash_input.encode()).hexdigest()
 
    def get(self, content: str, schema_name: str) -> dict | None:
        """캐시에서 결과를 조회합니다."""
        key = self._get_key(content, schema_name)
        cache_file = self.cache_dir / f"{key}.json"
        if cache_file.exists():
            return json.loads(cache_file.read_text())
        return None
 
    def set(self, content: str, schema_name: str, result: BaseModel) -> None:
        """결과를 캐시에 저장합니다."""
        key = self._get_key(content, schema_name)
        cache_file = self.cache_dir / f"{key}.json"
        cache_file.write_text(result.model_dump_json(indent=2))

3. 프롬프트 최적화

불필요한 토큰을 줄여 비용을 절감합니다.

prompt_optimization.py
python
def optimize_prompt(text: str, max_tokens: int = 4000) -> str:
    """입력 텍스트를 최적화하여 토큰 사용을 줄입니다."""
    # 연속 공백/줄바꿈 정리
    import re
    text = re.sub(r"\s+", " ", text).strip()
 
    # 무의미한 반복 제거 (헤더, 푸터 등)
    lines = text.split(". ")
    seen = set()
    unique_lines = []
    for line in lines:
        normalized = line.strip().lower()
        if normalized not in seen:
            seen.add(normalized)
            unique_lines.append(line)
    text = ". ".join(unique_lines)
 
    # 길이 제한 (대략적인 토큰 추정)
    if len(text) > max_tokens * 3:
        text = text[: max_tokens * 3]
 
    return text
Warning

비용 최적화와 품질 사이에는 트레이드오프가 있습니다. 모델을 낮추거나 입력을 과도하게 자르면 추출 품질이 떨어질 수 있습니다. 최적의 균형점은 실험과 모니터링을 통해 찾아야 합니다.


정리

이번 장에서는 LLM 기반 데이터 추출을 대규모로 자동화하기 위한 핵심 기법을 학습했습니다.

핵심 내용을 정리하면 다음과 같습니다.

  • 비동기 배치 처리와 동시성 제어(Semaphore)로 대량 문서를 효율적으로 처리합니다
  • Rate Limiter를 적용하여 API 제한을 초과하지 않도록 관리합니다
  • Pydantic 검증과 LLM 의미적 검증을 결합한 다중 레이어 품질 검증을 구축합니다
  • 셀렉터-LLM 하이브리드 접근으로 비용 대비 효과를 극대화합니다
  • 모델 계층화, 결과 캐싱, 프롬프트 최적화로 총 비용을 절감합니다

다음 장 미리보기

7장에서는 ETL 파이프라인에 LLM을 통합하는 방법을 다룹니다. 전통적인 ETL과 LLM-enhanced ETL의 차이점, Transform 단계에서의 LLM 활용, Apache Airflow 및 Prefect와의 통합을 학습합니다.

이 글이 도움이 되셨나요?

관련 주제 더 보기

#structured-output#ai#data-engineering#llm

관련 글

AI / ML

7장: ETL 파이프라인에 LLM 통합

전통 ETL과 LLM-enhanced ETL을 비교하고, Transform 단계에 LLM을 적용하여 분류, 요약, 정규화, 감성분석을 수행하는 방법을 학습합니다.

2026년 4월 1일·14분
AI / ML

5장: 비정형 데이터에서 구조화된 정보 추출

PDF, 이미지, 웹페이지 등 비정형 데이터에서 LLM을 활용하여 구조화된 정보를 추출하는 실전 기법을 학습합니다.

2026년 3월 28일·18분
AI / ML

8장: 출력 검증과 폴백 전략

스키마 검증, 의미적 검증, 자동 재시도, 멀티 프로바이더 폴백, 부분 출력 복구 등 프로덕션 수준의 검증 전략을 학습합니다.

2026년 4월 3일·18분
이전 글5장: 비정형 데이터에서 구조화된 정보 추출
다음 글7장: ETL 파이프라인에 LLM 통합

댓글

목차

약 16분 남음
  • 학습 목표
  • 대량 문서 처리의 과제
  • 배치 처리 파이프라인
    • 기본 구조
    • 비동기 배치 프로세서 구현
    • 사용 예시
  • 품질 검증 루프
    • 다중 레이어 검증
    • LLM 기반 의미적 검증
  • 셀렉터-LLM 하이브리드 접근
    • 하이브리드 전략의 비용 효과
  • 비용 최적화 전략
    • 1. 모델 계층화
    • 2. 결과 캐싱
    • 3. 프롬프트 최적화
  • 정리
  • 다음 장 미리보기