본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout

내비게이션

  • Tech
  • Projects
  • Books
  • About
  • Tags

카테고리

  • AI / ML
  • 웹 개발
  • 프로그래밍
  • 개발 도구

연결

  • GitHub
  • Email
  • RSS
© 2026 Kreath Archive. All rights reserved.Built with Next.js + MDX
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 9장: 프로덕션 파이프라인 구축
2026년 3월 31일·AI / ML·

9장: 프로덕션 파이프라인 구축

지식 그래프의 증분 업데이트, 데이터 품질 검증, 스케일링 전략, 모니터링, 비용 최적화, 그리고 Graphiti를 활용한 실시간 KG 업데이트까지 프로덕션 운영의 핵심을 다룹니다.

20분1,453자10개 섹션
knowledge-graphaidata-engineering
공유
knowledge-graph9 / 10
12345678910
이전8장: 지식 그래프 쿼리와 추론다음10장: 실전 프로젝트 — Knowledge Graph + AI 시스템

학습 목표

  • 지식 그래프의 증분 업데이트(Incremental Update) 전략을 이해합니다
  • **변경 감지(Change Detection)**와 데이터 품질 검증 방법을 습득합니다
  • 대규모 그래프의 스케일링 전략을 파악합니다
  • 모니터링, 백업/복구, 비용 최적화 기법을 익힙니다
  • Graphiti를 활용한 실시간 KG 업데이트 방식을 이해합니다

프로덕션 운영의 과제

개발 환경에서 지식 그래프를 구축하는 것과 프로덕션에서 운영하는 것은 근본적으로 다른 문제입니다. 프로덕션 환경에서는 다음과 같은 과제에 직면합니다.


증분 업데이트

왜 증분 업데이트가 필요한가

문서가 추가되거나 수정될 때마다 전체 지식 그래프를 재구축하는 것은 비효율적입니다. LLM API 비용, 처리 시간, 서비스 중단 리스크를 고려하면 변경된 부분만 업데이트하는 증분 방식이 필수입니다.

변경 감지 메커니즘

change_detection.py
python
import hashlib
from datetime import datetime
from dataclasses import dataclass
 
@dataclass
class DocumentChange:
    doc_id: str
    change_type: str  # "created", "modified", "deleted"
    content_hash: str
    detected_at: datetime
 
class ChangeDetector:
    """문서 변경을 감지하는 클래스입니다."""
 
    def __init__(self, driver):
        self.driver = driver
 
    def compute_hash(self, content: str) -> str:
        """콘텐츠의 해시를 계산합니다."""
        return hashlib.sha256(content.encode()).hexdigest()
 
    def detect_changes(self, documents: list[dict]) -> list[DocumentChange]:
        """문서 목록의 변경 사항을 감지합니다."""
        changes = []
 
        for doc in documents:
            new_hash = self.compute_hash(doc["content"])
 
            # 기존 문서와 해시 비교
            records, _, _ = self.driver.execute_query("""
                MATCH (d:Document {externalId: $docId})
                RETURN d.contentHash AS hash, d.title AS title
            """, docId=doc["id"])
 
            if not records:
                # 새 문서
                changes.append(DocumentChange(
                    doc_id=doc["id"],
                    change_type="created",
                    content_hash=new_hash,
                    detected_at=datetime.now()
                ))
            elif records[0]["hash"] != new_hash:
                # 수정된 문서
                changes.append(DocumentChange(
                    doc_id=doc["id"],
                    change_type="modified",
                    content_hash=new_hash,
                    detected_at=datetime.now()
                ))
 
        # 삭제된 문서 감지
        current_ids = {doc["id"] for doc in documents}
        records, _, _ = self.driver.execute_query("""
            MATCH (d:Document)
            WHERE d.externalId IS NOT NULL
            RETURN d.externalId AS docId
        """)
        for record in records:
            if record["docId"] not in current_ids:
                changes.append(DocumentChange(
                    doc_id=record["docId"],
                    change_type="deleted",
                    content_hash="",
                    detected_at=datetime.now()
                ))
 
        return changes

