본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout

내비게이션

  • Tech
  • Projects
  • Books
  • About
  • Tags

카테고리

  • AI / ML
  • 웹 개발
  • 프로그래밍
  • 개발 도구

연결

  • GitHub
  • Email
  • RSS
© 2026 Kreath Archive. All rights reserved.Built with Next.js + MDX
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 7장: ETL 파이프라인에 LLM 통합
2026년 4월 1일·AI / ML·

7장: ETL 파이프라인에 LLM 통합

전통 ETL과 LLM-enhanced ETL을 비교하고, Transform 단계에 LLM을 적용하여 분류, 요약, 정규화, 감성분석을 수행하는 방법을 학습합니다.

14분860자7개 섹션
structured-outputaidata-engineeringllm
공유
structured-output7 / 10
12345678910
이전6장: LLM 기반 데이터 추출 자동화다음8장: 출력 검증과 폴백 전략

학습 목표

  • 전통 ETL과 LLM-enhanced ETL의 차이점을 이해합니다
  • Transform 단계에 LLM을 적용하여 분류, 요약, 정규화, 번역, 감성분석을 수행합니다
  • 배치 크기 최적화와 토큰 예산 관리를 학습합니다
  • Apache Airflow와 Prefect를 활용한 워크플로우 오케스트레이션을 구현합니다

전통 ETL vs LLM-enhanced ETL

**ETL(Extract, Transform, Load)**은 데이터를 추출하고, 변환하고, 적재하는 데이터 처리 패턴입니다. 전통적인 ETL의 Transform 단계는 규칙 기반 로직에 의존합니다. 정해진 매핑 테이블, 정규표현식, 조건문으로 데이터를 변환합니다.

LLM-enhanced ETL은 이 Transform 단계에 LLM의 자연어 이해 능력을 투입합니다. 규칙으로 정의하기 어려운 변환 작업을 LLM이 수행합니다.

비교

특성전통 ETLLLM-enhanced ETL
변환 로직규칙 기반 (코드)자연어 이해 + 규칙
처리 가능 데이터정형 데이터정형 + 비정형
유지보수규칙 업데이트 필요프롬프트 조정
비용컴퓨팅 비용 (낮음)API 비용 (상대적 높음)
확장성수평 확장 용이API Rate Limit 제약
적합한 작업숫자 변환, 포맷팅분류, 요약, 번역, 감성분석
Info

LLM-enhanced ETL은 전통 ETL을 대체하는 것이 아닙니다. 규칙으로 충분한 변환은 여전히 규칙 기반이 효율적이며, LLM은 자연어 이해가 필요한 변환에만 선택적으로 적용합니다.


Transform 단계에 LLM 적용

1. 텍스트 분류

고객 문의를 카테고리로 자동 분류하는 것은 LLM Transform의 대표적인 사례입니다.

llm_classification.py
python
import instructor
from openai import AsyncOpenAI
from pydantic import BaseModel, Field
from typing import Literal
 
 
class TicketCategory(BaseModel):
    """티켓 분류 결과"""
    category: Literal[
        "billing", "shipping", "product_defect",
        "return", "account", "general"
    ] = Field(description="문의 카테고리")
    subcategory: str = Field(description="세부 카테고리")
    priority: Literal["urgent", "high", "medium", "low"]
    language: str = Field(description="고객 문의 언어 (ISO 639-1)")
 
 
async def classify_tickets(
    tickets: list[dict[str, str]],
    client: instructor.AsyncInstructor
) -> list[dict]:
    """티켓 배치를 분류합니다."""
    results = []
    for ticket in tickets:
        classification = await client.chat.completions.create(
            model="gpt-4o-mini",
            response_model=TicketCategory,
            messages=[
                {
                    "role": "system",
                    "content": (
                        "고객 문의 티켓을 분류하세요. "
                        "우선순위는 고객의 감정 강도와 문제의 긴급성을 고려합니다."
                    )
                },
                {
                    "role": "user",
                    "content": ticket["content"]
                }
            ]
        )
        results.append({
            "ticket_id": ticket["id"],
            **classification.model_dump()
        })
    return results

