본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout

내비게이션

  • Tech
  • Projects
  • Books
  • About
  • Tags

카테고리

  • AI / ML
  • 웹 개발
  • 프로그래밍
  • 개발 도구

연결

  • GitHub
  • Email
  • RSS
© 2026 Kreath Archive. All rights reserved.Built with Next.js + MDX
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 11장: 실전 프로젝트 — AI 검색 시스템 구축
2026년 2월 23일·AI / ML·

11장: 실전 프로젝트 — AI 검색 시스템 구축

Elasticsearch, Cross-encoder 리랭킹, 개인화를 통합한 AI 검색 시스템의 전체 아키텍처 설계부터 구현, 벤치마킹, 운영 체크리스트까지 다룹니다.

17분1,526자12개 섹션
searchai
공유
ai-search11 / 11
1234567891011
이전10장: 피드백 루프와 지속적 개선

학습 목표

  • 시리즈 전체에서 다룬 기술을 통합한 AI 검색 시스템을 설계합니다.
  • Elasticsearch 기반 인덱싱 파이프라인과 검색 API를 구현합니다.
  • Cross-encoder 리랭킹과 개인화를 파이프라인에 통합합니다.
  • 피드백 수집 시스템을 구축합니다.
  • 성능 벤치마킹과 운영 체크리스트를 작성합니다.

프로젝트 개요

이번 장에서는 기술 블로그 문서를 대상으로 하는 AI 검색 시스템을 구축합니다. 시리즈에서 다룬 하이브리드 검색, 리랭킹, 개인화, 피드백 루프를 모두 통합한 실전 시스템입니다.

기능 요구사항

  • 키워드 검색과 시맨틱 검색을 결합한 하이브리드 검색
  • Cross-encoder 기반 리랭킹으로 검색 품질 향상
  • 사용자 클릭 이력 기반 개인화
  • 피드백 수집 및 품질 모니터링
  • 200ms 이내의 검색 응답 시간

전체 아키텍처

기술 스택

컴포넌트기술
검색 APIFastAPI (Python 3.12)
검색 엔진Elasticsearch 8.15
임베딩 모델multilingual-e5-large
리랭킹 모델cross-encoder/ms-marco-MiniLM-L-12-v2
캐시Redis
피드백 저장PostgreSQL
컨테이너Docker Compose

프로젝트 구조

ai-search-project/
  docker-compose.yml
  api/
    main.py
    config.py
    models/
      document.py
      search.py
      feedback.py
    services/
      indexing.py
      search.py
      reranking.py
      personalization.py
      feedback.py
    routers/
      search.py
      feedback.py
      admin.py
  scripts/
    index_documents.py
    benchmark.py

인덱싱 파이프라인

Elasticsearch 인덱스 설정

api/services/indexing.py
python
from elasticsearch import Elasticsearch, helpers
from sentence_transformers import SentenceTransformer
from langchain.text_splitter import RecursiveCharacterTextSplitter
 
class IndexingService:
    """문서 인덱싱 서비스"""
 
    INDEX_NAME = "tech-articles"
 
    INDEX_SETTINGS = {
        "settings": {
            "index": {
                "number_of_shards": 1,
                "number_of_replicas": 0,
                "knn": True,
            },
            "analysis": {
                "analyzer": {
                    "korean": {
                        "type": "custom",
                        "tokenizer": "nori_tokenizer",
                        "filter": ["nori_readingform", "lowercase"],
                    }
                }
            },
        },
        "mappings": {
            "properties": {
                "doc_id": {"type": "keyword"},
                "title": {"type": "text", "analyzer": "korean"},
                "content": {"type": "text", "analyzer": "korean"},
                "chunk_text": {"type": "text", "analyzer": "korean"},
                "chunk_index": {"type": "integer"},
                "content_vector": {
                    "type": "dense_vector",
                    "dims": 1024,
                    "index": True,
                    "similarity": "cosine",
                },
                "category": {"type": "keyword"},
                "tags": {"type": "keyword"},
                "published_date": {"type": "date"},
                "url": {"type": "keyword"},
            }
        },
    }
 
    def __init__(self, es_url: str = "http://localhost:9200"):
        self.es = Elasticsearch(es_url)
        self.embed_model = SentenceTransformer("intfloat/multilingual-e5-large")
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=100,
            separators=["\n\n", "\n", ". ", " "],
        )
 
    def create_index(self):
        """인덱스 생성 (기존 인덱스 삭제 후)"""
        if self.es.indices.exists(index=self.INDEX_NAME):
            self.es.indices.delete(index=self.INDEX_NAME)
        self.es.indices.create(index=self.INDEX_NAME, body=self.INDEX_SETTINGS)
 
    def index_document(self, doc: dict):
        """단일 문서 인덱싱 (청킹 + 임베딩)"""
        chunks = self.splitter.split_text(doc["content"])
 
        actions = []
        for i, chunk in enumerate(chunks):
            embedding = self.embed_model.encode(
                f"passage: {doc['title']} {chunk}",
                normalize_embeddings=True,
            )
 
            actions.append({
                "_index": self.INDEX_NAME,
                "_source": {
                    "doc_id": doc["id"],
                    "title": doc["title"],
                    "content": doc["content"],
                    "chunk_text": chunk,
                    "chunk_index": i,
                    "content_vector": embedding.tolist(),
                    "category": doc.get("category", ""),
                    "tags": doc.get("tags", []),
                    "published_date": doc.get("published_date"),
                    "url": doc.get("url", ""),
                },
            })
 
        helpers.bulk(self.es, actions)
 
    def bulk_index(self, documents: list[dict], batch_size: int = 50):
        """대량 문서 인덱싱"""
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            for doc in batch:
                self.index_document(doc)
            print(f"Indexed {min(i + batch_size, len(documents))}/{len(documents)}")