증분 업데이트 파이프라인

incremental_update.py
python
class IncrementalUpdater:
    """증분 업데이트를 수행하는 파이프라인입니다."""
 
    def __init__(self, driver, extractor, resolver):
        self.driver = driver
        self.extractor = extractor
        self.resolver = resolver
 
    def process_changes(self, changes: list[DocumentChange],
                        documents: dict[str, dict]) -> dict:
        """변경 사항에 따라 그래프를 업데이트합니다."""
        stats = {"created": 0, "modified": 0, "deleted": 0}
 
        for change in changes:
            if change.change_type == "created":
                self._add_document(documents[change.doc_id])
                stats["created"] += 1
 
            elif change.change_type == "modified":
                self._update_document(documents[change.doc_id], change.content_hash)
                stats["modified"] += 1
 
            elif change.change_type == "deleted":
                self._remove_document(change.doc_id)
                stats["deleted"] += 1
 
        return stats
 
    def _add_document(self, doc: dict) -> None:
        """새 문서에서 엔티티/관계를 추출하여 그래프에 추가합니다."""
        # 5장의 추출 파이프라인 활용
        result = self.extractor.extract_entities(doc["content"])
        entities, relationships = validate_extraction(result)
        resolved = self.resolver.resolve_batch(entities)
        # Neo4j에 MERGE로 적재 (중복 방지)
        self._load_to_graph(doc, resolved, relationships)
 
    def _update_document(self, doc: dict, new_hash: str) -> None:
        """수정된 문서의 관련 노드/관계를 업데이트합니다."""
        # 1. 해당 문서에서 파생된 관계 삭제
        self.driver.execute_query("""
            MATCH (d:Document {externalId: $docId})-[r:EXTRACTED_FROM]-(e)
            DELETE r
        """, docId=doc["id"])
 
        # 2. 새로운 추출 결과로 재연결
        self._add_document(doc)
 
        # 3. 해시 업데이트
        self.driver.execute_query("""
            MATCH (d:Document {externalId: $docId})
            SET d.contentHash = $hash, d.updatedAt = datetime()
        """, docId=doc["id"], hash=new_hash)
 
    def _remove_document(self, doc_id: str) -> None:
        """삭제된 문서와 관련 관계를 정리합니다."""
        # 문서 노드와 EXTRACTED_FROM 관계만 삭제
        # 다른 문서에서도 참조하는 엔티티는 유지
        self.driver.execute_query("""
            MATCH (d:Document {externalId: $docId})
            OPTIONAL MATCH (d)-[r:EXTRACTED_FROM]->(e)
            WITH d, r, e
            DELETE r
            WITH d, e
            WHERE NOT EXISTS { MATCH (e)<-[:EXTRACTED_FROM]-(:Document) }
              AND NOT EXISTS { MATCH (e)<-[:REFERENCES]-() }
            DETACH DELETE e
            WITH d
            DETACH DELETE d
        """, docId=doc_id)
 
    def _load_to_graph(self, doc: dict, entities: list, relationships: list) -> None:
        """추출된 데이터를 그래프에 적재합니다."""
        # MERGE를 사용하여 기존 엔티티와 안전하게 연결
        for entity in entities:
            self.driver.execute_query("""
                MERGE (e {name: $name})
                ON CREATE SET e:$type, e += $props
                WITH e
                MATCH (d:Document {externalId: $docId})
                MERGE (d)-[:EXTRACTED_FROM]->(e)
            """, name=entity["name"], type=entity["type"],
                props=entity.get("properties", {}), docId=doc["id"])
Warning

엔티티 삭제 시 주의해야 합니다. 여러 문서에서 참조하는 엔티티를 한 문서의 삭제로 인해 함께 삭제하면 그래프의 연결성이 손상됩니다. 위 코드에서는 다른 문서에서 더 이상 참조하지 않는 엔티티만 삭제합니다.


