본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout

내비게이션

  • Tech
  • Projects
  • Books
  • About
  • Tags

카테고리

  • AI / ML
  • 웹 개발
  • 프로그래밍
  • 개발 도구

연결

  • GitHub
  • Email
  • RSS
© 2026 Kreath Archive. All rights reserved.Built with Next.js + MDX
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 10장: 실전 프로젝트 -- 합성 데이터 파이프라인 구축
2026년 4월 5일·AI / ML·

10장: 실전 프로젝트 -- 합성 데이터 파이프라인 구축

엔드투엔드 합성 데이터 파이프라인 아키텍처, 생성-검증-필터링-증강-평가 통합, CI/CD 연동, 자동화된 품질 게이트, 비용 최적화, 프로덕션 운영 전략을 다룹니다.

24분2,195자9개 섹션
synthetic-dataaidata-engineeringllmmlops
공유
synthetic-data10 / 10
12345678910
이전9장: 합성 데이터 평가와 벤치마킹

이 장에서 배우는 것

  • 엔드투엔드 합성 데이터 파이프라인 아키텍처 설계
  • 생성, 검증, 필터링, 증강, 평가를 하나의 파이프라인으로 통합
  • CI/CD와 합성 데이터 파이프라인 연동
  • 자동화된 데이터 품질 게이트 구현
  • API 비용 최적화 전략
  • 프로덕션 환경에서의 운영과 모니터링

파이프라인 아키텍처 개요

이 시리즈에서 다룬 모든 기법을 하나의 엔드투엔드 파이프라인으로 통합합니다. 이 파이프라인은 시드 데이터로부터 프로덕션에 배포 가능한 고품질 합성 데이터셋을 자동으로 생성합니다.

핵심 설계 원칙

  1. 멱등성(Idempotency): 같은 설정으로 실행하면 같은 결과를 생성합니다.
  2. 재현 가능성(Reproducibility): 시드, 모델, 파라미터를 기록하여 재현이 가능합니다.
  3. 점진적 실행(Incremental): 중단된 지점에서 이어서 실행할 수 있습니다.
  4. 관측 가능성(Observability): 각 단계의 메트릭을 수집하고 모니터링합니다.

파이프라인 구현

프로젝트 구조

synthetic-data-pipeline/
  config/
    pipeline.yaml          # 파이프라인 설정
    seeds/                 # 시드 데이터
    quality_gates.yaml     # 품질 게이트 기준
  src/
    pipeline/
      __init__.py
      orchestrator.py      # 파이프라인 오케스트레이터
      generator.py         # 데이터 생성 모듈
      filter.py            # 필터링 모듈
      augmenter.py         # 증강 모듈
      evaluator.py         # 평가 모듈
      privacy.py           # 프라이버시 검증 모듈
    registry/
      dataset_registry.py  # 데이터셋 버전 관리
    utils/
      cost_tracker.py      # 비용 추적
      metrics.py           # 메트릭 수집
  tests/
    test_pipeline.py
    test_quality_gates.py
  output/
    datasets/              # 생성된 데이터셋
    reports/               # 평가 보고서
    logs/                  # 실행 로그

파이프라인 설정

config/pipeline.yaml
yaml
pipeline:
  name: "synthetic-data-v1"
  version: "1.0.0"
 
generation:
  model: "gpt-4o"
  method: "evol-instruct"  # evol-instruct, self-instruct, distillation
  temperature: 0.7
  num_samples: 10000
  batch_size: 50
  max_concurrent: 10
  seed_path: "config/seeds/"
 
  evol_instruct:
    depth_iterations: 3
    breadth_ratio: 0.3
    elimination_threshold: 0.5
 
filtering:
  format:
    min_length: 50
    max_length: 5000
    required_fields: ["instruction", "response"]
 
  quality:
    judge_model: "gpt-4o-mini"
    min_score: 3.5
    dimensions: ["accuracy", "completeness", "clarity", "relevance"]
 
  deduplication:
    exact: true
    near_dedup_threshold: 0.8
    minhash_num_perm: 128
 