검색 API

검색 서비스

api/services/search.py
python
from elasticsearch import Elasticsearch
from sentence_transformers import SentenceTransformer
import numpy as np
 
class SearchService:
    """하이브리드 검색 서비스"""
 
    INDEX_NAME = "tech-articles"
 
    def __init__(self, es_url: str = "http://localhost:9200"):
        self.es = Elasticsearch(es_url)
        self.embed_model = SentenceTransformer("intfloat/multilingual-e5-large")
 
    def hybrid_search(
        self,
        query: str,
        k: int = 20,
        category: str = None,
        date_from: str = None,
    ) -> list[dict]:
        """BM25 + kNN 하이브리드 검색 (RRF 퓨전)"""
        query_vector = self.embed_model.encode(
            f"query: {query}", normalize_embeddings=True
        )
 
        # 필터 조건 구성
        filter_clauses = []
        if category:
            filter_clauses.append({"term": {"category": category}})
        if date_from:
            filter_clauses.append({"range": {"published_date": {"gte": date_from}}})
 
        es_filter = {"bool": {"must": filter_clauses}} if filter_clauses else None
 
        # BM25 검색
        bm25_query = {
            "bool": {
                "must": [
                    {
                        "multi_match": {
                            "query": query,
                            "fields": ["title^2", "chunk_text", "content"],
                            "analyzer": "korean",
                        }
                    }
                ],
            }
        }
        if filter_clauses:
            bm25_query["bool"]["filter"] = filter_clauses
 
        # kNN 검색
        knn_params = {
            "field": "content_vector",
            "query_vector": query_vector.tolist(),
            "k": k * 5,
            "num_candidates": k * 20,
        }
        if es_filter:
            knn_params["filter"] = es_filter
 
        # RRF 하이브리드 검색
        response = self.es.search(
            index=self.INDEX_NAME,
            body={
                "retriever": {
                    "rrf": {
                        "retrievers": [
                            {"standard": {"query": bm25_query}},
                            {"knn": knn_params},
                        ],
                        "rank_window_size": k * 10,
                        "rank_constant": 60,
                    }
                },
                "size": k,
                "_source": [
                    "doc_id", "title", "chunk_text", "category",
                    "tags", "published_date", "url",
                ],
            },
        )
 
        results = []
        seen_docs = set()
        for hit in response["hits"]["hits"]:
            doc_id = hit["_source"]["doc_id"]
            # 같은 문서의 여러 청크 중복 제거
            if doc_id in seen_docs:
                continue
            seen_docs.add(doc_id)
 
            results.append({
                "doc_id": doc_id,
                "title": hit["_source"]["title"],
                "snippet": hit["_source"]["chunk_text"][:300],
                "category": hit["_source"]["category"],
                "tags": hit["_source"]["tags"],
                "published_date": hit["_source"]["published_date"],
                "url": hit["_source"]["url"],
                "score": hit["_score"],
            })
 
        return results

리랭킹 서비스

api/services/reranking.py
python
from sentence_transformers import CrossEncoder
 