2. 텍스트 요약

긴 문서나 리뷰를 일정한 형식으로 요약합니다.

llm_summarization.py
python
from pydantic import BaseModel, Field
 
 
class ArticleSummary(BaseModel):
    """기사 요약 결과"""
    title: str = Field(description="기사 제목")
    summary: str = Field(
        max_length=300,
        description="3문장 이내의 핵심 요약"
    )
    key_points: list[str] = Field(
        min_length=1,
        max_length=5,
        description="핵심 포인트 (1-5개)"
    )
    topics: list[str] = Field(description="관련 주제 태그")
    sentiment: Literal["positive", "negative", "neutral", "mixed"]
 
 
async def summarize_articles(
    articles: list[dict],
    client: instructor.AsyncInstructor
) -> list[ArticleSummary]:
    """기사 배치를 요약합니다."""
    import asyncio
 
    async def summarize_one(article: dict) -> ArticleSummary:
        return await client.chat.completions.create(
            model="gpt-4o-mini",
            response_model=ArticleSummary,
            messages=[
                {
                    "role": "system",
                    "content": "기사를 구조화된 형식으로 요약하세요."
                },
                {"role": "user", "content": article["content"]}
            ]
        )
 
    return await asyncio.gather(
        *[summarize_one(a) for a in articles]
    )

3. 데이터 정규화

자유 형식 텍스트를 표준 형식으로 정규화합니다. 예를 들어, 다양하게 표현된 주소를 일관된 형태로 변환합니다.

llm_normalization.py
python
from pydantic import BaseModel, Field
 
 
class NormalizedAddress(BaseModel):
    """정규화된 한국 주소"""
    province: str = Field(description="시/도")
    city: str = Field(description="시/군/구")
    district: str = Field(description="읍/면/동")
    detail: str = Field(description="상세 주소")
    postal_code: str | None = Field(description="우편번호")
    full_address: str = Field(description="정규화된 전체 주소")
 
 
async def normalize_addresses(
    raw_addresses: list[str],
    client: instructor.AsyncInstructor
) -> list[NormalizedAddress]:
    """비정형 주소 목록을 정규화합니다."""
    import asyncio
 
    async def normalize_one(addr: str) -> NormalizedAddress:
        return await client.chat.completions.create(
            model="gpt-4o-mini",
            response_model=NormalizedAddress,
            messages=[
                {
                    "role": "system",
                    "content": (
                        "한국 주소를 정규화하세요. "
                        "약어를 풀고, 빠진 행정구역을 추가하세요. "
                        "우편번호를 모르면 null로 표시하세요."
                    )
                },
                {"role": "user", "content": addr}
            ]
        )
 
    return await asyncio.gather(
        *[normalize_one(a) for a in raw_addresses]
    )
 
# 예시: 다양한 형태의 주소 입력
raw = [
    "서울 강남 테헤란로 123",
    "경기 성남시 분당구 판교역로 235번길 5",
    "부산광역시 해운대 우동 센텀시티",
]

4. 번역과 감성분석

다국어 데이터를 처리하면서 동시에 감성을 분석하는 복합 변환도 가능합니다.

llm_translation_sentiment.py
python
from pydantic import BaseModel, Field
from typing import Literal
 
 
class TranslatedReview(BaseModel):
    """번역 및 감성분석 결과"""
    original_language: str = Field(description="원문 언어")
    translated_text: str = Field(description="한국어 번역")
    sentiment: Literal["positive", "negative", "neutral", "mixed"]
    sentiment_score: float = Field(
        ge=-1.0, le=1.0,
        description="감성 점수 (-1: 매우 부정, 1: 매우 긍정)"
    )
    key_phrases: list[str] = Field(description="핵심 표현 (원문 기준)")

배치 크기 최적화

LLM Transform에서 배치 크기는 비용, 속도, 품질의 균형을 결정합니다.

단일 호출 vs 다중 항목 호출

