지식 그래프의 증분 업데이트, 데이터 품질 검증, 스케일링 전략, 모니터링, 비용 최적화, 그리고 Graphiti를 활용한 실시간 KG 업데이트까지 프로덕션 운영의 핵심을 다룹니다.
개발 환경에서 지식 그래프를 구축하는 것과 프로덕션에서 운영하는 것은 근본적으로 다른 문제입니다. 프로덕션 환경에서는 다음과 같은 과제에 직면합니다.
문서가 추가되거나 수정될 때마다 전체 지식 그래프를 재구축하는 것은 비효율적입니다. LLM API 비용, 처리 시간, 서비스 중단 리스크를 고려하면 변경된 부분만 업데이트하는 증분 방식이 필수입니다.
import hashlib
from datetime import datetime
from dataclasses import dataclass
@dataclass
class DocumentChange:
doc_id: str
change_type: str # "created", "modified", "deleted"
content_hash: str
detected_at: datetime
class ChangeDetector:
"""문서 변경을 감지하는 클래스입니다."""
def __init__(self, driver):
self.driver = driver
def compute_hash(self, content: str) -> str:
"""콘텐츠의 해시를 계산합니다."""
return hashlib.sha256(content.encode()).hexdigest()
def detect_changes(self, documents: list[dict]) -> list[DocumentChange]:
"""문서 목록의 변경 사항을 감지합니다."""
changes = []
for doc in documents:
new_hash = self.compute_hash(doc["content"])
# 기존 문서와 해시 비교
records, _, _ = self.driver.execute_query("""
MATCH (d:Document {externalId: $docId})
RETURN d.contentHash AS hash, d.title AS title
""", docId=doc["id"])
if not records:
# 새 문서
changes.append(DocumentChange(
doc_id=doc["id"],
change_type="created",
content_hash=new_hash,
detected_at=datetime.now()
))
elif records[0]["hash"] != new_hash:
# 수정된 문서
changes.append(DocumentChange(
doc_id=doc["id"],
change_type="modified",
content_hash=new_hash,
detected_at=datetime.now()
))
# 삭제된 문서 감지
current_ids = {doc["id"] for doc in documents}
records, _, _ = self.driver.execute_query("""
MATCH (d:Document)
WHERE d.externalId IS NOT NULL
RETURN d.externalId AS docId
""")
for record in records:
if record["docId"] not in current_ids:
changes.append(DocumentChange(
doc_id=record["docId"],
change_type="deleted",
content_hash="",
detected_at=datetime.now()
))
return changesclass IncrementalUpdater:
"""증분 업데이트를 수행하는 파이프라인입니다."""
def __init__(self, driver, extractor, resolver):
self.driver = driver
self.extractor = extractor
self.resolver = resolver
def process_changes(self, changes: list[DocumentChange],
documents: dict[str, dict]) -> dict:
"""변경 사항에 따라 그래프를 업데이트합니다."""
stats = {"created": 0, "modified": 0, "deleted": 0}
for change in changes:
if change.change_type == "created":
self._add_document(documents[change.doc_id])
stats["created"] += 1
elif change.change_type == "modified":
self._update_document(documents[change.doc_id], change.content_hash)
stats["modified"] += 1
elif change.change_type == "deleted":
self._remove_document(change.doc_id)
stats["deleted"] += 1
return stats
def _add_document(self, doc: dict) -> None:
"""새 문서에서 엔티티/관계를 추출하여 그래프에 추가합니다."""
# 5장의 추출 파이프라인 활용
result = self.extractor.extract_entities(doc["content"])
entities, relationships = validate_extraction(result)
resolved = self.resolver.resolve_batch(entities)
# Neo4j에 MERGE로 적재 (중복 방지)
self._load_to_graph(doc, resolved, relationships)
def _update_document(self, doc: dict, new_hash: str) -> None:
"""수정된 문서의 관련 노드/관계를 업데이트합니다."""
# 1. 해당 문서에서 파생된 관계 삭제
self.driver.execute_query("""
MATCH (d:Document {externalId: $docId})-[r:EXTRACTED_FROM]-(e)
DELETE r
""", docId=doc["id"])
# 2. 새로운 추출 결과로 재연결
self._add_document(doc)
# 3. 해시 업데이트
self.driver.execute_query("""
MATCH (d:Document {externalId: $docId})
SET d.contentHash = $hash, d.updatedAt = datetime()
""", docId=doc["id"], hash=new_hash)
def _remove_document(self, doc_id: str) -> None:
"""삭제된 문서와 관련 관계를 정리합니다."""
# 문서 노드와 EXTRACTED_FROM 관계만 삭제
# 다른 문서에서도 참조하는 엔티티는 유지
self.driver.execute_query("""
MATCH (d:Document {externalId: $docId})
OPTIONAL MATCH (d)-[r:EXTRACTED_FROM]->(e)
WITH d, r, e
DELETE r
WITH d, e
WHERE NOT EXISTS { MATCH (e)<-[:EXTRACTED_FROM]-(:Document) }
AND NOT EXISTS { MATCH (e)<-[:REFERENCES]-() }
DETACH DELETE e
WITH d
DETACH DELETE d
""", docId=doc_id)
def _load_to_graph(self, doc: dict, entities: list, relationships: list) -> None:
"""추출된 데이터를 그래프에 적재합니다."""
# MERGE를 사용하여 기존 엔티티와 안전하게 연결
for entity in entities:
self.driver.execute_query("""
MERGE (e {name: $name})
ON CREATE SET e:$type, e += $props
WITH e
MATCH (d:Document {externalId: $docId})
MERGE (d)-[:EXTRACTED_FROM]->(e)
""", name=entity["name"], type=entity["type"],
props=entity.get("properties", {}), docId=doc["id"])엔티티 삭제 시 주의해야 합니다. 여러 문서에서 참조하는 엔티티를 한 문서의 삭제로 인해 함께 삭제하면 그래프의 연결성이 손상됩니다. 위 코드에서는 다른 문서에서 더 이상 참조하지 않는 엔티티만 삭제합니다.
LLM 추출은 완벽하지 않습니다. 지속적인 품질 검증이 필요합니다.
class GraphQualityChecker:
"""그래프 데이터 품질을 검증합니다."""
def __init__(self, driver):
self.driver = driver
def check_orphan_nodes(self) -> list[dict]:
"""관계가 없는 고립된 노드를 탐지합니다."""
records, _, _ = self.driver.execute_query("""
MATCH (n)
WHERE NOT (n)--()
RETURN labels(n) AS type, n.name AS name, elementId(n) AS id
LIMIT 100
""")
return [r.data() for r in records]
def check_duplicate_entities(self) -> list[dict]:
"""유사한 이름의 중복 엔티티를 탐지합니다."""
records, _, _ = self.driver.execute_query("""
MATCH (a), (b)
WHERE a <> b
AND labels(a) = labels(b)
AND apoc.text.levenshteinSimilarity(a.name, b.name) > 0.85
RETURN a.name AS name1, b.name AS name2,
apoc.text.levenshteinSimilarity(a.name, b.name) AS similarity
LIMIT 50
""")
return [r.data() for r in records]
def check_missing_properties(self) -> list[dict]:
"""필수 속성이 누락된 노드를 탐지합니다."""
records, _, _ = self.driver.execute_query("""
MATCH (n)
WHERE n.name IS NULL OR trim(n.name) = ""
RETURN labels(n) AS type, elementId(n) AS id, properties(n) AS props
LIMIT 100
""")
return [r.data() for r in records]
def check_invalid_relationships(self) -> list[dict]:
"""스키마에 맞지 않는 관계를 탐지합니다."""
# 예: Person -DEPENDS_ON-> Technology는 허용되지 않음
records, _, _ = self.driver.execute_query("""
MATCH (p:Person)-[r:DEPENDS_ON]->(t)
RETURN p.name AS person, type(r) AS relType, t.name AS target
LIMIT 50
""")
return [r.data() for r in records]
def run_all_checks(self) -> dict:
"""모든 품질 검증을 실행합니다."""
return {
"orphan_nodes": self.check_orphan_nodes(),
"duplicate_entities": self.check_duplicate_entities(),
"missing_properties": self.check_missing_properties(),
"invalid_relationships": self.check_invalid_relationships(),
}// 그래프 통계 요약
CALL apoc.meta.stats() YIELD nodeCount, relCount, labels, relTypes
RETURN nodeCount, relCount, labels, relTypes;
// 레이블별 노드 수
MATCH (n)
RETURN labels(n) AS type, count(n) AS count
ORDER BY count DESC;
// 관계 타입별 수
MATCH ()-[r]->()
RETURN type(r) AS relType, count(r) AS count
ORDER BY count DESC;
// 평균 연결도 (노드당 평균 관계 수)
MATCH (n)
WITH n, size([(n)--() | 1]) AS degree
RETURN avg(degree) AS avgDegree,
min(degree) AS minDegree,
max(degree) AS maxDegree,
percentileCont(degree, 0.5) AS medianDegree;노드 수가 수백만 이하인 경우, Neo4j 단일 인스턴스의 리소스를 늘리는 것으로 충분합니다.
| 그래프 규모 | 권장 메모리 | 권장 CPU |
|---|---|---|
| 100K 노드 이하 | 4GB | 2 코어 |
| 1M 노드 이하 | 16GB | 4 코어 |
| 10M 노드 이하 | 64GB | 8 코어 |
| 100M+ 노드 | 클러스터 필요 | 16+ 코어 |
Neo4j Enterprise에서는 Primary-Secondary 복제를 통해 읽기 부하를 분산할 수 있습니다.
// 1. PROFILE로 쿼리 실행 계획 확인
PROFILE
MATCH (d:Document)-[:COVERS]->(t:Technology {name: "Neo4j"})
RETURN d.title, t.name;
// 2. 인덱스 활용 확인
CALL db.indexes() YIELD name, type, labelsOrTypes, properties, state
RETURN name, type, labelsOrTypes, properties, state;
// 3. 느린 쿼리를 위한 인덱스 추가
CREATE INDEX idx_technology_name FOR (t:Technology) ON (t.name);
// 4. 카디널리티가 높은 레이블부터 매칭 (힌트 사용)
MATCH (t:Technology {name: "Neo4j"})
USING INDEX t:Technology(name)
MATCH (d:Document)-[:COVERS]->(t)
RETURN d.title;Neo4j에서 가장 흔한 성능 문제는 인덱스 미사용입니다. WHERE 절에서 자주 필터링하는 속성에는 반드시 인덱스를 생성하세요. PROFILE 명령으로 실행 계획을 확인하면, NodeByLabelScan 대신 NodeIndexSeek가 사용되는지 확인할 수 있습니다.
from dataclasses import dataclass
from datetime import datetime
@dataclass
class GraphMetrics:
timestamp: datetime
node_count: int
relationship_count: int
store_size_mb: float
query_avg_ms: float
cache_hit_ratio: float
class GraphMonitor:
"""그래프 상태를 모니터링합니다."""
def __init__(self, driver):
self.driver = driver
def collect_metrics(self) -> GraphMetrics:
"""현재 그래프 메트릭을 수집합니다."""
# 노드/관계 수
records, _, _ = self.driver.execute_query("""
CALL apoc.meta.stats()
YIELD nodeCount, relCount
RETURN nodeCount, relCount
""")
stats = records[0]
# 스토어 사이즈
records, _, _ = self.driver.execute_query("""
CALL dbms.queryJmx("org.neo4j:instance=kernel#0,name=Store sizes")
YIELD attributes
RETURN attributes.TotalStoreSize.value AS storeSize
""")
return GraphMetrics(
timestamp=datetime.now(),
node_count=stats["nodeCount"],
relationship_count=stats["relCount"],
store_size_mb=records[0]["storeSize"] / (1024 * 1024) if records else 0,
query_avg_ms=0, # 실제로는 쿼리 로그에서 수집
cache_hit_ratio=0 # JMX에서 수집
)
def check_health(self) -> dict:
"""그래프 건강 상태를 확인합니다."""
checks = {}
# 연결 확인
try:
self.driver.verify_connectivity()
checks["connectivity"] = "healthy"
except Exception:
checks["connectivity"] = "unhealthy"
# 고립 노드 비율
records, _, _ = self.driver.execute_query("""
MATCH (n) WHERE NOT (n)--() WITH count(n) AS orphans
MATCH (m) WITH orphans, count(m) AS total
RETURN toFloat(orphans) / total AS orphanRatio
""")
orphan_ratio = records[0]["orphanRatio"] if records else 0
checks["orphan_ratio"] = "healthy" if orphan_ratio < 0.1 else "warning"
return checks# 온라인 백업 (Enterprise)
neo4j-admin database backup neo4j --to-path=/backup/$(date +%Y%m%d)
# 덤프 (Community/Enterprise)
neo4j-admin database dump neo4j --to-path=/backup/neo4j-dump-$(date +%Y%m%d).dumpimport subprocess
from datetime import datetime
from pathlib import Path
class BackupManager:
"""Neo4j 백업을 관리합니다."""
def __init__(self, backup_dir: str, retention_days: int = 30):
self.backup_dir = Path(backup_dir)
self.backup_dir.mkdir(parents=True, exist_ok=True)
self.retention_days = retention_days
def create_backup(self, database: str = "neo4j") -> Path:
"""데이터베이스 백업을 생성합니다."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = self.backup_dir / f"{database}_{timestamp}"
result = subprocess.run(
["neo4j-admin", "database", "backup", database,
"--to-path", str(backup_path)],
capture_output=True, text=True
)
if result.returncode != 0:
raise RuntimeError(f"백업 실패: {result.stderr}")
print(f"백업 완료: {backup_path}")
return backup_path
def cleanup_old_backups(self) -> int:
"""보관 기간이 지난 백업을 삭제합니다."""
import shutil
cutoff = datetime.now().timestamp() - (self.retention_days * 86400)
removed = 0
for path in self.backup_dir.iterdir():
if path.stat().st_mtime < cutoff:
shutil.rmtree(path)
removed += 1
return removedGraphiti는 Zep에서 개발한 오픈소스 라이브러리로, AI 에이전트를 위한 실시간 지식 그래프 업데이트를 지원합니다.
기존의 배치 처리 방식(문서 수집 -> 추출 -> 적재)과 달리, Graphiti는 대화나 이벤트가 발생할 때 즉시 지식 그래프를 업데이트합니다.
from graphiti_core import Graphiti
from graphiti_core.nodes import EpisodeType
# Graphiti 초기화
graphiti = Graphiti(
neo4j_uri="bolt://localhost:7687",
neo4j_user="neo4j",
neo4j_password="password"
)
# 그래프 인덱스 초기화
await graphiti.build_indices_and_constraints()
# 에피소드(이벤트) 추가 - 자동으로 엔티티/관계 추출 및 적재
await graphiti.add_episode(
name="기술 토론",
episode_body="김개발은 GraphRAG 프로젝트에서 Neo4j를 사용하고 있으며, "
"최근에 벡터 인덱스와 그래프 순회를 결합한 하이브리드 검색을 구현했다.",
source=EpisodeType.text,
source_description="팀 회의 기록"
)
# 관련 정보 검색
results = await graphiti.search("김개발이 사용하는 기술은?")
for result in results:
print(f" {result.fact}")Graphiti는 특히 AI 에이전트의 장기 메모리 구현에 적합합니다. 에이전트가 사용자와 대화하면서 축적된 정보를 지식 그래프에 저장하고, 이후 대화에서 이 정보를 활용하여 개인화된 응답을 생성할 수 있습니다.
지식 그래프 구축에서 가장 큰 비용 항목은 LLM API 호출입니다.
| 최적화 전략 | 설명 | 절감 효과 |
|---|---|---|
| 캐싱 | 동일 텍스트의 추출 결과 캐싱 | 30~50% |
| 모델 선택 | 추출에는 소형 모델, 추론에는 대형 모델 | 40~60% |
| 배치 처리 | 여러 청크를 하나의 요청으로 | 20~30% |
| 증분 업데이트 | 변경분만 재처리 | 60~90% |
import hashlib
import json
from functools import lru_cache
class CachedExtractor:
"""추출 결과를 캐싱하여 API 비용을 절감합니다."""
def __init__(self, extractor, cache_dir: str = ".extraction_cache"):
self.extractor = extractor
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True)
def extract(self, text: str) -> dict:
"""캐시된 결과가 있으면 반환, 없으면 추출 후 캐싱합니다."""
cache_key = hashlib.sha256(text.encode()).hexdigest()
cache_file = self.cache_dir / f"{cache_key}.json"
if cache_file.exists():
return json.loads(cache_file.read_text())
result = self.extractor.extract_entities(text)
cache_file.write_text(json.dumps(result, ensure_ascii=False))
return result이번 장에서는 지식 그래프를 프로덕션 환경에서 운영하는 핵심 전략을 다루었습니다.
다음 장 미리보기: 10장에서는 이 시리즈에서 배운 모든 기술을 통합하여 실전 프로젝트를 구축합니다. 기술 문서에서 지식 그래프를 추출하고, GraphRAG로 자연어 질의를 처리하며, 벡터 전용 RAG와 성능을 비교합니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
기술 문서에서 LLM으로 지식 그래프를 구축하고, GraphRAG로 자연어 질의를 처리하며, 벡터 전용 RAG와 성능을 비교하는 엔드투엔드 실전 프로젝트를 구현합니다.
Cypher 고급 쿼리 패턴, PageRank/커뮤니티 감지/중심성 등 그래프 알고리즘의 실전 활용, LLM과 그래프 추론의 결합, Text2Cypher 자연어 변환까지 다룹니다.
TransE, DistMult, ComplEx 등 관계 예측 모델과 Node2Vec, GraphSAGE 등 노드 임베딩 기법, PyTorch Geometric을 활용한 구현까지 지식 그래프 임베딩의 핵심을 다룹니다.