privacy:
  pii_detection: true
  similarity_filter:
    min_distance: 0.1
  mia_test:
    enabled: true
    max_auc: 0.55
 
augmentation:
  enabled: true
  methods:
    paraphrase: 0.3
    style_transfer: 0.2
    hard_examples: 0.3
    edge_cases: 0.2
  target_ratio: 1.5  # 원본 대비 증강 비율
 
evaluation:
  tstr:
    models: ["logistic_regression", "random_forest", "gradient_boosting"]
    min_tstr_ratio: 0.90
  diversity:
    min_ttr: 0.3
    min_semantic_diversity: 0.4
  distribution:
    max_js_divergence: 0.15
 
quality_gate:
  tstr_ratio: 0.90
  diversity_score: 0.4
  pii_leak_rate: 0.001
  mia_auc: 0.55

오케스트레이터

src/pipeline/orchestrator.py
python
import yaml
import logging
import time
from pathlib import Path
from dataclasses import dataclass, field
from datetime import datetime
 
logger = logging.getLogger(__name__)
 
 
@dataclass
class PipelineMetrics:
    start_time: datetime = field(
        default_factory=datetime.now
    )
    total_generated: int = 0
    format_filtered: int = 0
    quality_filtered: int = 0
    dedup_filtered: int = 0
    privacy_filtered: int = 0
    augmented: int = 0
    final_count: int = 0
    total_cost: float = 0.0
    tstr_ratio: float = 0.0
    quality_gate_passed: bool = False
 
 
