BM25와 시맨틱 검색의 결합 전략, RRF/선형 보간, 리랭킹 캐스케이드, 다단계 검색 파이프라인 설계와 성능-품질 트레이드오프를 다룹니다.
단일 검색 방식만으로는 모든 유형의 쿼리를 잘 처리하기 어렵습니다. BM25는 정확한 키워드 매칭에 강하고, 시맨틱 검색은 의미적 유사성 파악에 강합니다. 하이브리드 검색은 이 두 방식을 결합하여 상호 보완적인 효과를 얻습니다.
| 쿼리 유형 | BM25 | 시맨틱 검색 | 예시 |
|---|---|---|---|
| 정확한 용어 매칭 | 강함 | 보통 | "NullPointerException", "ERR_CONNECTION_REFUSED" |
| 코드/API 이름 | 강함 | 약함 | "useState hook", "pandas.DataFrame.merge" |
| 개념적 질문 | 약함 | 강함 | "웹 서버 부하를 줄이는 방법" |
| 동의어/유사어 | 약함 | 강함 | "기계학습" 검색으로 "머신러닝" 문서 찾기 |
| 긴 자연어 질문 | 보통 | 강함 | "대규모 트래픽을 처리하는 마이크로서비스 아키텍처" |
실전에서 관찰하면, 전체 검색 쿼리 중 약 30-40%는 BM25가, 30-40%는 시맨틱 검색이 더 나은 결과를 제공하고, 나머지 20-30%는 비슷한 성능을 보입니다. 하이브리드 검색은 이 모든 경우를 아우르는 전략입니다.
하이브리드 검색의 핵심은 서로 다른 검색 방식의 결과를 어떻게 병합하느냐입니다.
5장에서 다룬 RRF는 순위 기반 퓨전입니다. 점수의 절대값이 아닌 순위만 사용하므로, 서로 다른 스케일의 점수를 정규화 없이 결합할 수 있습니다.
def rrf_fusion(
results_list: list[list[tuple[str, float]]],
k: int = 60,
top_n: int = 10,
) -> list[tuple[str, float]]:
"""
RRF 점수 퓨전
results_list: 여러 검색 결과 [(doc_id, score), ...]의 리스트
"""
rrf_scores: dict[str, float] = {}
for results in results_list:
for rank, (doc_id, _) in enumerate(results, start=1):
rrf_scores[doc_id] = rrf_scores.get(doc_id, 0.0) + 1.0 / (k + rank)
sorted_results = sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)
return sorted_results[:top_n]점수를 정규화한 뒤 가중 평균을 구하는 방식입니다. 각 검색 방식의 기여도를 가중치로 직접 제어할 수 있습니다.
def min_max_normalize(scores: list[float]) -> list[float]:
"""Min-Max 정규화: 0-1 범위로 변환"""
min_s = min(scores) if scores else 0
max_s = max(scores) if scores else 1
if max_s == min_s:
return [0.5] * len(scores)
return [(s - min_s) / (max_s - min_s) for s in scores]
def linear_interpolation_fusion(
bm25_results: list[tuple[str, float]],
semantic_results: list[tuple[str, float]],
alpha: float = 0.5,
top_n: int = 10,
) -> list[tuple[str, float]]:
"""
선형 보간 퓨전
alpha: 시맨틱 검색 가중치 (1-alpha가 BM25 가중치)
"""
# 점수 정규화
bm25_ids = [doc_id for doc_id, _ in bm25_results]
bm25_scores = min_max_normalize([s for _, s in bm25_results])
bm25_map = dict(zip(bm25_ids, bm25_scores))
sem_ids = [doc_id for doc_id, _ in semantic_results]
sem_scores = min_max_normalize([s for _, s in semantic_results])
sem_map = dict(zip(sem_ids, sem_scores))
# 모든 문서에 대해 결합 점수 계산
all_docs = set(bm25_ids) | set(sem_ids)
combined = {}
for doc_id in all_docs:
bm25_s = bm25_map.get(doc_id, 0.0)
sem_s = sem_map.get(doc_id, 0.0)
combined[doc_id] = (1 - alpha) * bm25_s + alpha * sem_s
sorted_results = sorted(combined.items(), key=lambda x: x[1], reverse=True)
return sorted_results[:top_n]| 전략 | 장점 | 단점 | 적합한 경우 |
|---|---|---|---|
| RRF | 정규화 불필요, 안정적 | 점수 차이 미반영 | 초기 구현, 범용 |
| 선형 보간 | 가중치로 세밀 조절 | 정규화 품질에 의존 | 도메인 특화 튜닝 |
| 학습된 퓨전 | 최적의 결합 학습 | 학습 데이터 필요 | 대규모 시스템 |
캐스케이드(Cascade) 구조는 여러 단계의 랭킹 모델을 순차적으로 적용하는 패턴입니다. 각 단계는 이전 단계보다 더 정확하지만 더 느린 모델을 사용하며, 처리할 문서 수를 점진적으로 줄여나갑니다.
from sentence_transformers import CrossEncoder
from dataclasses import dataclass
@dataclass
class CascadeConfig:
stage_name: str
model_name: str
input_size: int
output_size: int
class RankingCascade:
"""다단계 리랭킹 캐스케이드"""
def __init__(self, stages: list[CascadeConfig]):
self.stages = stages
self.models = {}
for stage in stages:
self.models[stage.stage_name] = CrossEncoder(stage.model_name)
def rerank(self, query: str, candidates: list[dict]) -> list[dict]:
"""캐스케이드 리랭킹 실행"""
current_candidates = candidates
for stage in self.stages:
model = self.models[stage.stage_name]
top_candidates = current_candidates[:stage.input_size]
pairs = [[query, c["text"]] for c in top_candidates]
scores = model.predict(pairs)
for candidate, score in zip(top_candidates, scores):
candidate[f"{stage.stage_name}_score"] = float(score)
top_candidates.sort(
key=lambda x: x[f"{stage.stage_name}_score"],
reverse=True,
)
current_candidates = top_candidates[:stage.output_size]
return current_candidates
# 사용 예시
cascade = RankingCascade([
CascadeConfig(
stage_name="light_rerank",
model_name="cross-encoder/ms-marco-MiniLM-L-6-v2",
input_size=200,
output_size=50,
),
CascadeConfig(
stage_name="heavy_rerank",
model_name="cross-encoder/ms-marco-MiniLM-L-12-v2",
input_size=50,
output_size=10,
),
])캐스케이드의 각 단계에서 처리하는 문서 수의 비율은 보통 5:1 ~ 10:1 정도가 적절합니다. 예를 들어 1단계에서 1000개를 추출했다면 2단계에서는 100-200개, 3단계에서는 20-50개로 줄여나갑니다.
검색 시스템에서 품질(관련성)과 성능(지연 시간)은 트레이드오프 관계에 있습니다. 리랭킹 단계를 추가할수록 품질은 향상되지만 지연 시간이 증가합니다.
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
@dataclass
class LatencyProfile:
stages: dict[str, float] = field(default_factory=dict)
@contextmanager
def measure(self, stage_name: str):
start = time.perf_counter()
yield
elapsed = time.perf_counter() - start
self.stages[stage_name] = elapsed * 1000 # ms
def total_ms(self) -> float:
return sum(self.stages.values())
def report(self) -> str:
lines = [f"Total: {self.total_ms():.1f}ms"]
for stage, ms in self.stages.items():
pct = ms / self.total_ms() * 100
lines.append(f" {stage}: {ms:.1f}ms ({pct:.0f}%)")
return "\n".join(lines)
# 사용 예시
profile = LatencyProfile()
with profile.measure("bm25_retrieval"):
bm25_results = bm25_search(query, k=500)
with profile.measure("knn_retrieval"):
knn_results = knn_search(query, k=500)
with profile.measure("rrf_fusion"):
fused = rrf_fusion([bm25_results, knn_results], top_n=100)
with profile.measure("cross_encoder_rerank"):
reranked = cross_encoder_rerank(query, fused, top_n=10)
print(profile.report())
# Total: 285.3ms
# bm25_retrieval: 12.5ms (4%)
# knn_retrieval: 18.2ms (6%)
# rrf_fusion: 1.6ms (1%)
# cross_encoder_rerank: 253.0ms (89%)리랭킹 단계의 지연 시간은 보통 전체 파이프라인의 80-90%를 차지합니다. 다음 전략으로 이를 최적화할 수 있습니다.
1. 배치 처리와 GPU 활용
import torch
def batch_rerank(model, query: str, documents: list[str], batch_size: int = 32):
"""배치 단위 Cross-encoder 리랭킹"""
all_scores = []
pairs = [[query, doc] for doc in documents]
for i in range(0, len(pairs), batch_size):
batch = pairs[i:i + batch_size]
with torch.no_grad():
scores = model.predict(batch, show_progress_bar=False)
all_scores.extend(scores)
return all_scores2. 모델 경량화
| 모델 | 파라미터 수 | 지연 시간 (100문서) | NDCG@10 |
|---|---|---|---|
| MiniLM-L-6-v2 | 22M | 약 80ms | 0.82 |
| MiniLM-L-12-v2 | 33M | 약 150ms | 0.85 |
| DeBERTa-v3-base | 86M | 약 350ms | 0.89 |
3. 리랭킹 대상 수 조절
rank_window_size를 줄이면 지연 시간이 비례적으로 감소합니다. 핵심은 "충분히 좋은" 결과를 제공하는 최소한의 window size를 찾는 것입니다.
4. 캐싱
인기 쿼리의 리랭킹 결과를 캐시하면 반복 쿼리의 지연 시간을 제거할 수 있습니다.
import hashlib
from functools import lru_cache
def cache_key(query: str, doc_ids: list[str]) -> str:
content = f"{query}|{'|'.join(doc_ids)}"
return hashlib.md5(content.encode()).hexdigest()
# 간단한 인메모리 캐시 (프로덕션에서는 Redis 등 사용)
rerank_cache: dict[str, list[float]] = {}
def cached_rerank(model, query: str, documents: list[dict]) -> list[float]:
doc_ids = [d["id"] for d in documents]
key = cache_key(query, doc_ids)
if key in rerank_cache:
return rerank_cache[key]
scores = model.predict([[query, d["text"]] for d in documents])
rerank_cache[key] = scores.tolist()
return scores.tolist()캐싱은 검색 결과가 자주 변경되는 시스템에서는 주의가 필요합니다. 문서가 업데이트되거나 삭제되었을 때 캐시를 적시에 무효화해야 합니다. TTL(Time-To-Live)을 설정하여 캐시의 신선도를 관리하세요.
| 지연 시간 목표 | 파이프라인 구성 |
|---|---|
| 50ms 이하 | BM25 + kNN (RRF) |
| 200ms 이하 | BM25 + kNN (RRF) + 경량 Cross-encoder |
| 500ms 이하 | BM25 + kNN (RRF) + 2단계 캐스케이드 |
| 1초 이하 | 전체 캐스케이드 + 개인화 + 다양성 |
이번 장에서는 하이브리드 검색과 리랭킹을 결합한 다단계 파이프라인의 설계와 최적화를 다루었습니다. BM25와 시맨틱 검색의 상호 보완적 강점을 확인했고, RRF와 선형 보간의 퓨전 전략을 비교했습니다. 캐스케이드 구조로 정확도와 속도를 단계적으로 조절하는 방법, 그리고 배치 처리, 모델 경량화, 캐싱 등의 최적화 전략을 학습했습니다.
다음 장에서는 검색 결과를 사용자 개개인에게 맞춤화하는 검색 개인화를 다룹니다. 사용자 프로파일링, 클릭 이력 기반 개인화, 임베딩 기반 사용자 벡터, 프라이버시 고려사항까지 살펴보겠습니다.
이 글이 도움이 되셨나요?
사용자 프로파일링, 클릭 이력 기반 개인화, 임베딩 기반 사용자 벡터, 인기도 편향 문제, 프라이버시 고려사항과 실시간 개인화를 다룹니다.
OpenSearch 신경 검색, 재랭킹 파이프라인과 Algolia, Meilisearch, Typesense 등 주요 검색 엔진의 AI 검색 기능을 비교합니다.
클릭 신호 수집, 암묵적/명시적 피드백, 온라인 학습, A/B 테스트 자동화, 검색 품질 모니터링, 차가운 시작 문제를 다룹니다.