전통 ETL과 LLM-enhanced ETL을 비교하고, Transform 단계에 LLM을 적용하여 분류, 요약, 정규화, 감성분석을 수행하는 방법을 학습합니다.
**ETL(Extract, Transform, Load)**은 데이터를 추출하고, 변환하고, 적재하는 데이터 처리 패턴입니다. 전통적인 ETL의 Transform 단계는 규칙 기반 로직에 의존합니다. 정해진 매핑 테이블, 정규표현식, 조건문으로 데이터를 변환합니다.
LLM-enhanced ETL은 이 Transform 단계에 LLM의 자연어 이해 능력을 투입합니다. 규칙으로 정의하기 어려운 변환 작업을 LLM이 수행합니다.
| 특성 | 전통 ETL | LLM-enhanced ETL |
|---|---|---|
| 변환 로직 | 규칙 기반 (코드) | 자연어 이해 + 규칙 |
| 처리 가능 데이터 | 정형 데이터 | 정형 + 비정형 |
| 유지보수 | 규칙 업데이트 필요 | 프롬프트 조정 |
| 비용 | 컴퓨팅 비용 (낮음) | API 비용 (상대적 높음) |
| 확장성 | 수평 확장 용이 | API Rate Limit 제약 |
| 적합한 작업 | 숫자 변환, 포맷팅 | 분류, 요약, 번역, 감성분석 |
LLM-enhanced ETL은 전통 ETL을 대체하는 것이 아닙니다. 규칙으로 충분한 변환은 여전히 규칙 기반이 효율적이며, LLM은 자연어 이해가 필요한 변환에만 선택적으로 적용합니다.
고객 문의를 카테고리로 자동 분류하는 것은 LLM Transform의 대표적인 사례입니다.
import instructor
from openai import AsyncOpenAI
from pydantic import BaseModel, Field
from typing import Literal
class TicketCategory(BaseModel):
"""티켓 분류 결과"""
category: Literal[
"billing", "shipping", "product_defect",
"return", "account", "general"
] = Field(description="문의 카테고리")
subcategory: str = Field(description="세부 카테고리")
priority: Literal["urgent", "high", "medium", "low"]
language: str = Field(description="고객 문의 언어 (ISO 639-1)")
async def classify_tickets(
tickets: list[dict[str, str]],
client: instructor.AsyncInstructor
) -> list[dict]:
"""티켓 배치를 분류합니다."""
results = []
for ticket in tickets:
classification = await client.chat.completions.create(
model="gpt-4o-mini",
response_model=TicketCategory,
messages=[
{
"role": "system",
"content": (
"고객 문의 티켓을 분류하세요. "
"우선순위는 고객의 감정 강도와 문제의 긴급성을 고려합니다."
)
},
{
"role": "user",
"content": ticket["content"]
}
]
)
results.append({
"ticket_id": ticket["id"],
**classification.model_dump()
})
return results긴 문서나 리뷰를 일정한 형식으로 요약합니다.
from pydantic import BaseModel, Field
class ArticleSummary(BaseModel):
"""기사 요약 결과"""
title: str = Field(description="기사 제목")
summary: str = Field(
max_length=300,
description="3문장 이내의 핵심 요약"
)
key_points: list[str] = Field(
min_length=1,
max_length=5,
description="핵심 포인트 (1-5개)"
)
topics: list[str] = Field(description="관련 주제 태그")
sentiment: Literal["positive", "negative", "neutral", "mixed"]
async def summarize_articles(
articles: list[dict],
client: instructor.AsyncInstructor
) -> list[ArticleSummary]:
"""기사 배치를 요약합니다."""
import asyncio
async def summarize_one(article: dict) -> ArticleSummary:
return await client.chat.completions.create(
model="gpt-4o-mini",
response_model=ArticleSummary,
messages=[
{
"role": "system",
"content": "기사를 구조화된 형식으로 요약하세요."
},
{"role": "user", "content": article["content"]}
]
)
return await asyncio.gather(
*[summarize_one(a) for a in articles]
)자유 형식 텍스트를 표준 형식으로 정규화합니다. 예를 들어, 다양하게 표현된 주소를 일관된 형태로 변환합니다.
from pydantic import BaseModel, Field
class NormalizedAddress(BaseModel):
"""정규화된 한국 주소"""
province: str = Field(description="시/도")
city: str = Field(description="시/군/구")
district: str = Field(description="읍/면/동")
detail: str = Field(description="상세 주소")
postal_code: str | None = Field(description="우편번호")
full_address: str = Field(description="정규화된 전체 주소")
async def normalize_addresses(
raw_addresses: list[str],
client: instructor.AsyncInstructor
) -> list[NormalizedAddress]:
"""비정형 주소 목록을 정규화합니다."""
import asyncio
async def normalize_one(addr: str) -> NormalizedAddress:
return await client.chat.completions.create(
model="gpt-4o-mini",
response_model=NormalizedAddress,
messages=[
{
"role": "system",
"content": (
"한국 주소를 정규화하세요. "
"약어를 풀고, 빠진 행정구역을 추가하세요. "
"우편번호를 모르면 null로 표시하세요."
)
},
{"role": "user", "content": addr}
]
)
return await asyncio.gather(
*[normalize_one(a) for a in raw_addresses]
)
# 예시: 다양한 형태의 주소 입력
raw = [
"서울 강남 테헤란로 123",
"경기 성남시 분당구 판교역로 235번길 5",
"부산광역시 해운대 우동 센텀시티",
]다국어 데이터를 처리하면서 동시에 감성을 분석하는 복합 변환도 가능합니다.
from pydantic import BaseModel, Field
from typing import Literal
class TranslatedReview(BaseModel):
"""번역 및 감성분석 결과"""
original_language: str = Field(description="원문 언어")
translated_text: str = Field(description="한국어 번역")
sentiment: Literal["positive", "negative", "neutral", "mixed"]
sentiment_score: float = Field(
ge=-1.0, le=1.0,
description="감성 점수 (-1: 매우 부정, 1: 매우 긍정)"
)
key_phrases: list[str] = Field(description="핵심 표현 (원문 기준)")LLM Transform에서 배치 크기는 비용, 속도, 품질의 균형을 결정합니다.
여러 항목을 하나의 LLM 호출로 처리할 수 있습니다. 이 경우 토큰 효율이 높아지지만, 개별 항목의 품질이 저하될 수 있습니다.
from pydantic import BaseModel, Field
# 접근 1: 개별 호출 (항목당 1회)
class SingleClassification(BaseModel):
category: str
confidence: float
# 접근 2: 배치 호출 (여러 항목을 한번에)
class BatchClassification(BaseModel):
"""여러 항목의 분류 결과"""
results: list["ClassificationItem"]
class ClassificationItem(BaseModel):
item_index: int = Field(description="입력 항목의 인덱스 (0부터)")
category: str
confidence: float| 배치 크기 | 장점 | 단점 |
|---|---|---|
| 1 (개별) | 최고 품질, 에러 격리 | 높은 비용, 느린 속도 |
| 5-10 | 좋은 품질, 비용 절감 | 약간의 품질 저하 가능 |
| 20-50 | 높은 비용 효율 | 품질 저하, 큰 응답 크기 |
| 50+ | 최대 비용 절감 | 품질 저하, 누락 위험 |
배치 크기의 최적값은 작업의 복잡도에 따라 달라집니다. 단순 분류는 10-20개를 한번에 처리해도 되지만, 요약이나 추출처럼 복잡한 작업은 1-5개가 적절합니다. 파일럿 테스트로 품질과 비용의 균형점을 찾으세요.
Apache Airflow에서 LLM Transform을 DAG로 구성하는 예시입니다.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
default_args = {
"owner": "data-team",
"retries": 2,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
"llm_review_pipeline",
default_args=default_args,
description="리뷰 데이터 LLM 변환 파이프라인",
schedule="0 2 * * *", # 매일 오전 2시
start_date=datetime(2026, 1, 1),
catchup=False,
)
def extract_reviews(**context):
"""새로운 리뷰를 소스 DB에서 추출합니다."""
hook = PostgresHook(postgres_conn_id="source_db")
reviews = hook.get_records(
"SELECT id, content, created_at FROM reviews "
"WHERE processed = false LIMIT 500"
)
context["task_instance"].xcom_push(key="reviews", value=reviews)
def transform_with_llm(**context):
"""LLM으로 리뷰를 분류하고 요약합니다."""
import asyncio
reviews = context["task_instance"].xcom_pull(
task_ids="extract", key="reviews"
)
# 비동기 LLM 처리 (별도 모듈에서 구현)
from pipeline.llm_transform import process_reviews
results = asyncio.run(process_reviews(reviews))
context["task_instance"].xcom_push(key="results", value=results)
def load_results(**context):
"""변환 결과를 대상 DB에 적재합니다."""
results = context["task_instance"].xcom_pull(
task_ids="transform", key="results"
)
hook = PostgresHook(postgres_conn_id="target_db")
for result in results:
hook.run(
"INSERT INTO review_analysis "
"(review_id, category, sentiment, summary) "
"VALUES (%s, %s, %s, %s)",
parameters=(
result["id"],
result["category"],
result["sentiment"],
result["summary"],
)
)
extract_task = PythonOperator(
task_id="extract",
python_callable=extract_reviews,
dag=dag,
)
transform_task = PythonOperator(
task_id="transform",
python_callable=transform_with_llm,
dag=dag,
)
load_task = PythonOperator(
task_id="load",
python_callable=load_results,
dag=dag,
)
extract_task >> transform_task >> load_taskPrefect는 더 현대적인 Python 네이티브 워크플로우 도구입니다.
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
@task(
retries=2,
retry_delay_seconds=30,
cache_key_fn=task_input_hash,
cache_expiration=timedelta(hours=1)
)
async def extract_documents(source: str, limit: int = 100) -> list[dict]:
"""문서를 소스에서 추출합니다."""
# 데이터베이스에서 문서 로드
pass
@task(retries=3, retry_delay_seconds=10)
async def transform_document(doc: dict) -> dict:
"""단일 문서를 LLM으로 변환합니다."""
import instructor
from openai import AsyncOpenAI
client = instructor.from_openai(AsyncOpenAI())
# LLM 변환 로직
pass
@task
async def load_results(results: list[dict], target: str) -> int:
"""변환 결과를 적재합니다."""
# 결과 저장 로직
return len(results)
@flow(name="llm-etl-pipeline", log_prints=True)
async def llm_etl_pipeline(
source: str = "reviews_db",
target: str = "analytics_db",
batch_size: int = 100
):
"""LLM 기반 ETL 파이프라인"""
# Extract
documents = await extract_documents(source, limit=batch_size)
print(f"추출된 문서: {len(documents)}건")
# Transform (병렬 처리)
transform_tasks = [
transform_document.submit(doc)
for doc in documents
]
results = [t.result() for t in transform_tasks]
valid_results = [r for r in results if r is not None]
print(f"변환 성공: {len(valid_results)}건")
# Load
loaded_count = await load_results(valid_results, target)
print(f"적재 완료: {loaded_count}건")
return loaded_count이번 장에서는 ETL 파이프라인에 LLM을 통합하는 방법을 학습했습니다.
핵심 내용을 정리하면 다음과 같습니다.
8장에서는 출력 검증과 폴백 전략을 심층적으로 다룹니다. 스키마 검증, 의미적 검증, 자동 재시도, 멀티 프로바이더 폴백, 부분 출력 복구 등 프로덕션 수준의 검증 파이프라인을 설계합니다.
이 글이 도움이 되셨나요?