본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout

내비게이션

  • Tech
  • Projects
  • Books
  • About
  • Tags

카테고리

  • AI / ML
  • 웹 개발
  • 프로그래밍
  • 개발 도구

연결

  • GitHub
  • Email
  • RSS
© 2026 Kreath Archive. All rights reserved.Built with Next.js + MDX
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 8장: 하이브리드 검색과 리랭킹 파이프라인
2026년 2월 17일·AI / ML·

8장: 하이브리드 검색과 리랭킹 파이프라인

BM25와 시맨틱 검색의 결합 전략, RRF/선형 보간, 리랭킹 캐스케이드, 다단계 검색 파이프라인 설계와 성능-품질 트레이드오프를 다룹니다.

13분843자7개 섹션
searchai
공유
ai-search8 / 11
1234567891011
이전7장: OpenSearch와 기타 검색 엔진다음9장: 검색 개인화

학습 목표

  • BM25와 시맨틱 검색의 결합이 필요한 이유를 이해합니다.
  • RRF와 선형 보간 등 점수 퓨전 전략을 비교하고 구현합니다.
  • 리랭킹 캐스케이드(다단계 랭킹)의 설계 원리를 학습합니다.
  • 성능(지연 시간)과 품질(관련성)의 트레이드오프를 최적화하는 방법을 파악합니다.

하이브리드 검색의 필요성

단일 검색 방식만으로는 모든 유형의 쿼리를 잘 처리하기 어렵습니다. BM25는 정확한 키워드 매칭에 강하고, 시맨틱 검색은 의미적 유사성 파악에 강합니다. 하이브리드 검색은 이 두 방식을 결합하여 상호 보완적인 효과를 얻습니다.

각 방식이 강한 쿼리 유형

쿼리 유형BM25시맨틱 검색예시
정확한 용어 매칭강함보통"NullPointerException", "ERR_CONNECTION_REFUSED"
코드/API 이름강함약함"useState hook", "pandas.DataFrame.merge"
개념적 질문약함강함"웹 서버 부하를 줄이는 방법"
동의어/유사어약함강함"기계학습" 검색으로 "머신러닝" 문서 찾기
긴 자연어 질문보통강함"대규모 트래픽을 처리하는 마이크로서비스 아키텍처"
Info

실전에서 관찰하면, 전체 검색 쿼리 중 약 30-40%는 BM25가, 30-40%는 시맨틱 검색이 더 나은 결과를 제공하고, 나머지 20-30%는 비슷한 성능을 보입니다. 하이브리드 검색은 이 모든 경우를 아우르는 전략입니다.


점수 퓨전 전략

하이브리드 검색의 핵심은 서로 다른 검색 방식의 결과를 어떻게 병합하느냐입니다.

RRF (Reciprocal Rank Fusion)

5장에서 다룬 RRF는 순위 기반 퓨전입니다. 점수의 절대값이 아닌 순위만 사용하므로, 서로 다른 스케일의 점수를 정규화 없이 결합할 수 있습니다.

rrf_implementation.py
python
def rrf_fusion(
    results_list: list[list[tuple[str, float]]],
    k: int = 60,
    top_n: int = 10,
) -> list[tuple[str, float]]:
    """
    RRF 점수 퓨전
    results_list: 여러 검색 결과 [(doc_id, score), ...]의 리스트
    """
    rrf_scores: dict[str, float] = {}
 
    for results in results_list:
        for rank, (doc_id, _) in enumerate(results, start=1):
            rrf_scores[doc_id] = rrf_scores.get(doc_id, 0.0) + 1.0 / (k + rank)
 
    sorted_results = sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)
    return sorted_results[:top_n]

선형 보간 (Linear Interpolation)

점수를 정규화한 뒤 가중 평균을 구하는 방식입니다. 각 검색 방식의 기여도를 가중치로 직접 제어할 수 있습니다.

linear_interpolation.py
python
def min_max_normalize(scores: list[float]) -> list[float]:
    """Min-Max 정규화: 0-1 범위로 변환"""
    min_s = min(scores) if scores else 0
    max_s = max(scores) if scores else 1
    if max_s == min_s:
        return [0.5] * len(scores)
    return [(s - min_s) / (max_s - min_s) for s in scores]
 
def linear_interpolation_fusion(
    bm25_results: list[tuple[str, float]],
    semantic_results: list[tuple[str, float]],
    alpha: float = 0.5,
    top_n: int = 10,
) -> list[tuple[str, float]]:
    """
    선형 보간 퓨전
    alpha: 시맨틱 검색 가중치 (1-alpha가 BM25 가중치)
    """
    # 점수 정규화
    bm25_ids = [doc_id for doc_id, _ in bm25_results]
    bm25_scores = min_max_normalize([s for _, s in bm25_results])
    bm25_map = dict(zip(bm25_ids, bm25_scores))
 
    sem_ids = [doc_id for doc_id, _ in semantic_results]
    sem_scores = min_max_normalize([s for _, s in semantic_results])
    sem_map = dict(zip(sem_ids, sem_scores))
 
    # 모든 문서에 대해 결합 점수 계산
    all_docs = set(bm25_ids) | set(sem_ids)
    combined = {}
    for doc_id in all_docs:
        bm25_s = bm25_map.get(doc_id, 0.0)
        sem_s = sem_map.get(doc_id, 0.0)
        combined[doc_id] = (1 - alpha) * bm25_s + alpha * sem_s
 
    sorted_results = sorted(combined.items(), key=lambda x: x[1], reverse=True)
    return sorted_results[:top_n]

