대량 문서 처리 파이프라인을 구축하고, 배치 처리, 비동기 추출, 품질 검증 루프, 비용 최적화 전략을 학습합니다.
5장에서 단일 문서의 데이터 추출 방법을 다루었습니다. 그러나 실무에서는 수백, 수천 건의 문서를 처리해야 하는 상황이 일반적입니다. 이때 다음과 같은 과제가 발생합니다.
대량 문서 처리 파이프라인의 기본 구조는 다음과 같습니다.
import asyncio
import time
from dataclasses import dataclass, field
from typing import TypeVar, Generic
from pydantic import BaseModel
T = TypeVar("T", bound=BaseModel)
@dataclass
class ProcessingResult(Generic[T]):
"""처리 결과를 담는 컨테이너"""
doc_id: str
success: bool
data: T | None = None
error: str | None = None
attempts: int = 0
processing_time: float = 0.0
@dataclass
class BatchConfig:
"""배치 처리 설정"""
max_concurrent: int = 10
max_retries: int = 3
retry_delay: float = 1.0
rate_limit_rpm: int = 500
batch_size: int = 50
class BatchProcessor(Generic[T]):
"""비동기 배치 문서 프로세서"""
def __init__(
self,
config: BatchConfig,
extract_fn,
model_class: type[T]
):
self.config = config
self.extract_fn = extract_fn
self.model_class = model_class
self._semaphore = asyncio.Semaphore(config.max_concurrent)
self._rate_limiter = RateLimiter(config.rate_limit_rpm)
async def process_single(
self, doc_id: str, content: str
) -> ProcessingResult[T]:
"""단일 문서를 처리합니다."""
start_time = time.time()
for attempt in range(1, self.config.max_retries + 1):
try:
async with self._semaphore:
await self._rate_limiter.acquire()
result = await self.extract_fn(content)
return ProcessingResult(
doc_id=doc_id,
success=True,
data=result,
attempts=attempt,
processing_time=time.time() - start_time
)
except Exception as e:
if attempt < self.config.max_retries:
delay = self.config.retry_delay * (2 ** (attempt - 1))
await asyncio.sleep(delay)
else:
return ProcessingResult(
doc_id=doc_id,
success=False,
error=str(e),
attempts=attempt,
processing_time=time.time() - start_time
)
# 이 코드에 도달하지 않지만 타입 안전성을 위해 포함
return ProcessingResult(
doc_id=doc_id, success=False, error="Unknown error", attempts=0
)
async def process_batch(
self, documents: dict[str, str]
) -> list[ProcessingResult[T]]:
"""문서 배치를 비동기로 처리합니다."""
tasks = [
self.process_single(doc_id, content)
for doc_id, content in documents.items()
]
return await asyncio.gather(*tasks)
class RateLimiter:
"""토큰 버킷 기반 Rate Limiter"""
def __init__(self, rpm: int):
self.rpm = rpm
self.interval = 60.0 / rpm
self._last_call = 0.0
self._lock = asyncio.Lock()
async def acquire(self):
async with self._lock:
now = time.time()
elapsed = now - self._last_call
if elapsed < self.interval:
await asyncio.sleep(self.interval - elapsed)
self._last_call = time.time()import asyncio
import instructor
from openai import AsyncOpenAI
from pydantic import BaseModel, Field
class InvoiceData(BaseModel):
invoice_number: str
vendor_name: str
total_amount: float
date: str
async def extract_invoice(content: str) -> InvoiceData:
client = instructor.from_openai(AsyncOpenAI())
return await client.chat.completions.create(
model="gpt-4o-mini",
response_model=InvoiceData,
messages=[
{"role": "system", "content": "송장에서 데이터를 추출하세요."},
{"role": "user", "content": content}
]
)
async def main():
config = BatchConfig(
max_concurrent=5,
max_retries=3,
rate_limit_rpm=100,
)
processor = BatchProcessor(
config=config,
extract_fn=extract_invoice,
model_class=InvoiceData,
)
# 문서 목록 (실제로는 파일에서 로드)
documents = {
"INV-001": "송장 텍스트 1...",
"INV-002": "송장 텍스트 2...",
"INV-003": "송장 텍스트 3...",
}
results = await processor.process_batch(documents)
success_count = sum(1 for r in results if r.success)
fail_count = sum(1 for r in results if not r.success)
print(f"성공: {success_count}, 실패: {fail_count}")
for r in results:
if r.success:
print(f" [{r.doc_id}] {r.data.vendor_name}: {r.data.total_amount}")
else:
print(f" [{r.doc_id}] 오류: {r.error}")
asyncio.run(main())LLM 추출 결과는 스키마는 통과하더라도 내용이 정확하지 않을 수 있습니다. 품질 검증 루프를 통해 이를 감지하고 수정합니다.
from pydantic import BaseModel, Field, model_validator
from typing import Self
class ValidatedInvoice(BaseModel):
"""검증 로직이 포함된 송장 데이터"""
invoice_number: str
vendor_name: str
items: list["InvoiceLineItem"]
subtotal: float = Field(ge=0)
tax_amount: float = Field(ge=0)
total_amount: float = Field(ge=0)
@model_validator(mode="after")
def validate_totals(self) -> Self:
"""합계 금액의 정합성을 검증합니다."""
calculated_subtotal = sum(item.amount for item in self.items)
# 소수점 오차 허용 (1원 이내)
if abs(calculated_subtotal - self.subtotal) > 1.0:
raise ValueError(
f"품목 합계({calculated_subtotal})와 "
f"소계({self.subtotal})가 일치하지 않습니다."
)
expected_total = self.subtotal + self.tax_amount
if abs(expected_total - self.total_amount) > 1.0:
raise ValueError(
f"소계+세액({expected_total})과 "
f"총액({self.total_amount})이 일치하지 않습니다."
)
return self
class InvoiceLineItem(BaseModel):
description: str
quantity: int = Field(ge=1)
unit_price: float = Field(ge=0)
amount: float = Field(ge=0)
@model_validator(mode="after")
def validate_amount(self) -> Self:
"""수량 x 단가 = 금액 검증"""
expected = self.quantity * self.unit_price
if abs(expected - self.amount) > 1.0:
raise ValueError(
f"수량({self.quantity}) x 단가({self.unit_price}) = "
f"{expected}이지만, 금액은 {self.amount}입니다."
)
return self스키마 검증만으로는 잡을 수 없는 의미적 오류를 LLM으로 검증합니다.
import instructor
from openai import OpenAI
from pydantic import BaseModel, Field
class ValidationResult(BaseModel):
"""검증 결과"""
is_valid: bool = Field(description="검증 통과 여부")
issues: list[str] = Field(
default_factory=list,
description="발견된 문제점 목록"
)
confidence: float = Field(
ge=0.0, le=1.0,
description="검증 결과 신뢰도"
)
def validate_extraction(
original_text: str,
extracted_data: dict,
data_description: str
) -> ValidationResult:
"""추출 결과를 원본 텍스트와 대조하여 검증합니다."""
client = instructor.from_openai(OpenAI())
return client.chat.completions.create(
model="gpt-4o-mini",
response_model=ValidationResult,
messages=[
{
"role": "system",
"content": (
"원본 텍스트와 추출된 데이터를 비교하여 "
"추출이 정확한지 검증하세요. "
"누락된 정보, 잘못된 값, 불일치를 확인하세요."
)
},
{
"role": "user",
"content": (
f"데이터 유형: {data_description}\n\n"
f"원본 텍스트:\n{original_text}\n\n"
f"추출된 데이터:\n{extracted_data}"
)
}
]
)의미적 검증에는 추출에 사용한 모델보다 저렴한 모델(예: gpt-4o-mini)을 사용하면 비용을 절약할 수 있습니다. 검증은 추출보다 단순한 작업이므로, 작은 모델로도 충분한 성능을 발휘합니다.
모든 추출을 LLM에 의존하면 비용이 높아집니다. 규칙 기반 추출을 먼저 시도하고, 실패하거나 신뢰도가 낮을 때만 LLM을 사용하는 하이브리드 접근이 효율적입니다.
import re
from pydantic import BaseModel, Field
class ExtractedDate(BaseModel):
value: str = Field(description="YYYY-MM-DD 형식의 날짜")
method: str = Field(description="추출 방식 (rule/llm)")
confidence: float
def extract_date_hybrid(text: str) -> ExtractedDate:
"""규칙 기반 추출을 먼저 시도하고, 실패 시 LLM을 사용합니다."""
# 1단계: 규칙 기반 추출 시도
date_patterns = [
r"(\d{4})[-./ ](\d{1,2})[-./ ](\d{1,2})", # 2024-01-15
r"(\d{4})년\s*(\d{1,2})월\s*(\d{1,2})일", # 2024년 1월 15일
]
for pattern in date_patterns:
match = re.search(pattern, text)
if match:
year, month, day = match.groups()
date_str = f"{year}-{int(month):02d}-{int(day):02d}"
return ExtractedDate(
value=date_str,
method="rule",
confidence=0.95
)
# 2단계: LLM 폴백
import instructor
from openai import OpenAI
client = instructor.from_openai(OpenAI())
result = client.chat.completions.create(
model="gpt-4o-mini",
response_model=ExtractedDate,
messages=[
{
"role": "system",
"content": "텍스트에서 날짜를 찾아 YYYY-MM-DD 형식으로 추출하세요."
},
{"role": "user", "content": text}
]
)
result.method = "llm"
return result| 추출 방식 | 성공률 | 건당 비용 | 처리 시간 |
|---|---|---|---|
| 규칙 기반만 | 60-70% | 거의 0 | 수 ms |
| LLM만 | 95-99% | 높음 | 수 초 |
| 하이브리드 | 97-99% | 중간 | 가변 |
하이브리드 접근에서는 전체 문서의 60-70%를 규칙으로 처리하고, 나머지 30-40%만 LLM을 호출하므로 총 비용이 크게 절감됩니다.
작업 복잡도에 따라 다른 모델을 사용합니다.
from enum import Enum
class ModelTier(str, Enum):
FAST = "gpt-4o-mini" # 단순 추출, 검증
STANDARD = "gpt-4o" # 일반 추출
PREMIUM = "gpt-4o-2026-02" # 복잡한 추출, 멀티모달
def select_model(document_type: str, complexity: str) -> ModelTier:
"""문서 유형과 복잡도에 따라 모델을 선택합니다."""
if complexity == "simple":
return ModelTier.FAST
elif document_type in ("scanned_pdf", "handwritten"):
return ModelTier.PREMIUM
else:
return ModelTier.STANDARD동일하거나 유사한 문서에 대한 반복 호출을 방지합니다.
import hashlib
import json
from pathlib import Path
from pydantic import BaseModel
class ExtractionCache:
"""추출 결과 캐시"""
def __init__(self, cache_dir: str = ".cache/extractions"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
def _get_key(self, content: str, schema_name: str) -> str:
"""콘텐츠와 스키마 이름으로 캐시 키를 생성합니다."""
hash_input = f"{schema_name}:{content}"
return hashlib.sha256(hash_input.encode()).hexdigest()
def get(self, content: str, schema_name: str) -> dict | None:
"""캐시에서 결과를 조회합니다."""
key = self._get_key(content, schema_name)
cache_file = self.cache_dir / f"{key}.json"
if cache_file.exists():
return json.loads(cache_file.read_text())
return None
def set(self, content: str, schema_name: str, result: BaseModel) -> None:
"""결과를 캐시에 저장합니다."""
key = self._get_key(content, schema_name)
cache_file = self.cache_dir / f"{key}.json"
cache_file.write_text(result.model_dump_json(indent=2))불필요한 토큰을 줄여 비용을 절감합니다.
def optimize_prompt(text: str, max_tokens: int = 4000) -> str:
"""입력 텍스트를 최적화하여 토큰 사용을 줄입니다."""
# 연속 공백/줄바꿈 정리
import re
text = re.sub(r"\s+", " ", text).strip()
# 무의미한 반복 제거 (헤더, 푸터 등)
lines = text.split(". ")
seen = set()
unique_lines = []
for line in lines:
normalized = line.strip().lower()
if normalized not in seen:
seen.add(normalized)
unique_lines.append(line)
text = ". ".join(unique_lines)
# 길이 제한 (대략적인 토큰 추정)
if len(text) > max_tokens * 3:
text = text[: max_tokens * 3]
return text비용 최적화와 품질 사이에는 트레이드오프가 있습니다. 모델을 낮추거나 입력을 과도하게 자르면 추출 품질이 떨어질 수 있습니다. 최적의 균형점은 실험과 모니터링을 통해 찾아야 합니다.
이번 장에서는 LLM 기반 데이터 추출을 대규모로 자동화하기 위한 핵심 기법을 학습했습니다.
핵심 내용을 정리하면 다음과 같습니다.
7장에서는 ETL 파이프라인에 LLM을 통합하는 방법을 다룹니다. 전통적인 ETL과 LLM-enhanced ETL의 차이점, Transform 단계에서의 LLM 활용, Apache Airflow 및 Prefect와의 통합을 학습합니다.
이 글이 도움이 되셨나요?
전통 ETL과 LLM-enhanced ETL을 비교하고, Transform 단계에 LLM을 적용하여 분류, 요약, 정규화, 감성분석을 수행하는 방법을 학습합니다.
PDF, 이미지, 웹페이지 등 비정형 데이터에서 LLM을 활용하여 구조화된 정보를 추출하는 실전 기법을 학습합니다.
스키마 검증, 의미적 검증, 자동 재시도, 멀티 프로바이더 폴백, 부분 출력 복구 등 프로덕션 수준의 검증 전략을 학습합니다.