class SyntheticDataPipeline:
    """합성 데이터 파이프라인 오케스트레이터"""
 
    def __init__(self, config_path: str):
        with open(config_path) as f:
            self.config = yaml.safe_load(f)
 
        self.metrics = PipelineMetrics()
        self.output_dir = Path("output")
        self.output_dir.mkdir(parents=True, exist_ok=True)
 
    async def run(self) -> PipelineMetrics:
        """파이프라인 전체를 실행합니다."""
        logger.info(
            f"파이프라인 시작: {self.config['pipeline']['name']} "
            f"v{self.config['pipeline']['version']}"
        )
 
        try:
            # 1단계: 데이터 생성
            raw_data = await self._generate()
            self.metrics.total_generated = len(raw_data)
            logger.info(f"생성 완료: {len(raw_data)}건")
 
            # 2단계: 형식 필터링
            formatted = self._format_filter(raw_data)
            self.metrics.format_filtered = (
                len(raw_data) - len(formatted)
            )
            logger.info(
                f"형식 필터: {len(formatted)}건 통과 "
                f"({self.metrics.format_filtered}건 제거)"
            )
 
            # 3단계: 품질 필터링
            quality = await self._quality_filter(formatted)
            self.metrics.quality_filtered = (
                len(formatted) - len(quality)
            )
            logger.info(
                f"품질 필터: {len(quality)}건 통과 "
                f"({self.metrics.quality_filtered}건 제거)"
            )
 
            # 4단계: 중복 제거
            deduped = self._deduplicate(quality)
            self.metrics.dedup_filtered = (
                len(quality) - len(deduped)
            )
            logger.info(
                f"중복 제거: {len(deduped)}건 통과 "
                f"({self.metrics.dedup_filtered}건 제거)"
            )
 
            # 5단계: 프라이버시 검증
            private = self._privacy_check(deduped)
            self.metrics.privacy_filtered = (
                len(deduped) - len(private)
            )
            logger.info(
                f"프라이버시: {len(private)}건 통과 "
                f"({self.metrics.privacy_filtered}건 제거)"
            )
 
            # 6단계: 데이터 증강
            if self.config["augmentation"]["enabled"]:
                augmented = await self._augment(private)
                self.metrics.augmented = (
                    len(augmented) - len(private)
                )
                logger.info(
                    f"증강: {self.metrics.augmented}건 추가"
                )
            else:
                augmented = private
 
            # 7단계: 평가
            eval_results = await self._evaluate(augmented)
            self.metrics.tstr_ratio = eval_results.get(
                "tstr_ratio", 0
            )
 
            # 8단계: 품질 게이트
            self.metrics.quality_gate_passed = (
                self._quality_gate(eval_results)
            )
            self.metrics.final_count = len(augmented)
 
            if self.metrics.quality_gate_passed:
                self._save_dataset(augmented, eval_results)
                logger.info("품질 게이트 통과! 데이터셋 저장 완료")
            else:
                logger.warning(
                    "품질 게이트 미통과. 설정을 조정하세요."
                )
 
            return self.metrics
 
        except Exception as e:
            logger.error(f"파이프라인 실패: {e}")
            raise
 
    async def _generate(self) -> list[dict]:
        """데이터 생성 단계"""
        gen_config = self.config["generation"]
        # Generator 모듈 호출
        from .generator import DataGenerator
        generator = DataGenerator(gen_config)
        return await generator.generate()
 
    def _format_filter(self, data: list[dict]) -> list[dict]:
        """형식 필터링 단계"""
        config = self.config["filtering"]["format"]
        return [
            item for item in data
            if all(
                field in item
                for field in config["required_fields"]
            )
            and config["min_length"]
            <= len(item.get("response", ""))
            <= config["max_length"]
        ]
 
    async def _quality_filter(
        self, data: list[dict]
    ) -> list[dict]:
        """품질 필터링 단계"""
        from .filter import QualityFilter
        qf = QualityFilter(self.config["filtering"]["quality"])
        return await qf.filter(data)
 
    def _deduplicate(self, data: list[dict]) -> list[dict]:
        """중복 제거 단계"""
        from .filter import Deduplicator
        dedup = Deduplicator(
            self.config["filtering"]["deduplication"]
        )
        return dedup.deduplicate(data)
 
    def _privacy_check(self, data: list[dict]) -> list[dict]:
        """프라이버시 검증 단계"""
        from .privacy import PrivacyChecker
        checker = PrivacyChecker(self.config["privacy"])
        return checker.check(data)
 
    async def _augment(self, data: list[dict]) -> list[dict]:
        """데이터 증강 단계"""
        from .augmenter import DataAugmenter
        aug = DataAugmenter(self.config["augmentation"])
        return await aug.augment(data)
 
    async def _evaluate(self, data: list[dict]) -> dict:
        """평가 단계"""
        from .evaluator import DatasetEvaluator
        evaluator = DatasetEvaluator(self.config["evaluation"])
        return await evaluator.evaluate(data)
 
    def _quality_gate(self, eval_results: dict) -> bool:
        """품질 게이트 판정"""
        gate = self.config["quality_gate"]
        checks = {
            "tstr_ratio": (
                eval_results.get("tstr_ratio", 0)
                >= gate["tstr_ratio"]
            ),
            "diversity": (
                eval_results.get("diversity_score", 0)
                >= gate["diversity_score"]
            ),
            "pii": (
                eval_results.get("pii_leak_rate", 1)
                <= gate["pii_leak_rate"]
            ),
            "mia": (
                eval_results.get("mia_auc", 1)
                <= gate["mia_auc"]
            ),
        }
 
        for check_name, passed in checks.items():
            if not passed:
                logger.warning(f"품질 게이트 실패: {check_name}")
 
        return all(checks.values())
 
    def _save_dataset(
        self, data: list[dict], eval_results: dict
    ) -> None:
        """데이터셋과 메타데이터를 저장합니다."""
        import json
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        dataset_dir = self.output_dir / "datasets" / timestamp
        dataset_dir.mkdir(parents=True, exist_ok=True)
 
        # 데이터 저장
        with open(
            dataset_dir / "data.jsonl", "w", encoding="utf-8"
        ) as f:
            for item in data:
                f.write(
                    json.dumps(item, ensure_ascii=False) + "\n"
                )
 
        # 메타데이터 저장
        metadata = {
            "pipeline_config": self.config,
            "metrics": {
                "total_generated": self.metrics.total_generated,
                "final_count": self.metrics.final_count,
                "pass_rate": (
                    self.metrics.final_count
                    / self.metrics.total_generated
                    if self.metrics.total_generated > 0
                    else 0
                ),
            },
            "evaluation": eval_results,
            "timestamp": timestamp,
        }
 
        with open(
            dataset_dir / "metadata.json",
            "w",
            encoding="utf-8",
        ) as f:
            json.dump(metadata, f, ensure_ascii=False, indent=2)