데이터 품질 검증

LLM 추출은 완벽하지 않습니다. 지속적인 품질 검증이 필요합니다.

자동 검증 규칙

quality_checks.py
python
class GraphQualityChecker:
    """그래프 데이터 품질을 검증합니다."""
 
    def __init__(self, driver):
        self.driver = driver
 
    def check_orphan_nodes(self) -> list[dict]:
        """관계가 없는 고립된 노드를 탐지합니다."""
        records, _, _ = self.driver.execute_query("""
            MATCH (n)
            WHERE NOT (n)--()
            RETURN labels(n) AS type, n.name AS name, elementId(n) AS id
            LIMIT 100
        """)
        return [r.data() for r in records]
 
    def check_duplicate_entities(self) -> list[dict]:
        """유사한 이름의 중복 엔티티를 탐지합니다."""
        records, _, _ = self.driver.execute_query("""
            MATCH (a), (b)
            WHERE a <> b
              AND labels(a) = labels(b)
              AND apoc.text.levenshteinSimilarity(a.name, b.name) > 0.85
            RETURN a.name AS name1, b.name AS name2,
                   apoc.text.levenshteinSimilarity(a.name, b.name) AS similarity
            LIMIT 50
        """)
        return [r.data() for r in records]
 
    def check_missing_properties(self) -> list[dict]:
        """필수 속성이 누락된 노드를 탐지합니다."""
        records, _, _ = self.driver.execute_query("""
            MATCH (n)
            WHERE n.name IS NULL OR trim(n.name) = ""
            RETURN labels(n) AS type, elementId(n) AS id, properties(n) AS props
            LIMIT 100
        """)
        return [r.data() for r in records]
 
    def check_invalid_relationships(self) -> list[dict]:
        """스키마에 맞지 않는 관계를 탐지합니다."""
        # 예: Person -DEPENDS_ON-> Technology는 허용되지 않음
        records, _, _ = self.driver.execute_query("""
            MATCH (p:Person)-[r:DEPENDS_ON]->(t)
            RETURN p.name AS person, type(r) AS relType, t.name AS target
            LIMIT 50
        """)
        return [r.data() for r in records]
 
    def run_all_checks(self) -> dict:
        """모든 품질 검증을 실행합니다."""
        return {
            "orphan_nodes": self.check_orphan_nodes(),
            "duplicate_entities": self.check_duplicate_entities(),
            "missing_properties": self.check_missing_properties(),
            "invalid_relationships": self.check_invalid_relationships(),
        }

품질 메트릭 대시보드

quality-metrics.cypher
cypher
// 그래프 통계 요약
CALL apoc.meta.stats() YIELD nodeCount, relCount, labels, relTypes
RETURN nodeCount, relCount, labels, relTypes;
 
// 레이블별 노드 수
MATCH (n)
RETURN labels(n) AS type, count(n) AS count
ORDER BY count DESC;
 
// 관계 타입별 수
MATCH ()-[r]->()
RETURN type(r) AS relType, count(r) AS count
ORDER BY count DESC;
 
// 평균 연결도 (노드당 평균 관계 수)
MATCH (n)
WITH n, size([(n)--() | 1]) AS degree
RETURN avg(degree) AS avgDegree,
       min(degree) AS minDegree,
       max(degree) AS maxDegree,
       percentileCont(degree, 0.5) AS medianDegree;

스케일링 전략

수직적 스케일링

노드 수가 수백만 이하인 경우, Neo4j 단일 인스턴스의 리소스를 늘리는 것으로 충분합니다.

그래프 규모권장 메모리권장 CPU
100K 노드 이하4GB2 코어
1M 노드 이하16GB4 코어
10M 노드 이하64GB8 코어
100M+ 노드클러스터 필요16+ 코어

수평적 스케일링 (Neo4j Cluster)

Neo4j Enterprise에서는 Primary-Secondary 복제를 통해 읽기 부하를 분산할 수 있습니다.

