문서 로딩부터 임베딩 생성, 벡터 저장, 유사도 검색까지 RAG 파이프라인의 전체 흐름을 실제 코드로 구현합니다.
앞선 장들에서 임베딩 모델, 청킹 전략, 벡터 데이터베이스를 각각 살펴보았습니다. 이번 장에서는 이 구성 요소들을 조합하여 실제 동작하는 RAG 파이프라인을 구축합니다. 파이프라인은 크게 두 부분으로 나뉩니다.
[오프라인] 인덱싱 파이프라인:
문서 소스 --> 문서 로딩 --> 전처리 --> 청킹 --> 임베딩 --> 벡터 DB 저장
[온라인] 검색 및 생성 파이프라인:
사용자 질문 --> 쿼리 임베딩 --> 벡터 검색 --> 컨텍스트 구성 --> LLM 생성 --> 응답다양한 형식의 문서를 텍스트로 변환하는 첫 단계입니다. LangChain은 PDF, HTML, Markdown, CSV, JSON 등 다양한 문서 형식에 대한 로더를 제공합니다.
from langchain_community.document_loaders import PyPDFLoader
# 단일 PDF 로딩
loader = PyPDFLoader("documents/company_policy.pdf")
pages = loader.load()
print(f"총 {len(pages)}페이지 로딩")
print(f"첫 페이지 내용: {pages[0].page_content[:200]}")
print(f"메타데이터: {pages[0].metadata}")
# {'source': 'documents/company_policy.pdf', 'page': 0}from langchain_community.document_loaders import DirectoryLoader
# 디렉토리 내 모든 PDF 로딩
loader = DirectoryLoader(
"documents/",
glob="**/*.pdf",
loader_cls=PyPDFLoader,
show_progress=True,
use_multithreading=True
)
all_documents = loader.load()
print(f"총 {len(all_documents)}개 문서 로딩")from langchain_community.document_loaders import WebBaseLoader
loader = WebBaseLoader(
web_paths=["https://docs.example.com/api-reference"],
bs_kwargs={
"parse_only": ["main", "article"] # 본문만 추출
}
)
web_docs = loader.load()PDF 로딩의 품질은 파서에 따라 크게 달라집니다. PyPDFLoader는 범용적이지만, 표나 복잡한 레이아웃에서는 정보가 손실될 수 있습니다. 정확도가 중요한 경우 Unstructured, LlamaParse, 또는 Amazon Textract 같은 전문 파서를 검토하세요.
로딩된 문서는 청킹 전에 전처리가 필요합니다. 불필요한 공백 정리, 특수 문자 처리, 메타데이터 보강 등을 수행합니다.
import re
def preprocess_document(doc):
"""문서 전처리 파이프라인"""
text = doc.page_content
# 연속 공백 및 줄바꿈 정리
text = re.sub(r"\n{3,}", "\n\n", text)
text = re.sub(r" {2,}", " ", text)
# 머리글/바닥글 패턴 제거 (PDF에서 흔함)
text = re.sub(r"Page \d+ of \d+", "", text)
text = re.sub(r"Confidential.*$", "", text, flags=re.MULTILINE)
# 너무 짧은 문서 필터링
if len(text.strip()) < 50:
return None
doc.page_content = text.strip()
return doc
# 전처리 적용
processed_docs = [
preprocess_document(doc) for doc in all_documents
]
processed_docs = [doc for doc in processed_docs if doc is not None]
print(f"전처리 후 {len(processed_docs)}개 문서")전처리된 문서를 청킹하고 벡터 데이터베이스에 저장하는 전체 인덱싱 파이프라인을 구현합니다.
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams
import hashlib
import time
class RAGIndexingPipeline:
"""프로덕션용 RAG 인덱싱 파이프라인"""
def __init__(
self,
collection_name="knowledge_base",
chunk_size=512,
chunk_overlap=50,
embedding_model="text-embedding-3-small",
qdrant_url="http://localhost:6333"
):
self.collection_name = collection_name
self.splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ". ", " "]
)
self.embeddings = OpenAIEmbeddings(model=embedding_model)
self.qdrant_client = QdrantClient(url=qdrant_url)
# 컬렉션이 없으면 생성
collections = [
c.name for c in self.qdrant_client.get_collections().collections
]
if collection_name not in collections:
self.qdrant_client.create_collection(
collection_name=collection_name,
vectors_config=VectorParams(
size=1536,
distance=Distance.COSINE
)
)
def _generate_chunk_id(self, source, chunk_index):
"""청크의 고유 ID 생성"""
raw = source + str(chunk_index)
return hashlib.md5(raw.encode()).hexdigest()
def index_documents(self, documents, batch_size=100):
"""문서 목록을 인덱싱"""
# 1. 청킹
print("청킹 시작...")
chunks = self.splitter.split_documents(documents)
print(f"총 {len(chunks)}개 청크 생성")
# 2. 배치 단위로 임베딩 및 저장
total_batches = (len(chunks) + batch_size - 1) // batch_size
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i + batch_size]
batch_num = i // batch_size + 1
print(f"배치 {batch_num}/{total_batches} 처리 중...")
# 임베딩 생성
texts = [chunk.page_content for chunk in batch]
vectors = self.embeddings.embed_documents(texts)
# Qdrant에 저장
points = []
for j, (chunk, vector) in enumerate(zip(batch, vectors)):
source = chunk.metadata.get("source", "unknown")
point_id = self._generate_chunk_id(source, i + j)
points.append({
"id": point_id,
"vector": vector,
"payload": {
"content": chunk.page_content,
"source": source,
"page": chunk.metadata.get("page"),
"chunk_index": i + j,
"indexed_at": time.strftime("%Y-%m-%dT%H:%M:%SZ"),
}
})
self.qdrant_client.upsert(
collection_name=self.collection_name,
points=points
)
print(f"인덱싱 완료: {len(chunks)}개 청크")
return len(chunks)
# 사용 예시
pipeline = RAGIndexingPipeline(
collection_name="company_docs",
chunk_size=512,
chunk_overlap=50
)
pipeline.index_documents(processed_docs)인덱싱된 벡터 데이터베이스에서 사용자 질문에 관련된 청크를 검색하고, LLM을 통해 답변을 생성하는 파이프라인을 구현합니다.
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_qdrant import QdrantVectorStore
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
class RAGRetrievalPipeline:
"""검색 및 생성 파이프라인"""
def __init__(
self,
collection_name="knowledge_base",
embedding_model="text-embedding-3-small",
llm_model="gpt-4o",
qdrant_url="http://localhost:6333",
top_k=5
):
self.embeddings = OpenAIEmbeddings(model=embedding_model)
self.llm = ChatOpenAI(model=llm_model, temperature=0)
self.vectorstore = QdrantVectorStore.from_existing_collection(
embedding=self.embeddings,
collection_name=collection_name,
url=qdrant_url
)
self.retriever = self.vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": top_k}
)
# 프롬프트 템플릿
self.prompt = ChatPromptTemplate.from_messages([
("system", """당신은 회사 문서를 기반으로 질문에 답변하는 도우미입니다.
주어진 컨텍스트만을 사용하여 답변하세요.
컨텍스트에 답변할 수 있는 정보가 없으면 "해당 정보를 찾을 수 없습니다"라고 답하세요.
답변 시 관련 문서 출처를 함께 제시하세요."""),
("human", """컨텍스트:
{context}
질문: {question}""")
])
# 체인 구성
self.chain = (
{
"context": self.retriever | self._format_docs,
"question": RunnablePassthrough()
}
| self.prompt
| self.llm
| StrOutputParser()
)
def _format_docs(self, docs):
"""검색 결과를 텍스트로 포맷"""
formatted = []
for i, doc in enumerate(docs):
source = doc.metadata.get("source", "알 수 없음")
formatted.append(
f"[문서 {i+1}] (출처: {source})\n{doc.page_content}"
)
return "\n\n---\n\n".join(formatted)
def query(self, question):
"""질문에 대한 답변 생성"""
return self.chain.invoke(question)
def query_with_sources(self, question):
"""답변과 함께 출처 문서도 반환"""
docs = self.retriever.invoke(question)
context = self._format_docs(docs)
response = self.chain.invoke(question)
return {
"answer": response,
"sources": [
{
"content": doc.page_content,
"source": doc.metadata.get("source"),
"page": doc.metadata.get("page"),
}
for doc in docs
]
}
# 사용 예시
rag = RAGRetrievalPipeline(collection_name="company_docs")
# 단순 질문-답변
answer = rag.query("재택근무 신청은 어떻게 하나요?")
print(answer)
# 출처 포함 답변
result = rag.query_with_sources("연차 일수는 어떻게 계산되나요?")
print(f"답변: {result['answer']}")
print(f"참조 문서 수: {len(result['sources'])}")사용자의 질문을 그대로 검색에 사용하는 것보다, 검색에 최적화된 형태로 변환하면 더 좋은 결과를 얻을 수 있습니다.
LLM을 사용하여 사용자의 질문을 검색에 더 적합한 형태로 변환합니다.
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
rewrite_prompt = ChatPromptTemplate.from_messages([
("system", """사용자의 질문을 벡터 검색에 최적화된 형태로 재작성하세요.
- 핵심 키워드를 포함하세요
- 구어체를 문어체로 변환하세요
- 약어를 풀어서 작성하세요
재작성된 질문만 출력하세요."""),
("human", "{question}")
])
rewriter = rewrite_prompt | ChatOpenAI(model="gpt-4o-mini") | StrOutputParser()
original = "재택 어케 신청함?"
rewritten = rewriter.invoke({"question": original})
# "재택근무 신청 절차와 방법은 무엇인가요?"하나의 질문에서 여러 관점의 쿼리를 생성하여 검색 범위를 넓힙니다.
from langchain.retrievers import MultiQueryRetriever
multi_retriever = MultiQueryRetriever.from_llm(
retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
llm=ChatOpenAI(model="gpt-4o-mini", temperature=0.3)
)
# 내부적으로 여러 쿼리를 생성하여 검색한 뒤 결과를 합침
results = multi_retriever.invoke("신입사원 온보딩 절차")질문에 대한 가상의 답변을 먼저 생성하고, 그 답변의 임베딩으로 검색합니다. 질문 벡터보다 답변 벡터가 실제 문서와 더 유사한 공간에 위치하는 경향을 활용합니다.
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
hyde_prompt = ChatPromptTemplate.from_messages([
("system", "주어진 질문에 대한 답변을 작성하세요. 정확하지 않아도 됩니다."),
("human", "{question}")
])
def hyde_search(question, vectorstore, k=5):
"""HyDE 기반 검색"""
# 1. 가상 답변 생성
llm = ChatOpenAI(model="gpt-4o-mini")
hypothetical_answer = (
hyde_prompt | llm | StrOutputParser()
).invoke({"question": question})
# 2. 가상 답변의 임베딩으로 검색
results = vectorstore.similarity_search(hypothetical_answer, k=k)
return results쿼리 변환 기법은 LLM 호출을 추가하므로 지연 시간(Latency)이 증가합니다. 실시간 응답이 중요한 경우, 쿼리 재작성 한 단계만 적용하는 것이 현실적입니다. HyDE는 효과적이지만 응답 시간에 민감한 서비스에서는 주의가 필요합니다.
프로덕션 환경에서는 전체 재인덱싱이 아닌, 변경된 문서만 업데이트하는 증분 인덱싱이 필요합니다.
import hashlib
import json
class IncrementalIndexer:
"""증분 인덱싱 관리자"""
def __init__(self, pipeline, state_file="index_state.json"):
self.pipeline = pipeline
self.state_file = state_file
self.state = self._load_state()
def _load_state(self):
try:
with open(self.state_file, "r") as f:
return json.load(f)
except FileNotFoundError:
return {}
def _save_state(self):
with open(self.state_file, "w") as f:
json.dump(self.state, f, indent=2)
def _compute_hash(self, content):
return hashlib.sha256(content.encode()).hexdigest()
def sync_documents(self, documents):
"""문서 동기화 - 변경된 것만 업데이트"""
to_add = []
to_delete = []
current_sources = set()
for doc in documents:
source = doc.metadata.get("source", "unknown")
current_sources.add(source)
content_hash = self._compute_hash(doc.page_content)
if source not in self.state:
# 새 문서
to_add.append(doc)
self.state[source] = content_hash
elif self.state[source] != content_hash:
# 변경된 문서
to_delete.append(source)
to_add.append(doc)
self.state[source] = content_hash
# 삭제된 문서 감지
for source in list(self.state.keys()):
if source not in current_sources:
to_delete.append(source)
del self.state[source]
# 삭제 실행
if to_delete:
print(f"{len(to_delete)}개 문서 삭제")
for source in to_delete:
self.pipeline.delete_by_source(source)
# 추가 실행
if to_add:
print(f"{len(to_add)}개 문서 인덱싱")
self.pipeline.index_documents(to_add)
self._save_state()
print(f"동기화 완료: 추가 {len(to_add)}, 삭제 {len(to_delete)}")검색된 청크를 LLM에 전달하기 전에 후처리를 통해 품질을 높일 수 있습니다.
def post_process_results(results, min_score=0.7, max_tokens=3000):
"""검색 결과 후처리"""
# 1. 최소 유사도 기준 필터링
filtered = [
doc for doc in results
if doc.metadata.get("score", 1.0) >= min_score
]
# 2. 중복 제거 (유사한 내용의 청크 제거)
deduplicated = []
seen_contents = set()
for doc in filtered:
content_key = doc.page_content[:100] # 앞 100자로 근사 비교
if content_key not in seen_contents:
deduplicated.append(doc)
seen_contents.add(content_key)
# 3. 토큰 수 제한
total_tokens = 0
final_results = []
for doc in deduplicated:
doc_tokens = len(doc.page_content) // 4 # 대략적 토큰 추정
if total_tokens + doc_tokens > max_tokens:
break
final_results.append(doc)
total_tokens += doc_tokens
return final_results이번 장에서는 문서 로딩, 전처리, 청킹, 임베딩, 벡터 저장으로 이어지는 인덱싱 파이프라인과, 쿼리 변환, 벡터 검색, 컨텍스트 구성, LLM 생성으로 이어지는 검색 파이프라인을 구현했습니다. 쿼리 재작성, 멀티 쿼리, HyDE 등의 쿼리 변환 기법과 증분 인덱싱, 검색 결과 후처리 등 프로덕션에 필요한 요소도 함께 다루었습니다.
다음 장에서는 벡터 검색만으로는 해결하지 못하는 한계를 극복하기 위한 하이브리드 검색에 대해 다룹니다. BM25 키워드 검색과 시맨틱 벡터 검색을 결합하여 검색 품질을 한 단계 높이는 방법을 살펴보겠습니다.
이 글이 도움이 되셨나요?
키워드 기반 BM25와 벡터 기반 시맨틱 검색을 결합한 하이브리드 검색의 원리, 구현 방법, 그리고 Reciprocal Rank Fusion 전략을 다룹니다.
Pinecone, Weaviate, Qdrant, pgvector 등 주요 벡터 데이터베이스의 특성을 비교하고 상황에 맞는 선택 가이드를 제공합니다.
Cross-Encoder 리랭킹의 원리, Cohere Rerank API, 오픈소스 리랭커 비교, 그리고 프로덕션 환경에서의 효과적인 리랭킹 전략을 다룹니다.