여러 항목을 하나의 LLM 호출로 처리할 수 있습니다. 이 경우 토큰 효율이 높아지지만, 개별 항목의 품질이 저하될 수 있습니다.

batch_size_comparison.py
python
from pydantic import BaseModel, Field
 
 
# 접근 1: 개별 호출 (항목당 1회)
class SingleClassification(BaseModel):
    category: str
    confidence: float
 
 
# 접근 2: 배치 호출 (여러 항목을 한번에)
class BatchClassification(BaseModel):
    """여러 항목의 분류 결과"""
    results: list["ClassificationItem"]
 
 
class ClassificationItem(BaseModel):
    item_index: int = Field(description="입력 항목의 인덱스 (0부터)")
    category: str
    confidence: float

최적 배치 크기 결정

배치 크기장점단점
1 (개별)최고 품질, 에러 격리높은 비용, 느린 속도
5-10좋은 품질, 비용 절감약간의 품질 저하 가능
20-50높은 비용 효율품질 저하, 큰 응답 크기
50+최대 비용 절감품질 저하, 누락 위험
Tip

배치 크기의 최적값은 작업의 복잡도에 따라 달라집니다. 단순 분류는 10-20개를 한번에 처리해도 되지만, 요약이나 추출처럼 복잡한 작업은 1-5개가 적절합니다. 파일럿 테스트로 품질과 비용의 균형점을 찾으세요.


워크플로우 오케스트레이션

Apache Airflow 통합

Apache Airflow에서 LLM Transform을 DAG로 구성하는 예시입니다.