class RerankingService:
    """Cross-encoder 리랭킹 서비스"""
 
    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"):
        self.model = CrossEncoder(model_name)
 
    def rerank(
        self,
        query: str,
        results: list[dict],
        top_k: int = 10,
    ) -> list[dict]:
        """검색 결과 리랭킹"""
        if not results:
            return results
 
        pairs = [
            [query, f"{r['title']} {r['snippet']}"]
            for r in results
        ]
 
        scores = self.model.predict(pairs, show_progress_bar=False)
 
        for result, score in zip(results, scores):
            result["rerank_score"] = float(score)
 
        results.sort(key=lambda x: x["rerank_score"], reverse=True)
        return results[:top_k]

개인화 서비스

api/services/personalization.py
python
import json
import numpy as np
import redis
 
class PersonalizationService:
    """검색 결과 개인화 서비스"""
 
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.personal_weight = 0.15
 
    def get_user_vector(self, user_id: str) -> np.ndarray | None:
        """Redis에서 사용자 벡터 조회"""
        cached = self.redis.get(f"user_vector:{user_id}")
        if cached:
            return np.array(json.loads(cached))
        return None
 
    def update_user_vector(
        self,
        user_id: str,
        doc_embedding: list[float],
        decay: float = 0.9,
    ):
        """클릭 시 사용자 벡터 업데이트"""
        current = self.get_user_vector(user_id)
        new_vec = np.array(doc_embedding)
 
        if current is not None:
            updated = decay * current + (1 - decay) * new_vec
        else:
            updated = new_vec
 
        # 정규화
        norm = np.linalg.norm(updated)
        if norm > 0:
            updated = updated / norm
 
        self.redis.setex(
            f"user_vector:{user_id}",
            86400 * 30,  # 30일 TTL
            json.dumps(updated.tolist()),
        )
 
    def personalize(
        self,
        results: list[dict],
        user_id: str | None,
    ) -> list[dict]:
        """검색 결과 개인화"""
        if not user_id:
            return results
 
        user_vector = self.get_user_vector(user_id)
        if user_vector is None:
            return results
 
        for result in results:
            if "embedding" in result:
                doc_vec = np.array(result["embedding"])
                personal_score = float(np.dot(user_vector, doc_vec))
                base_score = result.get("rerank_score", result.get("score", 0))
                result["final_score"] = (
                    (1 - self.personal_weight) * base_score
                    + self.personal_weight * personal_score
                )
            else:
                result["final_score"] = result.get("rerank_score", result.get("score", 0))
 
        results.sort(key=lambda x: x["final_score"], reverse=True)
        return results

FastAPI 엔드포인트

api/routers/search.py
python
from fastapi import APIRouter, Query, Header
from pydantic import BaseModel
 
router = APIRouter(prefix="/api/search", tags=["search"])
 
class SearchRequest(BaseModel):
    query: str
    category: str | None = None
    date_from: str | None = None
    page: int = 1
    size: int = 10
 
class SearchResult(BaseModel):
    doc_id: str
    title: str
    snippet: str
    category: str
    tags: list[str]
    published_date: str | None
    url: str
    score: float
 
class SearchResponse(BaseModel):
    query: str
    total: int
    results: list[SearchResult]
    latency_ms: float
 
@router.post("/", response_model=SearchResponse)
async def search(
    request: SearchRequest,
    x_user_id: str | None = Header(None),
):
    """통합 검색 API"""
    import time
    start = time.perf_counter()
 
    # 1. 하이브리드 검색 (BM25 + 시맨틱)
    candidates = search_service.hybrid_search(
        query=request.query,
        k=100,
        category=request.category,
        date_from=request.date_from,
    )
 
    # 2. Cross-encoder 리랭킹
    reranked = reranking_service.rerank(
        query=request.query,
        results=candidates,
        top_k=30,
    )
 
    # 3. 개인화
    personalized = personalization_service.personalize(
        results=reranked,
        user_id=x_user_id,
    )
 
    # 4. 페이지네이션
    start_idx = (request.page - 1) * request.size
    end_idx = start_idx + request.size
    page_results = personalized[start_idx:end_idx]
 
    elapsed = (time.perf_counter() - start) * 1000
 
    return SearchResponse(
        query=request.query,
        total=len(personalized),
        results=[
            SearchResult(
                doc_id=r["doc_id"],
                title=r["title"],
                snippet=r["snippet"],
                category=r["category"],
                tags=r["tags"],
                published_date=r.get("published_date"),
                url=r["url"],
                score=r.get("final_score", r.get("rerank_score", r.get("score", 0))),
            )
            for r in page_results
        ],
        latency_ms=round(elapsed, 1),
    )

피드백 수집 API

api/routers/feedback.py
python
from fastapi import APIRouter, Header
from pydantic import BaseModel
from datetime import datetime
 