쿼리 최적화 체크리스트

query-optimization.cypher
cypher
// 1. PROFILE로 쿼리 실행 계획 확인
PROFILE
MATCH (d:Document)-[:COVERS]->(t:Technology {name: "Neo4j"})
RETURN d.title, t.name;
 
// 2. 인덱스 활용 확인
CALL db.indexes() YIELD name, type, labelsOrTypes, properties, state
RETURN name, type, labelsOrTypes, properties, state;
 
// 3. 느린 쿼리를 위한 인덱스 추가
CREATE INDEX idx_technology_name FOR (t:Technology) ON (t.name);
 
// 4. 카디널리티가 높은 레이블부터 매칭 (힌트 사용)
MATCH (t:Technology {name: "Neo4j"})
USING INDEX t:Technology(name)
MATCH (d:Document)-[:COVERS]->(t)
RETURN d.title;
Tip

Neo4j에서 가장 흔한 성능 문제는 인덱스 미사용입니다. WHERE 절에서 자주 필터링하는 속성에는 반드시 인덱스를 생성하세요. PROFILE 명령으로 실행 계획을 확인하면, NodeByLabelScan 대신 NodeIndexSeek가 사용되는지 확인할 수 있습니다.


모니터링

Neo4j 메트릭 수집

monitoring.py
python
from dataclasses import dataclass
from datetime import datetime
 
@dataclass
class GraphMetrics:
    timestamp: datetime
    node_count: int
    relationship_count: int
    store_size_mb: float
    query_avg_ms: float
    cache_hit_ratio: float
 
class GraphMonitor:
    """그래프 상태를 모니터링합니다."""
 
    def __init__(self, driver):
        self.driver = driver
 
    def collect_metrics(self) -> GraphMetrics:
        """현재 그래프 메트릭을 수집합니다."""
        # 노드/관계 수
        records, _, _ = self.driver.execute_query("""
            CALL apoc.meta.stats()
            YIELD nodeCount, relCount
            RETURN nodeCount, relCount
        """)
        stats = records[0]
 
        # 스토어 사이즈
        records, _, _ = self.driver.execute_query("""
            CALL dbms.queryJmx("org.neo4j:instance=kernel#0,name=Store sizes")
            YIELD attributes
            RETURN attributes.TotalStoreSize.value AS storeSize
        """)
 
        return GraphMetrics(
            timestamp=datetime.now(),
            node_count=stats["nodeCount"],
            relationship_count=stats["relCount"],
            store_size_mb=records[0]["storeSize"] / (1024 * 1024) if records else 0,
            query_avg_ms=0,  # 실제로는 쿼리 로그에서 수집
            cache_hit_ratio=0  # JMX에서 수집
        )
 
    def check_health(self) -> dict:
        """그래프 건강 상태를 확인합니다."""
        checks = {}
 
        # 연결 확인
        try:
            self.driver.verify_connectivity()
            checks["connectivity"] = "healthy"
        except Exception:
            checks["connectivity"] = "unhealthy"
 
        # 고립 노드 비율
        records, _, _ = self.driver.execute_query("""
            MATCH (n) WHERE NOT (n)--() WITH count(n) AS orphans
            MATCH (m) WITH orphans, count(m) AS total
            RETURN toFloat(orphans) / total AS orphanRatio
        """)
        orphan_ratio = records[0]["orphanRatio"] if records else 0
        checks["orphan_ratio"] = "healthy" if orphan_ratio < 0.1 else "warning"
 
        return checks

백업과 복구

Neo4j 백업 전략

terminal
bash
# 온라인 백업 (Enterprise)
neo4j-admin database backup neo4j --to-path=/backup/$(date +%Y%m%d)
 
# 덤프 (Community/Enterprise)
neo4j-admin database dump neo4j --to-path=/backup/neo4j-dump-$(date +%Y%m%d).dump

프로그래밍 방식 백업

backup_strategy.py
python
import subprocess
from datetime import datetime
from pathlib import Path
 