airflow_dag.py
python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
 
 
default_args = {
    "owner": "data-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}
 
dag = DAG(
    "llm_review_pipeline",
    default_args=default_args,
    description="리뷰 데이터 LLM 변환 파이프라인",
    schedule="0 2 * * *",  # 매일 오전 2시
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
 
 
def extract_reviews(**context):
    """새로운 리뷰를 소스 DB에서 추출합니다."""
    hook = PostgresHook(postgres_conn_id="source_db")
    reviews = hook.get_records(
        "SELECT id, content, created_at FROM reviews "
        "WHERE processed = false LIMIT 500"
    )
    context["task_instance"].xcom_push(key="reviews", value=reviews)
 
 
def transform_with_llm(**context):
    """LLM으로 리뷰를 분류하고 요약합니다."""
    import asyncio
 
    reviews = context["task_instance"].xcom_pull(
        task_ids="extract", key="reviews"
    )
 
    # 비동기 LLM 처리 (별도 모듈에서 구현)
    from pipeline.llm_transform import process_reviews
    results = asyncio.run(process_reviews(reviews))
 
    context["task_instance"].xcom_push(key="results", value=results)
 
 
def load_results(**context):
    """변환 결과를 대상 DB에 적재합니다."""
    results = context["task_instance"].xcom_pull(
        task_ids="transform", key="results"
    )
    hook = PostgresHook(postgres_conn_id="target_db")
    for result in results:
        hook.run(
            "INSERT INTO review_analysis "
            "(review_id, category, sentiment, summary) "
            "VALUES (%s, %s, %s, %s)",
            parameters=(
                result["id"],
                result["category"],
                result["sentiment"],
                result["summary"],
            )
        )
 
 
extract_task = PythonOperator(
    task_id="extract",
    python_callable=extract_reviews,
    dag=dag,
)
 
transform_task = PythonOperator(
    task_id="transform",
    python_callable=transform_with_llm,
    dag=dag,
)
 
load_task = PythonOperator(
    task_id="load",
    python_callable=load_results,
    dag=dag,
)
 
extract_task >> transform_task >> load_task

Prefect 통합

Prefect는 더 현대적인 Python 네이티브 워크플로우 도구입니다.

prefect_flow.py
python
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
 
 
@task(
    retries=2,
    retry_delay_seconds=30,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1)
)
async def extract_documents(source: str, limit: int = 100) -> list[dict]:
    """문서를 소스에서 추출합니다."""
    # 데이터베이스에서 문서 로드
    pass
 
 
@task(retries=3, retry_delay_seconds=10)
async def transform_document(doc: dict) -> dict:
    """단일 문서를 LLM으로 변환합니다."""
    import instructor
    from openai import AsyncOpenAI
 
    client = instructor.from_openai(AsyncOpenAI())
    # LLM 변환 로직
    pass
 
 
@task
async def load_results(results: list[dict], target: str) -> int:
    """변환 결과를 적재합니다."""
    # 결과 저장 로직
    return len(results)
 
 
@flow(name="llm-etl-pipeline", log_prints=True)
async def llm_etl_pipeline(
    source: str = "reviews_db",
    target: str = "analytics_db",
    batch_size: int = 100
):
    """LLM 기반 ETL 파이프라인"""
 
    # Extract
    documents = await extract_documents(source, limit=batch_size)
    print(f"추출된 문서: {len(documents)}건")
 
    # Transform (병렬 처리)
    transform_tasks = [
        transform_document.submit(doc)
        for doc in documents
    ]
    results = [t.result() for t in transform_tasks]
    valid_results = [r for r in results if r is not None]
    print(f"변환 성공: {len(valid_results)}건")
 
    # Load
    loaded_count = await load_results(valid_results, target)
    print(f"적재 완료: {loaded_count}건")
 
    return loaded_count

정리

이번 장에서는 ETL 파이프라인에 LLM을 통합하는 방법을 학습했습니다.

핵심 내용을 정리하면 다음과 같습니다.

  • LLM-enhanced ETL은 Transform 단계에 자연어 이해 능력을 투입하여, 분류, 요약, 정규화, 번역, 감성분석을 수행합니다
  • 규칙 기반 변환이 효율적인 작업은 그대로 유지하고, LLM은 선택적으로 적용합니다
  • 배치 크기는 작업 복잡도에 따라 조정하며, 파일럿 테스트로 최적값을 결정합니다
  • Apache Airflow와 Prefect를 활용하여 LLM 변환을 자동화된 워크플로우로 구성할 수 있습니다

다음 장 미리보기

8장에서는 출력 검증과 폴백 전략을 심층적으로 다룹니다. 스키마 검증, 의미적 검증, 자동 재시도, 멀티 프로바이더 폴백, 부분 출력 복구 등 프로덕션 수준의 검증 파이프라인을 설계합니다.

이 글이 도움이 되셨나요?

관련 주제 더 보기

#structured-output#ai#data-engineering#llm

관련 글

AI / ML

8장: 출력 검증과 폴백 전략

스키마 검증, 의미적 검증, 자동 재시도, 멀티 프로바이더 폴백, 부분 출력 복구 등 프로덕션 수준의 검증 전략을 학습합니다.

2026년 4월 3일·18분
AI / ML

6장: LLM 기반 데이터 추출 자동화

대량 문서 처리 파이프라인을 구축하고, 배치 처리, 비동기 추출, 품질 검증 루프, 비용 최적화 전략을 학습합니다.

2026년 3월 30일·16분
AI / ML

9장: 프로덕션 AI 데이터 파이프라인

재시도, 서킷 브레이커, 관측 가능성, 비용 추적, 스키마 버전 관리 등 프로덕션 수준의 AI 파이프라인 운영 기법을 학습합니다.

2026년 4월 5일·15분
이전 글6장: LLM 기반 데이터 추출 자동화
다음 글8장: 출력 검증과 폴백 전략

댓글

목차

약 14분 남음
  • 학습 목표
  • 전통 ETL vs LLM-enhanced ETL
    • 비교
  • Transform 단계에 LLM 적용
    • 1. 텍스트 분류
    • 2. 텍스트 요약
    • 3. 데이터 정규화
    • 4. 번역과 감성분석
  • 배치 크기 최적화
    • 단일 호출 vs 다중 항목 호출
    • 최적 배치 크기 결정
  • 워크플로우 오케스트레이션
    • Apache Airflow 통합
    • Prefect 통합
  • 정리
  • 다음 장 미리보기