CI/CD 연동

합성 데이터 파이프라인을 CI/CD 시스템에 통합하면 모델 학습 데이터의 자동화된 갱신이 가능합니다.

GitHub Actions 워크플로우

.github/workflows/synthetic-data.yaml
yaml
name: Synthetic Data Pipeline
 
on:
  schedule:
    - cron: "0 2 * * 1"  # 매주 월요일 새벽 2시
  workflow_dispatch:
    inputs:
      num_samples:
        description: "생성할 샘플 수"
        required: false
        default: "10000"
 
jobs:
  generate:
    runs-on: ubuntu-latest
    timeout-minutes: 120
 
    steps:
      - uses: actions/checkout@v4
 
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
 
      - name: Install dependencies
        run: pip install -r requirements.txt
 
      - name: Run pipeline
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: |
          python -m src.pipeline.orchestrator \
            --config config/pipeline.yaml \
            --num-samples ${{ github.event.inputs.num_samples || '10000' }}
 
      - name: Upload dataset artifact
        if: success()
        uses: actions/upload-artifact@v4
        with:
          name: synthetic-dataset-${{ github.run_number }}
          path: output/datasets/
          retention-days: 30
 
      - name: Quality gate check
        run: |
          python -m src.pipeline.check_quality_gate \
            --report output/reports/latest.json
 
      - name: Notify on failure
        if: failure()
        run: |
          echo "파이프라인 실패. 로그를 확인하세요."

데이터 버전 관리

src/registry/dataset_registry.py
python
import json
import hashlib
from pathlib import Path
from dataclasses import dataclass, asdict
from datetime import datetime
 
 
@dataclass
class DatasetVersion:
    version: str
    created_at: str
    num_samples: int
    data_hash: str
    pipeline_config_hash: str
    tstr_ratio: float
    diversity_score: float
    quality_gate_passed: bool
    file_path: str
 
 
class DatasetRegistry:
    """데이터셋 버전 관리 레지스트리"""
 
    def __init__(self, registry_path: str = "output/registry.json"):
        self.registry_path = Path(registry_path)
        self.versions: list[DatasetVersion] = []
        self._load()
 
    def _load(self) -> None:
        """레지스트리를 로드합니다."""
        if self.registry_path.exists():
            with open(self.registry_path) as f:
                data = json.load(f)
                self.versions = [
                    DatasetVersion(**v) for v in data
                ]
 
    def register(
        self, dataset_path: str, metadata: dict
    ) -> DatasetVersion:
        """새 데이터셋 버전을 등록합니다."""
        # 버전 번호 자동 생성
        version_num = len(self.versions) + 1
        version = f"v{version_num}.0.0"
 
        # 데이터 해시 계산
        data_hash = self._compute_hash(dataset_path)
 
        entry = DatasetVersion(
            version=version,
            created_at=datetime.now().isoformat(),
            num_samples=metadata["num_samples"],
            data_hash=data_hash,
            pipeline_config_hash=metadata.get(
                "config_hash", ""
            ),
            tstr_ratio=metadata.get("tstr_ratio", 0),
            diversity_score=metadata.get("diversity_score", 0),
            quality_gate_passed=metadata.get(
                "quality_gate_passed", False
            ),
            file_path=dataset_path,
        )
 
        self.versions.append(entry)
        self._save()
        return entry
 
    def get_latest(self) -> DatasetVersion | None:
        """가장 최근의 품질 게이트 통과 버전을 반환합니다."""
        passed = [
            v for v in self.versions
            if v.quality_gate_passed
        ]
        return passed[-1] if passed else None
 
    def _compute_hash(self, file_path: str) -> str:
        """파일의 해시를 계산합니다."""
        hasher = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(8192), b""):
                hasher.update(chunk)
        return hasher.hexdigest()[:16]
 
    def _save(self) -> None:
        """레지스트리를 저장합니다."""
        self.registry_path.parent.mkdir(
            parents=True, exist_ok=True
        )
        with open(self.registry_path, "w", encoding="utf-8") as f:
            json.dump(
                [asdict(v) for v in self.versions],
                f,
                ensure_ascii=False,
                indent=2,
            )

