deepset Haystack 2.x의 컴포넌트와 파이프라인 개념, 방향성 멀티그래프, AsyncPipeline, 라우터, 문서 스토어를 분석합니다.
deepset의 Haystack은 "최소한의 프레임워크 오버헤드로 최대한의 제어를 제공한다"는 철학을 가지고 있습니다. 다른 프레임워크들이 풍부한 추상화와 편의 기능을 강조하는 반면, Haystack은 개발자가 각 단계에서 무슨 일이 일어나는지 명확하게 이해하고 제어할 수 있도록 설계되었습니다.
이 접근 방식의 결과는 수치로 확인됩니다.
Haystack 2.x는 1.x에서의 완전한 재설계를 거쳤습니다. 1.x의 모놀리식 구조를 버리고, 조합 가능한 컴포넌트와 유연한 파이프라인 구조를 도입했습니다.
Haystack의 모든 처리 단위는 컴포넌트(Component)입니다. 컴포넌트는 입력과 출력 소켓이 명시적으로 정의된 함수 단위입니다.
from haystack import component
@component
class TextCleaner:
"""텍스트 전처리 컴포넌트"""
@component.output_types(cleaned_text=str)
def run(self, text: str) -> dict:
cleaned = text.strip().lower()
cleaned = " ".join(cleaned.split()) # 중복 공백 제거
return {"cleaned_text": cleaned}@component 데코레이터가 클래스를 파이프라인 호환 컴포넌트로 변환합니다. @component.output_types로 출력 타입을 명시적으로 선언하여, 파이프라인 검증 시 타입 불일치를 사전에 감지합니다.
Haystack은 다양한 기본 컴포넌트를 제공합니다.
| 카테고리 | 컴포넌트 | 역할 |
|---|---|---|
| Generators | OpenAIGenerator, AnthropicGenerator | LLM 텍스트 생성 |
| Retrievers | InMemoryBM25Retriever, PineconeRetriever | 문서 검색 |
| Routers | ConditionalRouter, MetadataRouter | 조건부 라우팅 |
| Converters | HTMLToDocument, PyPDFToDocument | 파일 변환 |
| Preprocessors | DocumentCleaner, DocumentSplitter | 문서 전처리 |
| Writers | DocumentWriter | 문서 스토어에 저장 |
| Evaluators | FaithfulnessEvaluator, AnswerExactMatch | 품질 평가 |
Haystack의 파이프라인은 방향성 멀티그래프(Directed Multigraph)로 구현됩니다. 두 컴포넌트 사이에 여러 개의 연결이 가능하며, 각 연결은 명시적으로 소켓 이름이 지정됩니다.
from haystack import Pipeline
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders import PromptBuilder
# 컴포넌트 생성
prompt_builder = PromptBuilder(
template="""
다음 컨텍스트를 바탕으로 질문에 답변해주세요.
컨텍스트:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
질문: {{ question }}
답변:
"""
)
generator = OpenAIGenerator(model="gpt-4o")
# 파이프라인 구성
pipeline = Pipeline()
pipeline.add_component("prompt_builder", prompt_builder)
pipeline.add_component("generator", generator)
# 연결: prompt_builder의 출력 -> generator의 입력
pipeline.connect("prompt_builder", "generator")
# 실행
result = pipeline.run(
{
"prompt_builder": {
"documents": documents,
"question": "Haystack의 핵심 개념은?",
}
}
)
print(result["generator"]["replies"][0])from haystack import Pipeline
from haystack.components.retrievers.in_memory import (
InMemoryBM25Retriever,
)
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders import PromptBuilder
from haystack.document_stores.in_memory import InMemoryDocumentStore
# 문서 스토어 설정
document_store = InMemoryDocumentStore()
# 파이프라인 구성
rag_pipeline = Pipeline()
rag_pipeline.add_component(
"retriever",
InMemoryBM25Retriever(document_store=document_store),
)
rag_pipeline.add_component(
"prompt_builder",
PromptBuilder(template=rag_template),
)
rag_pipeline.add_component(
"generator",
OpenAIGenerator(model="gpt-4o"),
)
# 연결
rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "generator")
# 실행
result = rag_pipeline.run(
{
"retriever": {"query": "Haystack 파이프라인이란?"},
"prompt_builder": {"question": "Haystack 파이프라인이란?"},
}
)Haystack 파이프라인의 연결은 소켓 단위로 이루어집니다. "retriever.documents"처럼 컴포넌트명.소켓명 형식으로 정확한 입출력을 지정합니다. 이를 통해 어떤 데이터가 어디로 흐르는지 명확하게 파악할 수 있습니다.
Haystack 2.x는 AsyncPipeline을 통해 독립적인 컴포넌트를 병렬로 실행합니다.
from haystack import AsyncPipeline
# AsyncPipeline으로 전환
async_pipeline = AsyncPipeline()
async_pipeline.add_component("bm25_retriever", bm25_retriever)
async_pipeline.add_component("embedding_retriever", embedding_retriever)
async_pipeline.add_component("joiner", DocumentJoiner())
async_pipeline.add_component("prompt_builder", prompt_builder)
async_pipeline.add_component("generator", generator)
# bm25와 embedding 검색은 독립적 -> 병렬 실행
async_pipeline.connect("bm25_retriever.documents", "joiner.documents")
async_pipeline.connect("embedding_retriever.documents", "joiner.documents")
async_pipeline.connect("joiner.documents", "prompt_builder.documents")
async_pipeline.connect("prompt_builder", "generator")
# 비동기 실행
result = await async_pipeline.run_async(
{
"bm25_retriever": {"query": "검색어"},
"embedding_retriever": {"query": "검색어"},
"prompt_builder": {"question": "검색어"},
}
)BM25 검색과 임베딩 검색이 서로 의존하지 않으므로 동시에 실행됩니다. Joiner 컴포넌트가 두 결과를 합친 후 다음 단계로 전달합니다.
조건에 따라 파이프라인의 흐름을 다르게 분기합니다.
from haystack.components.routers import ConditionalRouter
routes = [
{
"condition": "{{ query|length > 100 }}",
"output": "{{ query }}",
"output_name": "long_query",
"output_type": str,
},
{
"condition": "{{ query|length <= 100 }}",
"output": "{{ query }}",
"output_name": "short_query",
"output_type": str,
},
]
router = ConditionalRouter(routes=routes)
pipeline = Pipeline()
pipeline.add_component("router", router)
pipeline.add_component("detailed_retriever", detailed_retriever)
pipeline.add_component("simple_retriever", simple_retriever)
pipeline.connect("router.long_query", "detailed_retriever.query")
pipeline.connect("router.short_query", "simple_retriever.query")응답 품질이 기준에 미달할 경우 재시도하는 루프를 구현합니다.
@component
class QualityChecker:
"""응답 품질을 검증하는 컴포넌트"""
@component.output_types(
approved=str,
rejected=str,
)
def run(self, reply: str) -> dict:
# 품질 검증 로직
if self.is_quality_sufficient(reply):
return {"approved": reply}
return {"rejected": reply}
def is_quality_sufficient(self, reply: str) -> bool:
return len(reply) > 100 and "?" not in reply
pipeline = Pipeline()
pipeline.add_component("generator", generator)
pipeline.add_component("checker", QualityChecker())
pipeline.add_component("retry_prompt", retry_prompt_builder)
pipeline.connect("generator.replies", "checker.reply")
pipeline.connect("checker.rejected", "retry_prompt.previous_reply")
pipeline.connect("retry_prompt", "generator") # 루프
# checker.approved는 파이프라인의 최종 출력루프 패턴을 사용할 때는 반드시 최대 반복 횟수를 설정하세요. Haystack 파이프라인은 max_runs_per_component 파라미터로 무한 루프를 방지합니다. 기본값은 100이지만, 프로덕션에서는 5~10 정도로 제한하는 것이 안전합니다.
Haystack은 다양한 문서 스토어(Document Store)를 지원합니다. 문서 스토어는 문서의 저장, 검색, 필터링을 담당합니다.
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack import Document
# 인메모리 스토어 (개발/테스트용)
store = InMemoryDocumentStore()
# 문서 추가
documents = [
Document(
content="Haystack은 오픈소스 AI 프레임워크입니다.",
meta={"source": "docs", "language": "ko"},
),
Document(
content="파이프라인은 컴포넌트의 조합입니다.",
meta={"source": "tutorial", "language": "ko"},
),
]
store.write_documents(documents)
# 필터링 검색
from haystack.document_stores.types import FilterPolicy
results = store.filter_documents(
filters={
"operator": "AND",
"conditions": [
{"field": "meta.source", "operator": "==", "value": "docs"},
{"field": "meta.language", "operator": "==", "value": "ko"},
],
}
)# Pinecone
from haystack_integrations.document_stores.pinecone import (
PineconeDocumentStore,
)
pinecone_store = PineconeDocumentStore(
index="my-index",
namespace="production",
)
# Weaviate
from haystack_integrations.document_stores.weaviate import (
WeaviateDocumentStore,
)
weaviate_store = WeaviateDocumentStore(url="http://localhost:8080")
# Qdrant
from haystack_integrations.document_stores.qdrant import (
QdrantDocumentStore,
)
qdrant_store = QdrantDocumentStore(
url="http://localhost:6333",
index="my-collection",
)문서를 전처리하고 인덱싱하는 인제스천(Ingestion) 파이프라인도 동일한 파이프라인 구조로 구성합니다.
from haystack import Pipeline
from haystack.components.converters import PyPDFToDocument
from haystack.components.preprocessors import (
DocumentCleaner,
DocumentSplitter,
)
from haystack.components.embedders import (
OpenAIDocumentEmbedder,
)
from haystack.components.writers import DocumentWriter
# 인제스천 파이프라인
ingestion = Pipeline()
ingestion.add_component("converter", PyPDFToDocument())
ingestion.add_component("cleaner", DocumentCleaner())
ingestion.add_component(
"splitter",
DocumentSplitter(split_by="sentence", split_length=5),
)
ingestion.add_component(
"embedder",
OpenAIDocumentEmbedder(model="text-embedding-3-small"),
)
ingestion.add_component(
"writer",
DocumentWriter(document_store=document_store),
)
ingestion.connect("converter", "cleaner")
ingestion.connect("cleaner", "splitter")
ingestion.connect("splitter", "embedder")
ingestion.connect("embedder", "writer")
# PDF 파일 처리
ingestion.run(
{"converter": {"sources": ["./report.pdf", "./guide.pdf"]}}
)Haystack 파이프라인은 YAML로 직렬화하여 저장하고 공유할 수 있습니다.
# 파이프라인을 YAML로 저장
with open("rag_pipeline.yaml", "w") as f:
pipeline.dump(f)
# YAML에서 파이프라인 로드
with open("rag_pipeline.yaml", "r") as f:
loaded_pipeline = Pipeline.load(f)
result = loaded_pipeline.run(...)YAML 직렬화는 파이프라인 버전 관리, 팀 간 공유, CI/CD 파이프라인에서의 자동 배포에 유용합니다. 파이프라인 정의를 코드와 분리하여 설정 파일로 관리할 수 있습니다.
| 상황 | 이유 |
|---|---|
| 지연 시간에 민감한 서비스 | 5.9ms의 최소 오버헤드 |
| 명시적 제어가 필요한 경우 | 모든 데이터 흐름이 명시적으로 선언됨 |
| 검색/NLP 파이프라인 중심 | deepset의 NLP 전문성이 반영된 설계 |
| 토큰 비용 최적화 | 1.57k의 최소 토큰 사용량 |
| 경량 프레임워크 선호 | 과도한 추상화 없이 필요한 것만 제공 |
7장에서는 프레임워크를 넘어서는 공통 패턴인 체이닝과 라우팅을 다룹니다. 순차/병렬 체이닝, 조건부/시맨틱 라우팅, 폴백 체인을 각 프레임워크별로 비교 구현하며, 실전에서 자주 사용되는 패턴을 정리합니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
순차/병렬 체이닝, 조건부/시맨틱 라우팅, 폴백 체인을 각 프레임워크별로 비교 구현하며 실전 패턴을 정리합니다.
Microsoft Semantic Kernel의 멀티 언어 아키텍처, 플러그인 시스템, 플래너, Azure 통합, 엔터프라이즈 보안과 거버넌스를 분석합니다.
대화 메모리, 장기 메모리, 벡터 메모리, 구조화된 상태를 각 프레임워크별로 비교하고 프로덕션 메모리 전략을 정리합니다.