router = APIRouter(prefix="/api/feedback", tags=["feedback"])
 
class ClickFeedback(BaseModel):
    query_id: str
    query_text: str
    doc_id: str
    position: int
    dwell_time_ms: int | None = None
 
@router.post("/click")
async def record_click(
    feedback: ClickFeedback,
    x_user_id: str | None = Header(None),
):
    """클릭 피드백 기록"""
    feedback_service.record_click(
        query_id=feedback.query_id,
        query_text=feedback.query_text,
        doc_id=feedback.doc_id,
        position=feedback.position,
        dwell_time_ms=feedback.dwell_time_ms,
        user_id=x_user_id,
        timestamp=datetime.now(),
    )
 
    # 사용자 벡터 업데이트 (비동기)
    if x_user_id:
        doc_embedding = get_document_embedding(feedback.doc_id)
        if doc_embedding:
            personalization_service.update_user_vector(x_user_id, doc_embedding)
 
    return {"status": "recorded"}

성능 벤치마킹

scripts/benchmark.py
python
import time
import statistics
import json
 
class SearchBenchmark:
    """검색 시스템 성능 벤치마크"""
 
    def __init__(self, search_url: str):
        self.search_url = search_url
 
    def run(
        self,
        queries: list[str],
        iterations: int = 3,
    ) -> dict:
        """벤치마크 실행"""
        import requests
 
        all_latencies = []
 
        for query in queries:
            query_latencies = []
            for _ in range(iterations):
                start = time.perf_counter()
                response = requests.post(
                    f"{self.search_url}/api/search/",
                    json={"query": query, "size": 10},
                )
                elapsed = (time.perf_counter() - start) * 1000
                query_latencies.append(elapsed)
 
            all_latencies.extend(query_latencies)
 
        all_latencies.sort()
        return {
            "total_queries": len(queries) * iterations,
            "p50_ms": round(statistics.median(all_latencies), 1),
            "p95_ms": round(all_latencies[int(len(all_latencies) * 0.95)], 1),
            "p99_ms": round(all_latencies[int(len(all_latencies) * 0.99)], 1),
            "mean_ms": round(statistics.mean(all_latencies), 1),
            "min_ms": round(min(all_latencies), 1),
            "max_ms": round(max(all_latencies), 1),
        }
 
# 사용 예시
benchmark = SearchBenchmark("http://localhost:8000")
test_queries = [
    "파이썬 비동기 프로그래밍",
    "Elasticsearch 검색 최적화",
    "React 상태 관리 패턴",
    "Docker 컨테이너 네트워킹",
    "PostgreSQL 인덱스 튜닝",
]
 
results = benchmark.run(test_queries, iterations=5)
print(json.dumps(results, indent=2))

벤치마크 목표와 실측

단계목표 지연 시간측정 방법
BM25 + kNN (RRF)50ms 이하Elasticsearch 내부 처리
Cross-encoder 리랭킹100-150ms30개 문서 기준
개인화5ms 이하Redis 조회 + 점수 계산
전체 E2E200ms 이하API 응답 시간
Tip

벤치마크 시에는 워밍업 실행을 포함해야 합니다. 첫 번째 요청은 모델 로딩, 연결 설정 등으로 지연이 크므로, 실제 벤치마크 전에 10-20회의 워밍업 쿼리를 실행하세요.


운영 체크리스트

프로덕션 배포 전에 확인해야 할 항목들을 정리합니다.

검색 품질

  • 주요 쿼리 시나리오(최소 50개)에 대한 NDCG@10 측정 완료
  • 제로 결과 쿼리 목록 확인 및 동의어 사전 보완
  • 리랭킹 전/후 NDCG 비교 (최소 20% 향상 확인)
  • 다양한 쿼리 유형(단일 키워드, 구문, 질문형) 테스트

성능

  • P95 지연 시간 200ms 이하 확인
  • 동시 사용자 100명 기준 부하 테스트 통과
  • Elasticsearch 클러스터 헬스 green 상태
  • Cross-encoder 모델 GPU/CPU 리소스 확인

모니터링

  • 검색 지연 시간 대시보드 구성
  • 제로 결과율, CTR, 이탈률 모니터링 설정
  • 임계값 초과 시 알림(Slack/이메일) 설정
  • Elasticsearch 클러스터 모니터링 (디스크, 메모리, CPU)

피드백 루프

  • 클릭 피드백 수집 파이프라인 동작 확인
  • 피드백 데이터 저장 및 백업 정책 수립
  • 주간 검색 품질 리포트 자동 생성 설정

