PDF 송장에서 구조화된 JSON 데이터를 추출하는 엔드투엔드 파이프라인을 FastAPI, Pydantic, 검증 루프, 배치 처리로 구축합니다.
이번 장에서 구축하는 시스템은 다음과 같은 흐름으로 동작합니다.
| 구성 요소 | 기술 |
|---|---|
| API 서버 | FastAPI |
| 스키마 정의 | Pydantic v2 |
| LLM 호출 | Instructor + OpenAI |
| PDF 처리 | PyMuPDF (fitz) |
| 비동기 처리 | asyncio |
| 데이터 저장 | SQLite (프로토타입) |
invoice-extractor/
app/
__init__.py
main.py # FastAPI 앱
models.py # Pydantic 스키마
extraction.py # LLM 추출 로직
validation.py # 검증 로직
storage.py # 데이터 저장
pdf_processor.py # PDF 처리
config.py # 설정
tests/
test_models.py
test_extraction.py
test_validation.py
requirements.txt
README.md먼저 송장 데이터의 Pydantic 모델을 정의합니다. 이 모델은 LLM 출력 스키마, API 응답 스키마, 검증 로직을 모두 포함합니다.
from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Literal, Self
from datetime import date
from enum import Enum
class Currency(str, Enum):
KRW = "KRW"
USD = "USD"
EUR = "EUR"
JPY = "JPY"
class LineItem(BaseModel):
"""송장 품목"""
description: str = Field(
min_length=1,
description="품목 설명"
)
quantity: int = Field(
ge=1,
description="수량"
)
unit_price: float = Field(
ge=0,
description="단가"
)
amount: float = Field(
ge=0,
description="금액 (수량 x 단가)"
)
@model_validator(mode="after")
def validate_line_amount(self) -> Self:
expected = self.quantity * self.unit_price
if abs(expected - self.amount) > 1.0:
raise ValueError(
f"품목 금액 불일치: {self.quantity} x "
f"{self.unit_price} = {expected}, "
f"실제: {self.amount}"
)
return self
class VendorInfo(BaseModel):
"""공급자 정보"""
name: str = Field(min_length=1, description="상호")
registration_number: str | None = Field(
default=None,
description="사업자등록번호"
)
address: str | None = Field(
default=None,
description="주소"
)
contact: str | None = Field(
default=None,
description="연락처"
)
class BuyerInfo(BaseModel):
"""구매자 정보"""
name: str = Field(min_length=1, description="상호")
registration_number: str | None = Field(
default=None,
description="사업자등록번호"
)
class InvoiceExtraction(BaseModel):
"""송장 추출 결과 (LLM 출력 스키마)"""
invoice_number: str = Field(
min_length=1,
description="송장 번호"
)
issue_date: str = Field(
description="발행일 (YYYY-MM-DD)"
)
due_date: str | None = Field(
default=None,
description="만기일 (YYYY-MM-DD)"
)
vendor: VendorInfo = Field(description="공급자 정보")
buyer: BuyerInfo = Field(description="구매자 정보")
items: list[LineItem] = Field(
min_length=1,
description="품목 목록 (최소 1개)"
)
subtotal: float = Field(ge=0, description="소계")
tax_amount: float = Field(ge=0, description="세액")
total_amount: float = Field(ge=0, description="합계")
currency: Currency = Field(
default=Currency.KRW,
description="통화"
)
notes: str | None = Field(
default=None,
description="비고"
)
@field_validator("issue_date", "due_date")
@classmethod
def validate_date(cls, v: str | None) -> str | None:
if v is None:
return v
try:
date.fromisoformat(v)
except ValueError:
raise ValueError(
f"올바르지 않은 날짜 형식: {v} (YYYY-MM-DD 필요)"
)
return v
@model_validator(mode="after")
def validate_totals(self) -> Self:
items_total = sum(item.amount for item in self.items)
if abs(items_total - self.subtotal) > 10.0:
raise ValueError(
f"품목 합계({items_total})와 "
f"소계({self.subtotal})가 불일치합니다."
)
expected_total = self.subtotal + self.tax_amount
if abs(expected_total - self.total_amount) > 10.0:
raise ValueError(
f"소계+세액({expected_total})과 "
f"총액({self.total_amount})이 불일치합니다."
)
return self
class ExtractionResponse(BaseModel):
"""API 응답 모델"""
success: bool
data: InvoiceExtraction | None = None
validation_score: float = Field(
ge=0.0, le=1.0,
description="검증 점수"
)
warnings: list[str] = Field(default_factory=list)
processing_time_ms: int = Field(description="처리 시간 (밀리초)")import fitz # PyMuPDF
import base64
from pathlib import Path
from dataclasses import dataclass
@dataclass
class PDFContent:
"""PDF 처리 결과"""
text: str
page_count: int
has_images: bool
images_b64: list[str]
class PDFProcessor:
"""PDF 문서 처리기"""
def __init__(self, dpi: int = 200):
self.dpi = dpi
def process(self, pdf_path: str | Path) -> PDFContent:
"""PDF를 처리하여 텍스트와 이미지를 추출합니다."""
doc = fitz.open(str(pdf_path))
# 텍스트 추출
text_parts = []
for page in doc:
text_parts.append(page.get_text())
text = "\n".join(text_parts).strip()
# 텍스트가 충분하지 않으면 이미지로 처리 (스캔 PDF)
has_meaningful_text = len(text.replace(" ", "").replace("\n", "")) > 50
images_b64 = []
if not has_meaningful_text:
for page in doc:
pix = page.get_pixmap(dpi=self.dpi)
img_bytes = pix.tobytes("png")
images_b64.append(
base64.b64encode(img_bytes).decode("utf-8")
)
doc.close()
return PDFContent(
text=text,
page_count=len(doc) if hasattr(doc, '__len__') else 0,
has_images=len(images_b64) > 0,
images_b64=images_b64,
)
def process_bytes(self, pdf_bytes: bytes) -> PDFContent:
"""바이트 데이터에서 PDF를 처리합니다."""
doc = fitz.open(stream=pdf_bytes, filetype="pdf")
page_count = len(doc)
text_parts = []
for page in doc:
text_parts.append(page.get_text())
text = "\n".join(text_parts).strip()
has_meaningful_text = len(text.replace(" ", "").replace("\n", "")) > 50
images_b64 = []
if not has_meaningful_text:
for page in doc:
pix = page.get_pixmap(dpi=self.dpi)
img_bytes = pix.tobytes("png")
images_b64.append(
base64.b64encode(img_bytes).decode("utf-8")
)
doc.close()
return PDFContent(
text=text,
page_count=page_count,
has_images=len(images_b64) > 0,
images_b64=images_b64,
)import instructor
from openai import AsyncOpenAI
from app.models import InvoiceExtraction
from app.pdf_processor import PDFContent
SYSTEM_PROMPT = """당신은 송장(Invoice) 데이터 추출 전문가입니다.
다음 규칙을 반드시 준수하세요:
1. 모든 금액은 숫자만 포함합니다 (통화 기호, 쉼표 제외)
2. 날짜는 YYYY-MM-DD 형식으로 표준화합니다
3. 찾을 수 없는 정보는 null로 표시합니다
4. 품목의 금액은 반드시 수량 x 단가와 일치해야 합니다
5. 합계는 반드시 소계 + 세액과 일치해야 합니다
6. 사업자등록번호는 하이픈 없이 숫자만 포함합니다"""
class ExtractionEngine:
"""LLM 기반 송장 추출 엔진"""
def __init__(self, model: str = "gpt-4o-2026-02", max_retries: int = 3):
self.model = model
self.max_retries = max_retries
self.client = instructor.from_openai(AsyncOpenAI())
async def extract_from_text(self, text: str) -> InvoiceExtraction:
"""텍스트에서 송장 데이터를 추출합니다."""
return await self.client.chat.completions.create(
model=self.model,
response_model=InvoiceExtraction,
max_retries=self.max_retries,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": f"다음 송장 텍스트에서 데이터를 추출하세요:\n\n{text}"}
]
)
async def extract_from_images(
self, images_b64: list[str]
) -> InvoiceExtraction:
"""이미지에서 송장 데이터를 추출합니다."""
content = [
{"type": "text", "text": "이 송장 이미지에서 모든 데이터를 추출하세요."}
]
for img in images_b64:
content.append({
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{img}",
"detail": "high"
}
})
return await self.client.chat.completions.create(
model=self.model,
response_model=InvoiceExtraction,
max_retries=self.max_retries,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": content}
]
)
async def extract(self, pdf_content: PDFContent) -> InvoiceExtraction:
"""PDF 콘텐츠에서 송장 데이터를 추출합니다."""
if pdf_content.has_images:
return await self.extract_from_images(pdf_content.images_b64)
else:
return await self.extract_from_text(pdf_content.text)import instructor
from openai import AsyncOpenAI
from pydantic import BaseModel, Field
from app.models import InvoiceExtraction
from datetime import date
from dataclasses import dataclass
@dataclass
class ValidationResult:
"""검증 결과"""
passed: bool
score: float
warnings: list[str]
errors: list[str]
class SemanticCheck(BaseModel):
"""LLM 의미적 검증 결과"""
is_faithful: bool
issues: list[str] = Field(default_factory=list)
accuracy_score: float = Field(ge=0.0, le=1.0)
class InvoiceValidator:
"""송장 데이터 검증기"""
def __init__(self):
self.client = instructor.from_openai(AsyncOpenAI())
async def validate(
self,
extraction: InvoiceExtraction,
original_text: str
) -> ValidationResult:
"""다중 레이어 검증을 수행합니다."""
warnings = []
errors = []
# 1. 비즈니스 로직 검증
biz_result = self._validate_business_rules(extraction)
warnings.extend(biz_result["warnings"])
errors.extend(biz_result["errors"])
# 2. 의미적 검증 (원본이 있는 경우)
semantic_score = 1.0
if original_text:
semantic = await self._validate_semantically(
extraction, original_text
)
semantic_score = semantic.accuracy_score
if not semantic.is_faithful:
warnings.extend(semantic.issues)
# 종합 점수 계산
score = semantic_score
if errors:
score *= 0.5
if warnings:
score *= 0.8
return ValidationResult(
passed=len(errors) == 0,
score=round(score, 2),
warnings=warnings,
errors=errors,
)
def _validate_business_rules(
self, extraction: InvoiceExtraction
) -> dict[str, list[str]]:
"""비즈니스 규칙을 검증합니다."""
warnings = []
errors = []
# 미래 날짜 검증
today = date.today().isoformat()
if extraction.issue_date > today:
errors.append(
f"발행일이 미래입니다: {extraction.issue_date}"
)
# 금액 범위 검증
if extraction.total_amount > 1_000_000_000:
warnings.append(
f"총액이 10억을 초과합니다: {extraction.total_amount}"
)
if extraction.total_amount == 0:
warnings.append("총액이 0원입니다.")
# 품목 수 검증
if len(extraction.items) > 100:
warnings.append(
f"품목이 100개를 초과합니다: {len(extraction.items)}개"
)
return {"warnings": warnings, "errors": errors}
async def _validate_semantically(
self,
extraction: InvoiceExtraction,
original_text: str
) -> SemanticCheck:
"""의미적 정확성을 검증합니다."""
return await self.client.chat.completions.create(
model="gpt-4o-mini",
response_model=SemanticCheck,
messages=[
{
"role": "system",
"content": (
"원본 텍스트와 추출 결과를 비교하여 "
"정확성을 평가하세요."
)
},
{
"role": "user",
"content": (
f"원본:\n{original_text[:3000]}\n\n"
f"추출 결과:\n{extraction.model_dump_json(indent=2)}"
)
}
]
)import time
from fastapi import FastAPI, UploadFile, File, HTTPException
from app.models import ExtractionResponse, InvoiceExtraction
from app.pdf_processor import PDFProcessor
from app.extraction import ExtractionEngine
from app.validation import InvoiceValidator
app = FastAPI(
title="Invoice Extraction API",
description="PDF 송장에서 구조화된 데이터를 추출하는 API",
version="1.0.0",
)
pdf_processor = PDFProcessor(dpi=200)
extraction_engine = ExtractionEngine(model="gpt-4o-2026-02", max_retries=3)
validator = InvoiceValidator()
@app.post("/extract", response_model=ExtractionResponse)
async def extract_invoice(file: UploadFile = File(...)):
"""PDF 송장에서 데이터를 추출합니다."""
start_time = time.time()
# 파일 검증
if not file.filename or not file.filename.lower().endswith(".pdf"):
raise HTTPException(
status_code=400,
detail="PDF 파일만 지원합니다."
)
# PDF 읽기
pdf_bytes = await file.read()
if len(pdf_bytes) > 10 * 1024 * 1024: # 10MB 제한
raise HTTPException(
status_code=400,
detail="파일 크기가 10MB를 초과합니다."
)
try:
# 1. PDF 처리
pdf_content = pdf_processor.process_bytes(pdf_bytes)
# 2. LLM 추출
extraction = await extraction_engine.extract(pdf_content)
# 3. 검증
validation = await validator.validate(
extraction=extraction,
original_text=pdf_content.text,
)
processing_time = int((time.time() - start_time) * 1000)
return ExtractionResponse(
success=validation.passed,
data=extraction,
validation_score=validation.score,
warnings=validation.warnings,
processing_time_ms=processing_time,
)
except Exception as e:
processing_time = int((time.time() - start_time) * 1000)
return ExtractionResponse(
success=False,
data=None,
validation_score=0.0,
warnings=[str(e)],
processing_time_ms=processing_time,
)
@app.post("/extract/batch")
async def extract_batch(files: list[UploadFile] = File(...)):
"""여러 PDF 송장을 배치로 처리합니다."""
if len(files) > 50:
raise HTTPException(
status_code=400,
detail="한 번에 최대 50개 파일까지 처리할 수 있습니다."
)
results = []
for file in files:
result = await extract_invoice(file)
results.append({
"filename": file.filename,
"result": result.model_dump(),
})
success_count = sum(1 for r in results if r["result"]["success"])
return {
"total": len(files),
"success": success_count,
"failed": len(files) - success_count,
"results": results,
}
@app.get("/health")
async def health_check():
"""헬스 체크 엔드포인트"""
return {"status": "healthy", "version": "1.0.0"}# 의존성 설치
pip install fastapi uvicorn instructor openai pymupdf pydantic
# 서버 실행
uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload# 단일 파일 추출
curl -X POST http://localhost:8000/extract \
-F "file=@invoice.pdf" \
| python -m json.tool
# 배치 추출
curl -X POST http://localhost:8000/extract/batch \
-F "files=@invoice1.pdf" \
-F "files=@invoice2.pdf" \
-F "files=@invoice3.pdf" \
| python -m json.tool{
"success": true,
"data": {
"invoice_number": "INV-2026-0042",
"issue_date": "2026-03-15",
"due_date": "2026-04-15",
"vendor": {
"name": "크리에이티브 솔루션즈",
"registration_number": "1234567890",
"address": "서울특별시 강남구 테헤란로 123",
"contact": "02-1234-5678"
},
"buyer": {
"name": "테크 스타트업",
"registration_number": "9876543210"
},
"items": [
{
"description": "웹 개발 서비스 (3월)",
"quantity": 1,
"unit_price": 5000000,
"amount": 5000000
},
{
"description": "서버 호스팅 (월간)",
"quantity": 1,
"unit_price": 300000,
"amount": 300000
}
],
"subtotal": 5300000,
"tax_amount": 530000,
"total_amount": 5830000,
"currency": "KRW",
"notes": null
},
"validation_score": 0.95,
"warnings": [],
"processing_time_ms": 3420
}이 프로젝트는 프로토타입입니다. 프로덕션 배포 시에는 인증(OAuth2), 파일 저장소(S3), 데이터베이스(PostgreSQL), 비동기 큐(Celery/Redis), 모니터링(Prometheus/Grafana) 등을 추가해야 합니다. 9장에서 다룬 비용 추적, 서킷 브레이커, 메트릭 수집도 함께 통합하세요.
10개의 장에 걸쳐 Structured Output과 AI 데이터 파이프라인의 전체 여정을 학습했습니다. 각 장의 핵심 내용을 최종 정리합니다.
| 장 | 핵심 내용 |
|---|---|
| 1장 | LLM 비정형 출력의 문제와 3가지 해결 접근, 제약 디코딩 원리 |
| 2장 | JSON Schema 기초, OpenAI/Anthropic/Google API, 스키마 설계 패턴 |
| 3장 | Function Calling 원리, 프로바이더별 구현, 에이전트 루프 |
| 4장 | Pydantic v2 모델, Instructor 라이브러리, Zod (TypeScript) |
| 5장 | PDF/이미지/웹 비정형 데이터 추출, OCR+LLM, 엔티티-관계 추출 |
| 6장 | 배치 처리, 비동기 추출, 품질 검증 루프, 비용 최적화 |
| 7장 | ETL 파이프라인 LLM 통합, Transform 활용, Airflow/Prefect |
| 8장 | 다중 레이어 검증, 자동 재시도, 멀티 프로바이더 폴백 |
| 9장 | 서킷 브레이커, 관측 가능성, 비용 추적, 스키마 버전 관리, CI/CD |
| 10장 | 엔드투엔드 파이프라인 구축 (PDF 송장 추출 시스템) |
Structured Output은 LLM을 소프트웨어 시스템에 안정적으로 통합하기 위한 핵심 기술입니다. 이 시리즈에서 다룬 패턴과 기법을 실무에 적용하여, 신뢰할 수 있는 AI 데이터 파이프라인을 구축하시기 바랍니다.
이 글이 도움이 되셨나요?
재시도, 서킷 브레이커, 관측 가능성, 비용 추적, 스키마 버전 관리 등 프로덕션 수준의 AI 파이프라인 운영 기법을 학습합니다.
스키마 검증, 의미적 검증, 자동 재시도, 멀티 프로바이더 폴백, 부분 출력 복구 등 프로덕션 수준의 검증 전략을 학습합니다.
전통 ETL과 LLM-enhanced ETL을 비교하고, Transform 단계에 LLM을 적용하여 분류, 요약, 정규화, 감성분석을 수행하는 방법을 학습합니다.