퓨전 전략 비교

전략장점단점적합한 경우
RRF정규화 불필요, 안정적점수 차이 미반영초기 구현, 범용
선형 보간가중치로 세밀 조절정규화 품질에 의존도메인 특화 튜닝
학습된 퓨전최적의 결합 학습학습 데이터 필요대규모 시스템

리랭킹 캐스케이드

캐스케이드(Cascade) 구조는 여러 단계의 랭킹 모델을 순차적으로 적용하는 패턴입니다. 각 단계는 이전 단계보다 더 정확하지만 더 느린 모델을 사용하며, 처리할 문서 수를 점진적으로 줄여나갑니다.

캐스케이드 구현

ranking_cascade.py
python
from sentence_transformers import CrossEncoder
from dataclasses import dataclass
 
@dataclass
class CascadeConfig:
    stage_name: str
    model_name: str
    input_size: int
    output_size: int
 
class RankingCascade:
    """다단계 리랭킹 캐스케이드"""
 
    def __init__(self, stages: list[CascadeConfig]):
        self.stages = stages
        self.models = {}
        for stage in stages:
            self.models[stage.stage_name] = CrossEncoder(stage.model_name)
 
    def rerank(self, query: str, candidates: list[dict]) -> list[dict]:
        """캐스케이드 리랭킹 실행"""
        current_candidates = candidates
 
        for stage in self.stages:
            model = self.models[stage.stage_name]
            top_candidates = current_candidates[:stage.input_size]
 
            pairs = [[query, c["text"]] for c in top_candidates]
            scores = model.predict(pairs)
 
            for candidate, score in zip(top_candidates, scores):
                candidate[f"{stage.stage_name}_score"] = float(score)
 
            top_candidates.sort(
                key=lambda x: x[f"{stage.stage_name}_score"],
                reverse=True,
            )
            current_candidates = top_candidates[:stage.output_size]
 
        return current_candidates
 
# 사용 예시
cascade = RankingCascade([
    CascadeConfig(
        stage_name="light_rerank",
        model_name="cross-encoder/ms-marco-MiniLM-L-6-v2",
        input_size=200,
        output_size=50,
    ),
    CascadeConfig(
        stage_name="heavy_rerank",
        model_name="cross-encoder/ms-marco-MiniLM-L-12-v2",
        input_size=50,
        output_size=10,
    ),
])
Tip

캐스케이드의 각 단계에서 처리하는 문서 수의 비율은 보통 5:1 ~ 10:1 정도가 적절합니다. 예를 들어 1단계에서 1000개를 추출했다면 2단계에서는 100-200개, 3단계에서는 20-50개로 줄여나갑니다.


성능-품질 트레이드오프 최적화

검색 시스템에서 품질(관련성)과 성능(지연 시간)은 트레이드오프 관계에 있습니다. 리랭킹 단계를 추가할수록 품질은 향상되지만 지연 시간이 증가합니다.

지연 시간 프로파일링

latency_profiling.py
python
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
 
@dataclass
class LatencyProfile:
    stages: dict[str, float] = field(default_factory=dict)
 
    @contextmanager
    def measure(self, stage_name: str):
        start = time.perf_counter()
        yield
        elapsed = time.perf_counter() - start
        self.stages[stage_name] = elapsed * 1000  # ms
 
    def total_ms(self) -> float:
        return sum(self.stages.values())
 
    def report(self) -> str:
        lines = [f"Total: {self.total_ms():.1f}ms"]
        for stage, ms in self.stages.items():
            pct = ms / self.total_ms() * 100
            lines.append(f"  {stage}: {ms:.1f}ms ({pct:.0f}%)")
        return "\n".join(lines)
 
# 사용 예시
profile = LatencyProfile()
 
with profile.measure("bm25_retrieval"):
    bm25_results = bm25_search(query, k=500)
 
with profile.measure("knn_retrieval"):
    knn_results = knn_search(query, k=500)
 
with profile.measure("rrf_fusion"):
    fused = rrf_fusion([bm25_results, knn_results], top_n=100)
 
with profile.measure("cross_encoder_rerank"):
    reranked = cross_encoder_rerank(query, fused, top_n=10)
 
print(profile.report())
# Total: 285.3ms
#   bm25_retrieval: 12.5ms (4%)
#   knn_retrieval: 18.2ms (6%)
#   rrf_fusion: 1.6ms (1%)
#   cross_encoder_rerank: 253.0ms (89%)

최적화 전략

리랭킹 단계의 지연 시간은 보통 전체 파이프라인의 80-90%를 차지합니다. 다음 전략으로 이를 최적화할 수 있습니다.

1. 배치 처리와 GPU 활용

