Elasticsearch, Cross-encoder 리랭킹, 개인화를 통합한 AI 검색 시스템의 전체 아키텍처 설계부터 구현, 벤치마킹, 운영 체크리스트까지 다룹니다.
이번 장에서는 기술 블로그 문서를 대상으로 하는 AI 검색 시스템을 구축합니다. 시리즈에서 다룬 하이브리드 검색, 리랭킹, 개인화, 피드백 루프를 모두 통합한 실전 시스템입니다.
| 컴포넌트 | 기술 |
|---|---|
| 검색 API | FastAPI (Python 3.12) |
| 검색 엔진 | Elasticsearch 8.15 |
| 임베딩 모델 | multilingual-e5-large |
| 리랭킹 모델 | cross-encoder/ms-marco-MiniLM-L-12-v2 |
| 캐시 | Redis |
| 피드백 저장 | PostgreSQL |
| 컨테이너 | Docker Compose |
ai-search-project/
docker-compose.yml
api/
main.py
config.py
models/
document.py
search.py
feedback.py
services/
indexing.py
search.py
reranking.py
personalization.py
feedback.py
routers/
search.py
feedback.py
admin.py
scripts/
index_documents.py
benchmark.py
from elasticsearch import Elasticsearch, helpers
from sentence_transformers import SentenceTransformer
from langchain.text_splitter import RecursiveCharacterTextSplitter
class IndexingService:
"""문서 인덱싱 서비스"""
INDEX_NAME = "tech-articles"
INDEX_SETTINGS = {
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 0,
"knn": True,
},
"analysis": {
"analyzer": {
"korean": {
"type": "custom",
"tokenizer": "nori_tokenizer",
"filter": ["nori_readingform", "lowercase"],
}
}
},
},
"mappings": {
"properties": {
"doc_id": {"type": "keyword"},
"title": {"type": "text", "analyzer": "korean"},
"content": {"type": "text", "analyzer": "korean"},
"chunk_text": {"type": "text", "analyzer": "korean"},
"chunk_index": {"type": "integer"},
"content_vector": {
"type": "dense_vector",
"dims": 1024,
"index": True,
"similarity": "cosine",
},
"category": {"type": "keyword"},
"tags": {"type": "keyword"},
"published_date": {"type": "date"},
"url": {"type": "keyword"},
}
},
}
def __init__(self, es_url: str = "http://localhost:9200"):
self.es = Elasticsearch(es_url)
self.embed_model = SentenceTransformer("intfloat/multilingual-e5-large")
self.splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=100,
separators=["\n\n", "\n", ". ", " "],
)
def create_index(self):
"""인덱스 생성 (기존 인덱스 삭제 후)"""
if self.es.indices.exists(index=self.INDEX_NAME):
self.es.indices.delete(index=self.INDEX_NAME)
self.es.indices.create(index=self.INDEX_NAME, body=self.INDEX_SETTINGS)
def index_document(self, doc: dict):
"""단일 문서 인덱싱 (청킹 + 임베딩)"""
chunks = self.splitter.split_text(doc["content"])
actions = []
for i, chunk in enumerate(chunks):
embedding = self.embed_model.encode(
f"passage: {doc['title']} {chunk}",
normalize_embeddings=True,
)
actions.append({
"_index": self.INDEX_NAME,
"_source": {
"doc_id": doc["id"],
"title": doc["title"],
"content": doc["content"],
"chunk_text": chunk,
"chunk_index": i,
"content_vector": embedding.tolist(),
"category": doc.get("category", ""),
"tags": doc.get("tags", []),
"published_date": doc.get("published_date"),
"url": doc.get("url", ""),
},
})
helpers.bulk(self.es, actions)
def bulk_index(self, documents: list[dict], batch_size: int = 50):
"""대량 문서 인덱싱"""
for i in range(0, len(documents), batch_size):
batch = documents[i:i + batch_size]
for doc in batch:
self.index_document(doc)
print(f"Indexed {min(i + batch_size, len(documents))}/{len(documents)}")from elasticsearch import Elasticsearch
from sentence_transformers import SentenceTransformer
import numpy as np
class SearchService:
"""하이브리드 검색 서비스"""
INDEX_NAME = "tech-articles"
def __init__(self, es_url: str = "http://localhost:9200"):
self.es = Elasticsearch(es_url)
self.embed_model = SentenceTransformer("intfloat/multilingual-e5-large")
def hybrid_search(
self,
query: str,
k: int = 20,
category: str = None,
date_from: str = None,
) -> list[dict]:
"""BM25 + kNN 하이브리드 검색 (RRF 퓨전)"""
query_vector = self.embed_model.encode(
f"query: {query}", normalize_embeddings=True
)
# 필터 조건 구성
filter_clauses = []
if category:
filter_clauses.append({"term": {"category": category}})
if date_from:
filter_clauses.append({"range": {"published_date": {"gte": date_from}}})
es_filter = {"bool": {"must": filter_clauses}} if filter_clauses else None
# BM25 검색
bm25_query = {
"bool": {
"must": [
{
"multi_match": {
"query": query,
"fields": ["title^2", "chunk_text", "content"],
"analyzer": "korean",
}
}
],
}
}
if filter_clauses:
bm25_query["bool"]["filter"] = filter_clauses
# kNN 검색
knn_params = {
"field": "content_vector",
"query_vector": query_vector.tolist(),
"k": k * 5,
"num_candidates": k * 20,
}
if es_filter:
knn_params["filter"] = es_filter
# RRF 하이브리드 검색
response = self.es.search(
index=self.INDEX_NAME,
body={
"retriever": {
"rrf": {
"retrievers": [
{"standard": {"query": bm25_query}},
{"knn": knn_params},
],
"rank_window_size": k * 10,
"rank_constant": 60,
}
},
"size": k,
"_source": [
"doc_id", "title", "chunk_text", "category",
"tags", "published_date", "url",
],
},
)
results = []
seen_docs = set()
for hit in response["hits"]["hits"]:
doc_id = hit["_source"]["doc_id"]
# 같은 문서의 여러 청크 중복 제거
if doc_id in seen_docs:
continue
seen_docs.add(doc_id)
results.append({
"doc_id": doc_id,
"title": hit["_source"]["title"],
"snippet": hit["_source"]["chunk_text"][:300],
"category": hit["_source"]["category"],
"tags": hit["_source"]["tags"],
"published_date": hit["_source"]["published_date"],
"url": hit["_source"]["url"],
"score": hit["_score"],
})
return resultsfrom sentence_transformers import CrossEncoder
class RerankingService:
"""Cross-encoder 리랭킹 서비스"""
def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"):
self.model = CrossEncoder(model_name)
def rerank(
self,
query: str,
results: list[dict],
top_k: int = 10,
) -> list[dict]:
"""검색 결과 리랭킹"""
if not results:
return results
pairs = [
[query, f"{r['title']} {r['snippet']}"]
for r in results
]
scores = self.model.predict(pairs, show_progress_bar=False)
for result, score in zip(results, scores):
result["rerank_score"] = float(score)
results.sort(key=lambda x: x["rerank_score"], reverse=True)
return results[:top_k]import json
import numpy as np
import redis
class PersonalizationService:
"""검색 결과 개인화 서비스"""
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis = redis.from_url(redis_url)
self.personal_weight = 0.15
def get_user_vector(self, user_id: str) -> np.ndarray | None:
"""Redis에서 사용자 벡터 조회"""
cached = self.redis.get(f"user_vector:{user_id}")
if cached:
return np.array(json.loads(cached))
return None
def update_user_vector(
self,
user_id: str,
doc_embedding: list[float],
decay: float = 0.9,
):
"""클릭 시 사용자 벡터 업데이트"""
current = self.get_user_vector(user_id)
new_vec = np.array(doc_embedding)
if current is not None:
updated = decay * current + (1 - decay) * new_vec
else:
updated = new_vec
# 정규화
norm = np.linalg.norm(updated)
if norm > 0:
updated = updated / norm
self.redis.setex(
f"user_vector:{user_id}",
86400 * 30, # 30일 TTL
json.dumps(updated.tolist()),
)
def personalize(
self,
results: list[dict],
user_id: str | None,
) -> list[dict]:
"""검색 결과 개인화"""
if not user_id:
return results
user_vector = self.get_user_vector(user_id)
if user_vector is None:
return results
for result in results:
if "embedding" in result:
doc_vec = np.array(result["embedding"])
personal_score = float(np.dot(user_vector, doc_vec))
base_score = result.get("rerank_score", result.get("score", 0))
result["final_score"] = (
(1 - self.personal_weight) * base_score
+ self.personal_weight * personal_score
)
else:
result["final_score"] = result.get("rerank_score", result.get("score", 0))
results.sort(key=lambda x: x["final_score"], reverse=True)
return resultsfrom fastapi import APIRouter, Query, Header
from pydantic import BaseModel
router = APIRouter(prefix="/api/search", tags=["search"])
class SearchRequest(BaseModel):
query: str
category: str | None = None
date_from: str | None = None
page: int = 1
size: int = 10
class SearchResult(BaseModel):
doc_id: str
title: str
snippet: str
category: str
tags: list[str]
published_date: str | None
url: str
score: float
class SearchResponse(BaseModel):
query: str
total: int
results: list[SearchResult]
latency_ms: float
@router.post("/", response_model=SearchResponse)
async def search(
request: SearchRequest,
x_user_id: str | None = Header(None),
):
"""통합 검색 API"""
import time
start = time.perf_counter()
# 1. 하이브리드 검색 (BM25 + 시맨틱)
candidates = search_service.hybrid_search(
query=request.query,
k=100,
category=request.category,
date_from=request.date_from,
)
# 2. Cross-encoder 리랭킹
reranked = reranking_service.rerank(
query=request.query,
results=candidates,
top_k=30,
)
# 3. 개인화
personalized = personalization_service.personalize(
results=reranked,
user_id=x_user_id,
)
# 4. 페이지네이션
start_idx = (request.page - 1) * request.size
end_idx = start_idx + request.size
page_results = personalized[start_idx:end_idx]
elapsed = (time.perf_counter() - start) * 1000
return SearchResponse(
query=request.query,
total=len(personalized),
results=[
SearchResult(
doc_id=r["doc_id"],
title=r["title"],
snippet=r["snippet"],
category=r["category"],
tags=r["tags"],
published_date=r.get("published_date"),
url=r["url"],
score=r.get("final_score", r.get("rerank_score", r.get("score", 0))),
)
for r in page_results
],
latency_ms=round(elapsed, 1),
)from fastapi import APIRouter, Header
from pydantic import BaseModel
from datetime import datetime
router = APIRouter(prefix="/api/feedback", tags=["feedback"])
class ClickFeedback(BaseModel):
query_id: str
query_text: str
doc_id: str
position: int
dwell_time_ms: int | None = None
@router.post("/click")
async def record_click(
feedback: ClickFeedback,
x_user_id: str | None = Header(None),
):
"""클릭 피드백 기록"""
feedback_service.record_click(
query_id=feedback.query_id,
query_text=feedback.query_text,
doc_id=feedback.doc_id,
position=feedback.position,
dwell_time_ms=feedback.dwell_time_ms,
user_id=x_user_id,
timestamp=datetime.now(),
)
# 사용자 벡터 업데이트 (비동기)
if x_user_id:
doc_embedding = get_document_embedding(feedback.doc_id)
if doc_embedding:
personalization_service.update_user_vector(x_user_id, doc_embedding)
return {"status": "recorded"}import time
import statistics
import json
class SearchBenchmark:
"""검색 시스템 성능 벤치마크"""
def __init__(self, search_url: str):
self.search_url = search_url
def run(
self,
queries: list[str],
iterations: int = 3,
) -> dict:
"""벤치마크 실행"""
import requests
all_latencies = []
for query in queries:
query_latencies = []
for _ in range(iterations):
start = time.perf_counter()
response = requests.post(
f"{self.search_url}/api/search/",
json={"query": query, "size": 10},
)
elapsed = (time.perf_counter() - start) * 1000
query_latencies.append(elapsed)
all_latencies.extend(query_latencies)
all_latencies.sort()
return {
"total_queries": len(queries) * iterations,
"p50_ms": round(statistics.median(all_latencies), 1),
"p95_ms": round(all_latencies[int(len(all_latencies) * 0.95)], 1),
"p99_ms": round(all_latencies[int(len(all_latencies) * 0.99)], 1),
"mean_ms": round(statistics.mean(all_latencies), 1),
"min_ms": round(min(all_latencies), 1),
"max_ms": round(max(all_latencies), 1),
}
# 사용 예시
benchmark = SearchBenchmark("http://localhost:8000")
test_queries = [
"파이썬 비동기 프로그래밍",
"Elasticsearch 검색 최적화",
"React 상태 관리 패턴",
"Docker 컨테이너 네트워킹",
"PostgreSQL 인덱스 튜닝",
]
results = benchmark.run(test_queries, iterations=5)
print(json.dumps(results, indent=2))| 단계 | 목표 지연 시간 | 측정 방법 |
|---|---|---|
| BM25 + kNN (RRF) | 50ms 이하 | Elasticsearch 내부 처리 |
| Cross-encoder 리랭킹 | 100-150ms | 30개 문서 기준 |
| 개인화 | 5ms 이하 | Redis 조회 + 점수 계산 |
| 전체 E2E | 200ms 이하 | API 응답 시간 |
벤치마크 시에는 워밍업 실행을 포함해야 합니다. 첫 번째 요청은 모델 로딩, 연결 설정 등으로 지연이 크므로, 실제 벤치마크 전에 10-20회의 워밍업 쿼리를 실행하세요.
프로덕션 배포 전에 확인해야 할 항목들을 정리합니다.
프로덕션 배포에서 가장 흔한 실수는 리랭킹 서비스의 장애가 전체 검색을 멈추게 하는 것입니다. 리랭킹은 품질 향상을 위한 부가 기능이므로, 장애 시에는 리랭킹을 건너뛰고 하이브리드 검색 결과를 그대로 반환하는 폴백 로직을 반드시 구현하세요.
시스템이 안정적으로 운영되면 다음 단계의 개선을 고려할 수 있습니다.
이 시리즈를 통해 AI 기반 검색 시스템의 이론과 실전을 모두 다루었습니다.
기초 (1-3장): 검색의 진화, 시맨틱 검색 아키텍처, 품질 평가 메트릭을 통해 AI 검색의 기본 개념과 측정 방법을 학습했습니다.
핵심 기술 (4-5장): 쿼리 이해/확장과 랭킹 모델(Bi-encoder, Cross-encoder)을 통해 검색 품질을 높이는 핵심 기술을 깊이 있게 다루었습니다.
실전 엔진 (6-7장): Elasticsearch, OpenSearch, Algolia, Meilisearch 등 실전 검색 엔진의 AI 기능을 비교하고 구현했습니다.
고급 주제 (8-10장): 하이브리드 검색 파이프라인 설계, 개인화, 피드백 루프를 통해 프로덕션 수준의 검색 시스템 운영에 필요한 지식을 갖추었습니다.
통합 프로젝트 (11장): 모든 기술을 통합하여 실전 AI 검색 시스템을 구축하고, 벤치마킹과 운영 체크리스트까지 준비했습니다.
AI 검색 기술은 빠르게 발전하고 있습니다. 이 시리즈가 검색 시스템 구축의 견고한 기반이 되어, 새로운 기술과 방법론을 지속적으로 학습하고 적용하는 데 도움이 되기를 바랍니다.
이 글이 도움이 되셨나요?
클릭 신호 수집, 암묵적/명시적 피드백, 온라인 학습, A/B 테스트 자동화, 검색 품질 모니터링, 차가운 시작 문제를 다룹니다.
사용자 프로파일링, 클릭 이력 기반 개인화, 임베딩 기반 사용자 벡터, 인기도 편향 문제, 프라이버시 고려사항과 실시간 개인화를 다룹니다.
BM25와 시맨틱 검색의 결합 전략, RRF/선형 보간, 리랭킹 캐스케이드, 다단계 검색 파이프라인 설계와 성능-품질 트레이드오프를 다룹니다.