자동화된 품질 게이트

품질 게이트는 합성 데이터가 프로덕션에 사용될 수 있는지를 자동으로 판정하는 관문입니다.

품질 게이트 설정

config/quality_gates.yaml
yaml
gates:
  - name: "format_completeness"
    description: "필수 필드의 완전성"
    threshold: 0.99
    metric: "format_pass_rate"
    action_on_fail: "block"
 
  - name: "quality_score"
    description: "LLM-as-Judge 평균 점수"
    threshold: 3.5
    metric: "avg_quality_score"
    action_on_fail: "block"
 
  - name: "tstr_performance"
    description: "TSTR/TRTR 비율"
    threshold: 0.90
    metric: "tstr_ratio"
    action_on_fail: "block"
 
  - name: "diversity"
    description: "의미적 다양성 점수"
    threshold: 0.40
    metric: "semantic_diversity"
    action_on_fail: "warn"
 
  - name: "pii_safety"
    description: "PII 유출률"
    threshold: 0.001
    metric: "pii_leak_rate"
    action_on_fail: "block"
 
  - name: "privacy_mia"
    description: "멤버십 추론 AUC"
    threshold: 0.55
    metric: "mia_auc"
    action_on_fail: "block"
 
  - name: "duplication_rate"
    description: "데이터 중복률"
    threshold: 0.05
    metric: "duplicate_ratio"
    action_on_fail: "warn"
src/pipeline/quality_gate.py
python
import yaml
import logging
from dataclasses import dataclass
 
logger = logging.getLogger(__name__)
 
 
@dataclass
class GateResult:
    name: str
    passed: bool
    actual_value: float
    threshold: float
    action: str
 
 
def check_quality_gates(
    metrics: dict,
    gate_config_path: str = "config/quality_gates.yaml",
) -> tuple[bool, list[GateResult]]:
    """품질 게이트를 검사합니다."""
    with open(gate_config_path) as f:
        config = yaml.safe_load(f)
 
    results = []
    all_blocking_passed = True
 
    for gate in config["gates"]:
        actual = metrics.get(gate["metric"], None)
        if actual is None:
            logger.warning(
                f"메트릭 누락: {gate['metric']}"
            )
            continue
 
        # 임계값과 비교 (PII, MIA는 이하, 나머지는 이상)
        if gate["metric"] in ["pii_leak_rate", "mia_auc", "duplicate_ratio"]:
            passed = actual <= gate["threshold"]
        else:
            passed = actual >= gate["threshold"]
 
        result = GateResult(
            name=gate["name"],
            passed=passed,
            actual_value=actual,
            threshold=gate["threshold"],
            action=gate["action_on_fail"],
        )
        results.append(result)
 
        if not passed:
            log_msg = (
                f"게이트 '{gate['name']}' 미통과: "
                f"{actual:.4f} (기준: {gate['threshold']})"
            )
            if gate["action_on_fail"] == "block":
                logger.error(log_msg)
                all_blocking_passed = False
            else:
                logger.warning(log_msg)
 
    return all_blocking_passed, results
Warning