class BackupManager:
    """Neo4j 백업을 관리합니다."""
 
    def __init__(self, backup_dir: str, retention_days: int = 30):
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(parents=True, exist_ok=True)
        self.retention_days = retention_days
 
    def create_backup(self, database: str = "neo4j") -> Path:
        """데이터베이스 백업을 생성합니다."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = self.backup_dir / f"{database}_{timestamp}"
 
        result = subprocess.run(
            ["neo4j-admin", "database", "backup", database,
             "--to-path", str(backup_path)],
            capture_output=True, text=True
        )
 
        if result.returncode != 0:
            raise RuntimeError(f"백업 실패: {result.stderr}")
 
        print(f"백업 완료: {backup_path}")
        return backup_path
 
    def cleanup_old_backups(self) -> int:
        """보관 기간이 지난 백업을 삭제합니다."""
        import shutil
        cutoff = datetime.now().timestamp() - (self.retention_days * 86400)
        removed = 0
 
        for path in self.backup_dir.iterdir():
            if path.stat().st_mtime < cutoff:
                shutil.rmtree(path)
                removed += 1
 
        return removed

Graphiti: 실시간 KG 업데이트

Graphiti는 Zep에서 개발한 오픈소스 라이브러리로, AI 에이전트를 위한 실시간 지식 그래프 업데이트를 지원합니다.

Graphiti의 핵심 개념

기존의 배치 처리 방식(문서 수집 -> 추출 -> 적재)과 달리, Graphiti는 대화나 이벤트가 발생할 때 즉시 지식 그래프를 업데이트합니다.

주요 특징

  • 실시간 처리: 새로운 정보가 들어올 때 즉시 그래프를 업데이트합니다
  • 시간 인식: 각 엔티티와 관계에 시간 메타데이터를 부착하여 "언제" 알게 된 정보인지 추적합니다
  • 자동 엔티티 해소: 새로운 엔티티가 기존 엔티티와 동일한지 자동으로 판단합니다
  • 에이전트 통합: AI 에이전트의 장기 메모리로 활용할 수 있습니다
graphiti_example.py
python
from graphiti_core import Graphiti
from graphiti_core.nodes import EpisodeType
 
# Graphiti 초기화
graphiti = Graphiti(
    neo4j_uri="bolt://localhost:7687",
    neo4j_user="neo4j",
    neo4j_password="password"
)
 
# 그래프 인덱스 초기화
await graphiti.build_indices_and_constraints()
 
# 에피소드(이벤트) 추가 - 자동으로 엔티티/관계 추출 및 적재
await graphiti.add_episode(
    name="기술 토론",
    episode_body="김개발은 GraphRAG 프로젝트에서 Neo4j를 사용하고 있으며, "
                 "최근에 벡터 인덱스와 그래프 순회를 결합한 하이브리드 검색을 구현했다.",
    source=EpisodeType.text,
    source_description="팀 회의 기록"
)
 
# 관련 정보 검색
results = await graphiti.search("김개발이 사용하는 기술은?")
for result in results:
    print(f"  {result.fact}")
Info

Graphiti는 특히 AI 에이전트의 장기 메모리 구현에 적합합니다. 에이전트가 사용자와 대화하면서 축적된 정보를 지식 그래프에 저장하고, 이후 대화에서 이 정보를 활용하여 개인화된 응답을 생성할 수 있습니다.


비용 최적화

LLM API 비용 관리

지식 그래프 구축에서 가장 큰 비용 항목은 LLM API 호출입니다.

최적화 전략설명절감 효과
캐싱동일 텍스트의 추출 결과 캐싱30~50%
모델 선택추출에는 소형 모델, 추론에는 대형 모델40~60%
배치 처리여러 청크를 하나의 요청으로20~30%
증분 업데이트변경분만 재처리60~90%
cost_optimization.py
python
import hashlib
import json
from functools import lru_cache
 
class CachedExtractor:
    """추출 결과를 캐싱하여 API 비용을 절감합니다."""
 
    def __init__(self, extractor, cache_dir: str = ".extraction_cache"):
        self.extractor = extractor
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
 
    def extract(self, text: str) -> dict:
        """캐시된 결과가 있으면 반환, 없으면 추출 후 캐싱합니다."""
        cache_key = hashlib.sha256(text.encode()).hexdigest()
        cache_file = self.cache_dir / f"{cache_key}.json"
 
        if cache_file.exists():
            return json.loads(cache_file.read_text())
 
        result = self.extractor.extract_entities(text)
        cache_file.write_text(json.dumps(result, ensure_ascii=False))
        return result

정리

이번 장에서는 지식 그래프를 프로덕션 환경에서 운영하는 핵심 전략을 다루었습니다.

  • 증분 업데이트는 해시 기반 변경 감지로 변경된 문서만 재처리합니다
  • 데이터 품질 검증은 고립 노드, 중복 엔티티, 누락 속성, 잘못된 관계를 자동 탐지합니다
  • 스케일링은 인덱스 최적화 -> 수직 확장 -> 클러스터링 순으로 접근합니다
  • Graphiti는 실시간 이벤트 기반의 KG 업데이트를 제공하며, AI 에이전트의 장기 메모리에 적합합니다
  • 비용 최적화는 캐싱, 모델 선택, 증분 처리의 조합으로 60% 이상 절감이 가능합니다

다음 장 미리보기: 10장에서는 이 시리즈에서 배운 모든 기술을 통합하여 실전 프로젝트를 구축합니다. 기술 문서에서 지식 그래프를 추출하고, GraphRAG로 자연어 질의를 처리하며, 벡터 전용 RAG와 성능을 비교합니다.

이 글이 도움이 되셨나요?

관련 주제 더 보기

#knowledge-graph#ai#data-engineering

관련 글

AI / ML

10장: 실전 프로젝트 — Knowledge Graph + AI 시스템

기술 문서에서 LLM으로 지식 그래프를 구축하고, GraphRAG로 자연어 질의를 처리하며, 벡터 전용 RAG와 성능을 비교하는 엔드투엔드 실전 프로젝트를 구현합니다.

2026년 4월 2일·23분
AI / ML

8장: 지식 그래프 쿼리와 추론

Cypher 고급 쿼리 패턴, PageRank/커뮤니티 감지/중심성 등 그래프 알고리즘의 실전 활용, LLM과 그래프 추론의 결합, Text2Cypher 자연어 변환까지 다룹니다.

2026년 3월 29일·16분
AI / ML

7장: 지식 그래프 임베딩

TransE, DistMult, ComplEx 등 관계 예측 모델과 Node2Vec, GraphSAGE 등 노드 임베딩 기법, PyTorch Geometric을 활용한 구현까지 지식 그래프 임베딩의 핵심을 다룹니다.

2026년 3월 27일·17분
이전 글8장: 지식 그래프 쿼리와 추론
다음 글10장: 실전 프로젝트 — Knowledge Graph + AI 시스템

댓글

목차

약 20분 남음
  • 학습 목표
  • 프로덕션 운영의 과제
  • 증분 업데이트
    • 왜 증분 업데이트가 필요한가
    • 변경 감지 메커니즘
    • 증분 업데이트 파이프라인
  • 데이터 품질 검증
    • 자동 검증 규칙
    • 품질 메트릭 대시보드
  • 스케일링 전략
    • 수직적 스케일링
    • 수평적 스케일링 (Neo4j Cluster)
    • 쿼리 최적화 체크리스트
  • 모니터링
    • Neo4j 메트릭 수집
  • 백업과 복구
    • Neo4j 백업 전략
    • 프로그래밍 방식 백업
  • Graphiti: 실시간 KG 업데이트
    • Graphiti의 핵심 개념
    • 주요 특징
  • 비용 최적화
    • LLM API 비용 관리
  • 정리