batch_reranking.py
python
import torch
 
def batch_rerank(model, query: str, documents: list[str], batch_size: int = 32):
    """배치 단위 Cross-encoder 리랭킹"""
    all_scores = []
    pairs = [[query, doc] for doc in documents]
 
    for i in range(0, len(pairs), batch_size):
        batch = pairs[i:i + batch_size]
        with torch.no_grad():
            scores = model.predict(batch, show_progress_bar=False)
        all_scores.extend(scores)
 
    return all_scores

2. 모델 경량화

모델파라미터 수지연 시간 (100문서)NDCG@10
MiniLM-L-6-v222M약 80ms0.82
MiniLM-L-12-v233M약 150ms0.85
DeBERTa-v3-base86M약 350ms0.89

3. 리랭킹 대상 수 조절

rank_window_size를 줄이면 지연 시간이 비례적으로 감소합니다. 핵심은 "충분히 좋은" 결과를 제공하는 최소한의 window size를 찾는 것입니다.

4. 캐싱

인기 쿼리의 리랭킹 결과를 캐시하면 반복 쿼리의 지연 시간을 제거할 수 있습니다.

rerank_caching.py
python
import hashlib
from functools import lru_cache
 
def cache_key(query: str, doc_ids: list[str]) -> str:
    content = f"{query}|{'|'.join(doc_ids)}"
    return hashlib.md5(content.encode()).hexdigest()
 
# 간단한 인메모리 캐시 (프로덕션에서는 Redis 등 사용)
rerank_cache: dict[str, list[float]] = {}
 
def cached_rerank(model, query: str, documents: list[dict]) -> list[float]:
    doc_ids = [d["id"] for d in documents]
    key = cache_key(query, doc_ids)
 
    if key in rerank_cache:
        return rerank_cache[key]
 
    scores = model.predict([[query, d["text"]] for d in documents])
    rerank_cache[key] = scores.tolist()
    return scores.tolist()
Warning

캐싱은 검색 결과가 자주 변경되는 시스템에서는 주의가 필요합니다. 문서가 업데이트되거나 삭제되었을 때 캐시를 적시에 무효화해야 합니다. TTL(Time-To-Live)을 설정하여 캐시의 신선도를 관리하세요.


실전 파이프라인 설계 패턴

지연 시간 SLA별 권장 구성

지연 시간 목표파이프라인 구성
50ms 이하BM25 + kNN (RRF)
200ms 이하BM25 + kNN (RRF) + 경량 Cross-encoder
500ms 이하BM25 + kNN (RRF) + 2단계 캐스케이드
1초 이하전체 캐스케이드 + 개인화 + 다양성

정리

이번 장에서는 하이브리드 검색과 리랭킹을 결합한 다단계 파이프라인의 설계와 최적화를 다루었습니다. BM25와 시맨틱 검색의 상호 보완적 강점을 확인했고, RRF와 선형 보간의 퓨전 전략을 비교했습니다. 캐스케이드 구조로 정확도와 속도를 단계적으로 조절하는 방법, 그리고 배치 처리, 모델 경량화, 캐싱 등의 최적화 전략을 학습했습니다.

다음 장에서는 검색 결과를 사용자 개개인에게 맞춤화하는 검색 개인화를 다룹니다. 사용자 프로파일링, 클릭 이력 기반 개인화, 임베딩 기반 사용자 벡터, 프라이버시 고려사항까지 살펴보겠습니다.

이 글이 도움이 되셨나요?

관련 주제 더 보기

#search#ai

관련 글

AI / ML

9장: 검색 개인화

사용자 프로파일링, 클릭 이력 기반 개인화, 임베딩 기반 사용자 벡터, 인기도 편향 문제, 프라이버시 고려사항과 실시간 개인화를 다룹니다.

2026년 2월 19일·15분
AI / ML

7장: OpenSearch와 기타 검색 엔진

OpenSearch 신경 검색, 재랭킹 파이프라인과 Algolia, Meilisearch, Typesense 등 주요 검색 엔진의 AI 검색 기능을 비교합니다.

2026년 2월 15일·12분
AI / ML

10장: 피드백 루프와 지속적 개선

클릭 신호 수집, 암묵적/명시적 피드백, 온라인 학습, A/B 테스트 자동화, 검색 품질 모니터링, 차가운 시작 문제를 다룹니다.

2026년 2월 21일·17분
이전 글7장: OpenSearch와 기타 검색 엔진
다음 글9장: 검색 개인화

댓글

목차

약 13분 남음
  • 학습 목표
  • 하이브리드 검색의 필요성
    • 각 방식이 강한 쿼리 유형
  • 점수 퓨전 전략
    • RRF (Reciprocal Rank Fusion)
    • 선형 보간 (Linear Interpolation)
    • 퓨전 전략 비교
  • 리랭킹 캐스케이드
    • 캐스케이드 구현
  • 성능-품질 트레이드오프 최적화
    • 지연 시간 프로파일링
    • 최적화 전략
  • 실전 파이프라인 설계 패턴
    • 지연 시간 SLA별 권장 구성
  • 정리