품질 게이트의 기준값은 고정된 상수가 아닙니다. 프로젝트 초기에는 느슨하게 시작하여 데이터 생성 역량이 향상됨에 따라 점진적으로 기준을 높여가는 것이 현실적입니다. 처음부터 지나치게 엄격한 기준을 설정하면 파이프라인이 한 번도 통과하지 못할 수 있습니다.


비용 최적화

LLM API 비용은 대규모 합성 데이터 생성에서 가장 큰 운영 비용입니다.

비용 추적

src/utils/cost_tracker.py
python
from dataclasses import dataclass, field
from datetime import datetime
 
# 2026년 기준 예상 비용 (1M 토큰당 USD)
MODEL_COSTS = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
    "claude-3-5-haiku": {"input": 0.25, "output": 1.25},
}
 
 
@dataclass
class CostTracker:
    model: str
    input_tokens: int = 0
    output_tokens: int = 0
    api_calls: int = 0
    start_time: datetime = field(
        default_factory=datetime.now
    )
 
    def add_usage(
        self, input_tokens: int, output_tokens: int
    ) -> None:
        """API 호출 사용량을 기록합니다."""
        self.input_tokens += input_tokens
        self.output_tokens += output_tokens
        self.api_calls += 1
 
    @property
    def total_cost(self) -> float:
        """현재까지의 총 비용을 계산합니다."""
        costs = MODEL_COSTS.get(self.model, MODEL_COSTS["gpt-4o"])
        input_cost = (self.input_tokens / 1_000_000) * costs["input"]
        output_cost = (
            (self.output_tokens / 1_000_000) * costs["output"]
        )
        return input_cost + output_cost
 
    def report(self) -> str:
        """비용 보고서를 생성합니다."""
        elapsed = (datetime.now() - self.start_time).total_seconds()
        return (
            f"모델: {self.model}\n"
            f"API 호출: {self.api_calls}회\n"
            f"입력 토큰: {self.input_tokens:,}\n"
            f"출력 토큰: {self.output_tokens:,}\n"
            f"총 비용: ${self.total_cost:.2f}\n"
            f"샘플당 비용: "
            f"${self.total_cost / max(self.api_calls, 1):.4f}\n"
            f"소요 시간: {elapsed:.0f}초"
        )

비용 절감 전략