장애 대응

  • Elasticsearch 노드 장애 시 페일오버 확인
  • Cross-encoder 서비스 다운 시 폴백(리랭킹 없이 결과 반환) 구현
  • Redis 장애 시 개인화 비활성화 후 기본 검색 제공
Warning

프로덕션 배포에서 가장 흔한 실수는 리랭킹 서비스의 장애가 전체 검색을 멈추게 하는 것입니다. 리랭킹은 품질 향상을 위한 부가 기능이므로, 장애 시에는 리랭킹을 건너뛰고 하이브리드 검색 결과를 그대로 반환하는 폴백 로직을 반드시 구현하세요.


향후 개선 방향

시스템이 안정적으로 운영되면 다음 단계의 개선을 고려할 수 있습니다.

  1. ELSER 도입: BM25를 ELSER 희소 벡터로 대체하여 어휘 불일치 문제 추가 개선
  2. LTR 모델: 축적된 클릭 데이터로 LTR 모델 학습, Cross-encoder와 결합
  3. 쿼리 이해 강화: LLM 기반 쿼리 확장, HyDE 적용
  4. Elastic Rerank 전환: 자체 Cross-encoder 대신 Elastic 내장 리랭킹 활용
  5. 실시간 인덱싱: 배치 인덱싱에서 실시간 인덱싱으로 전환
  6. 다국어 확장: 영어/한국어 교차 언어 검색 지원

시리즈 정리

이 시리즈를 통해 AI 기반 검색 시스템의 이론과 실전을 모두 다루었습니다.

기초 (1-3장): 검색의 진화, 시맨틱 검색 아키텍처, 품질 평가 메트릭을 통해 AI 검색의 기본 개념과 측정 방법을 학습했습니다.

핵심 기술 (4-5장): 쿼리 이해/확장과 랭킹 모델(Bi-encoder, Cross-encoder)을 통해 검색 품질을 높이는 핵심 기술을 깊이 있게 다루었습니다.

실전 엔진 (6-7장): Elasticsearch, OpenSearch, Algolia, Meilisearch 등 실전 검색 엔진의 AI 기능을 비교하고 구현했습니다.

고급 주제 (8-10장): 하이브리드 검색 파이프라인 설계, 개인화, 피드백 루프를 통해 프로덕션 수준의 검색 시스템 운영에 필요한 지식을 갖추었습니다.

통합 프로젝트 (11장): 모든 기술을 통합하여 실전 AI 검색 시스템을 구축하고, 벤치마킹과 운영 체크리스트까지 준비했습니다.

AI 검색 기술은 빠르게 발전하고 있습니다. 이 시리즈가 검색 시스템 구축의 견고한 기반이 되어, 새로운 기술과 방법론을 지속적으로 학습하고 적용하는 데 도움이 되기를 바랍니다.

이 글이 도움이 되셨나요?

관련 주제 더 보기

#search#ai

관련 글

AI / ML

10장: 피드백 루프와 지속적 개선

클릭 신호 수집, 암묵적/명시적 피드백, 온라인 학습, A/B 테스트 자동화, 검색 품질 모니터링, 차가운 시작 문제를 다룹니다.

2026년 2월 21일·17분
AI / ML

9장: 검색 개인화

사용자 프로파일링, 클릭 이력 기반 개인화, 임베딩 기반 사용자 벡터, 인기도 편향 문제, 프라이버시 고려사항과 실시간 개인화를 다룹니다.

2026년 2월 19일·15분
AI / ML

8장: 하이브리드 검색과 리랭킹 파이프라인

BM25와 시맨틱 검색의 결합 전략, RRF/선형 보간, 리랭킹 캐스케이드, 다단계 검색 파이프라인 설계와 성능-품질 트레이드오프를 다룹니다.

2026년 2월 17일·13분
이전 글10장: 피드백 루프와 지속적 개선

댓글

목차

약 17분 남음
  • 학습 목표
  • 프로젝트 개요
    • 기능 요구사항
  • 전체 아키텍처
    • 기술 스택
  • 프로젝트 구조
  • 인덱싱 파이프라인
    • Elasticsearch 인덱스 설정
  • 검색 API
    • 검색 서비스
    • 리랭킹 서비스
    • 개인화 서비스
  • FastAPI 엔드포인트
  • 피드백 수집 API
  • 성능 벤치마킹
    • 벤치마크 목표와 실측
  • 운영 체크리스트
    • 검색 품질
    • 성능
    • 모니터링
    • 피드백 루프
    • 장애 대응
  • 향후 개선 방향
  • 시리즈 정리