전략절감 효과구현 복잡도
생성에는 소형 모델, 평가에만 대형 모델 사용50~70%낮음
배치 API 활용 (비동기 배치 처리)30~50%낮음
캐싱: 동일 프롬프트 결과 재사용10~30%중간
점진적 품질 필터: 저렴한 필터를 먼저 적용20~40%중간
오픈소스 모델 자체 호스팅70~90%높음
cost_optimization.py
python
class TieredGenerationPipeline:
    """계층적 모델 사용으로 비용을 최적화합니다."""
 
    def __init__(self):
        self.generation_model = "gpt-4o-mini"  # 저렴한 모델로 생성
        self.judge_model = "gpt-4o-mini"       # 1차 평가도 저렴하게
        self.final_judge = "gpt-4o"            # 최종 평가만 고급 모델
 
    async def generate_and_filter(
        self, seeds: list[dict], target_count: int
    ) -> list[dict]:
        """비용 효율적으로 생성하고 필터링합니다."""
 
        # 1단계: 저렴한 모델로 대량 생성 (target의 3배)
        raw = await self._generate_batch(
            seeds,
            model=self.generation_model,
            count=target_count * 3,
        )
 
        # 2단계: 저렴한 모델로 1차 필터 (상위 50%)
        scored = await self._score_batch(
            raw, model=self.judge_model
        )
        top_half = sorted(
            scored, key=lambda x: x["score"], reverse=True
        )[:len(scored) // 2]
 
        # 3단계: 고급 모델로 최종 평가 (target 수만큼)
        final = await self._score_batch(
            top_half, model=self.final_judge
        )
        result = sorted(
            final, key=lambda x: x["score"], reverse=True
        )[:target_count]
 
        return result
Tip

비용 최적화의 핵심 원칙은 "싼 필터를 먼저, 비싼 필터를 나중에"입니다. 형식 검증(무료)으로 30%를 걸러내고, 소형 모델 평가($0.001/건)로 50%를 추가 제거한 후, 남은 20%만 대형 모델($0.01/건)로 최종 평가하면 전체 비용을 70% 이상 절감할 수 있습니다.


프로덕션 운영

모니터링 대시보드

src/utils/monitoring.py
python
from dataclasses import dataclass
from datetime import datetime
 
 
@dataclass
class PipelineHealthMetrics:
    timestamp: datetime
    pipeline_status: str  # running, completed, failed
    generation_rate: float  # 샘플/분
    filter_pass_rate: float  # 전체 통과율
    quality_score_avg: float
    cost_per_sample: float
    estimated_completion: str
 
 
def generate_dashboard_data(
    pipeline_metrics: dict,
) -> dict:
    """모니터링 대시보드 데이터를 생성합니다."""
    return {
        "summary": {
            "total_datasets": pipeline_metrics.get(
                "total_versions", 0
            ),
            "latest_version": pipeline_metrics.get(
                "latest_version", "N/A"
            ),
            "latest_tstr_ratio": pipeline_metrics.get(
                "latest_tstr", 0
            ),
            "total_samples": pipeline_metrics.get(
                "total_samples", 0
            ),
            "total_cost": pipeline_metrics.get(
                "total_cost", 0
            ),
        },
        "quality_trends": {
            "tstr_ratio_history": pipeline_metrics.get(
                "tstr_history", []
            ),
            "diversity_history": pipeline_metrics.get(
                "diversity_history", []
            ),
        },
        "alerts": pipeline_metrics.get("alerts", []),
    }

장애 대응 전략

장애 유형증상대응
API 레이트 리밋429 에러 증가지수 백오프, 동시 요청 수 감소
품질 저하TSTR 비율 하락시드 데이터 점검, 온도 조정
비용 초과예산 임계값 도달파이프라인 일시 정지, 모델 다운그레이드
프라이버시 유출PII 탐지율 상승즉시 중단, 필터 강화
src/utils/error_handling.py
python
import asyncio
import logging
from functools import wraps
 
logger = logging.getLogger(__name__)
 
 
def retry_with_backoff(
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
):
    """지수 백오프로 재시도하는 데코레이터"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    delay = min(
                        base_delay * (2 ** attempt),
                        max_delay,
                    )
                    logger.warning(
                        f"시도 {attempt + 1}/{max_retries} "
                        f"실패: {e}. "
                        f"{delay:.1f}초 후 재시도..."
                    )
                    await asyncio.sleep(delay)
        return wrapper
    return decorator

전체 파이프라인 실행

모든 구성 요소를 연결하여 파이프라인을 실행합니다.

run_pipeline.py
python
import asyncio
import argparse
import logging
 
from src.pipeline.orchestrator import SyntheticDataPipeline
 
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
 
 
async def main(config_path: str) -> None:
    """파이프라인을 실행합니다."""
    pipeline = SyntheticDataPipeline(config_path)
    metrics = await pipeline.run()
 
    logger.info("\n========== 파이프라인 결과 ==========")
    logger.info(f"총 생성: {metrics.total_generated}")
    logger.info(f"형식 필터: -{metrics.format_filtered}")
    logger.info(f"품질 필터: -{metrics.quality_filtered}")
    logger.info(f"중복 제거: -{metrics.dedup_filtered}")
    logger.info(f"프라이버시: -{metrics.privacy_filtered}")
    logger.info(f"증강 추가: +{metrics.augmented}")
    logger.info(f"최종 데이터: {metrics.final_count}")
    logger.info(f"TSTR 비율: {metrics.tstr_ratio:.4f}")
    logger.info(f"총 비용: ${metrics.total_cost:.2f}")
    logger.info(
        f"품질 게이트: "
        f"{'통과' if metrics.quality_gate_passed else '미통과'}"
    )
    pass_rate = (
        metrics.final_count / metrics.total_generated * 100
        if metrics.total_generated > 0 else 0
    )
    logger.info(f"전체 통과율: {pass_rate:.1f}%")
 
 
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--config",
        default="config/pipeline.yaml",
        help="파이프라인 설정 파일 경로",
    )
    args = parser.parse_args()
 
    asyncio.run(main(args.config))

시리즈 전체 정리

10장에 걸쳐 합성 데이터 엔지니어링의 전체 스펙트럼을 다루었습니다.

장핵심 키워드핵심 교훈
1장개요, 모델 붕괴합성 데이터는 필수이지만 위험도 존재합니다
2장Evol-Instruct, Self-Instruct체계적 진화와 부트스트래핑으로 데이터를 확장합니다
3장텍스트 합성 실전태스크별 특화된 생성과 검증이 필요합니다
4장구조화/멀티모달테이블은 상관관계, 이미지는 정렬이 핵심입니다
5장품질 검증3계층 평가와 LLM-as-Judge가 표준입니다
6장증강 기법생성과 증강은 상호 보완적이며, 포화점이 존재합니다
7장프라이버시차등 프라이버시와 MIA 테스트로 안전성을 보장합니다
8장도메인 특화전문가 시드와 도메인 검증이 품질의 상한선을 결정합니다
9장평가/벤치마킹TSTR이 유용성의 표준이며, 다양성과 함께 측정합니다
10장프로덕션 파이프라인자동화, 품질 게이트, 비용 최적화가 운영의 핵심입니다

합성 데이터 엔지니어링은 단순한 "데이터 만들기"가 아닙니다. 생성, 검증, 필터링, 증강, 평가, 운영이 유기적으로 연결된 엔드투엔드 시스템을 설계하고 운영하는 엔지니어링 분야입니다.

Info

합성 데이터의 미래는 밝지만, "만능 해결책"은 아닙니다. 실제 데이터와의 적절한 혼합, 지속적인 품질 모니터링, 프라이버시 검증이 동반되어야 합니다. 이 시리즈가 합성 데이터 엔지니어링의 실전 가이드로서 도움이 되기를 바랍니다.

이 글이 도움이 되셨나요?

관련 주제 더 보기

#synthetic-data#ai#data-engineering#llm#mlops

관련 글

AI / ML

9장: 합성 데이터 평가와 벤치마킹

TSTR 방법론, 다양성 메트릭, 분포 비교, 다운스트림 성능 측정, 합성 vs 실제 데이터 비교 실험, 벤치마크 설계 방법을 다룹니다.

2026년 4월 5일·21분
AI / ML

8장: 도메인 특화 데이터셋 구축

의료, 법률, 금융, 코드 도메인별 합성 데이터 접근법, 전문가 시드 데이터 설계, InstructLab 택소노미 방식, 도메인 검증 전략을 다룹니다.

2026년 4월 4일·22분
AI / ML

7장: 프라이버시 보존 합성 데이터

차등 프라이버시, PII 마스킹, 멤버십 추론 공격 방어, 유사도 필터, 규제 대응 전략과 프라이버시-유용성 트레이드오프를 다룹니다.

2026년 4월 2일·20분
이전 글9장: 합성 데이터 평가와 벤치마킹

댓글

목차

약 24분 남음
  • 이 장에서 배우는 것
  • 파이프라인 아키텍처 개요
    • 핵심 설계 원칙
  • 파이프라인 구현
    • 프로젝트 구조
    • 파이프라인 설정
    • 오케스트레이터
  • CI/CD 연동
    • GitHub Actions 워크플로우
    • 데이터 버전 관리
  • 자동화된 품질 게이트
    • 품질 게이트 설정
  • 비용 최적화
    • 비용 추적
    • 비용 절감 전략
  • 프로덕션 운영
    • 모니터링 대시보드
    • 장애 대응 전략
  • 전체 파이프라인 실행
  • 시리즈 전체 정리