본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout

내비게이션

  • Tech
  • Projects
  • Books
  • About
  • Tags

카테고리

  • AI / ML
  • 웹 개발
  • 프로그래밍
  • 개발 도구

연결

  • GitHub
  • Email
  • RSS
© 2026 Kreath Archive. All rights reserved.Built with Next.js + MDX
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 6장: Haystack -- 모듈러 파이프라인 아키텍처
2026년 2월 12일·AI / ML·

6장: Haystack -- 모듈러 파이프라인 아키텍처

deepset Haystack 2.x의 컴포넌트와 파이프라인 개념, 방향성 멀티그래프, AsyncPipeline, 라우터, 문서 스토어를 분석합니다.

14분814자12개 섹션
orchestrationai-frameworkaillm
공유
ai-orchestration6 / 11
1234567891011
이전5장: Semantic Kernel -- 엔터프라이즈 AI 오케스트레이션다음7장: 체이닝과 라우팅 패턴

이 장에서 배우는 것

  • Haystack 2.x의 설계 철학과 "명시적 제어" 원칙
  • 컴포넌트(Component)와 파이프라인(Pipeline)의 핵심 개념
  • 방향성 멀티그래프(Directed Multigraph) 구조의 특성
  • AsyncPipeline을 통한 병렬 실행
  • 라우터, 루프, 브랜치 패턴
  • 문서 스토어와 리트리버 통합

최소주의 설계 철학

deepset의 Haystack은 "최소한의 프레임워크 오버헤드로 최대한의 제어를 제공한다"는 철학을 가지고 있습니다. 다른 프레임워크들이 풍부한 추상화와 편의 기능을 강조하는 반면, Haystack은 개발자가 각 단계에서 무슨 일이 일어나는지 명확하게 이해하고 제어할 수 있도록 설계되었습니다.

이 접근 방식의 결과는 수치로 확인됩니다.

  • 프레임워크 오버헤드: 약 5.9ms (주요 프레임워크 중 최소)
  • 토큰 사용량: 약 1.57k (주요 프레임워크 중 최소)

Haystack 2.x는 1.x에서의 완전한 재설계를 거쳤습니다. 1.x의 모놀리식 구조를 버리고, 조합 가능한 컴포넌트와 유연한 파이프라인 구조를 도입했습니다.


컴포넌트: 파이프라인의 빌딩 블록

Haystack의 모든 처리 단위는 컴포넌트(Component)입니다. 컴포넌트는 입력과 출력 소켓이 명시적으로 정의된 함수 단위입니다.

컴포넌트 인터페이스

component_interface.py
python
from haystack import component
 
@component
class TextCleaner:
    """텍스트 전처리 컴포넌트"""
 
    @component.output_types(cleaned_text=str)
    def run(self, text: str) -> dict:
        cleaned = text.strip().lower()
        cleaned = " ".join(cleaned.split())  # 중복 공백 제거
        return {"cleaned_text": cleaned}

@component 데코레이터가 클래스를 파이프라인 호환 컴포넌트로 변환합니다. @component.output_types로 출력 타입을 명시적으로 선언하여, 파이프라인 검증 시 타입 불일치를 사전에 감지합니다.

기본 제공 컴포넌트

Haystack은 다양한 기본 컴포넌트를 제공합니다.

카테고리컴포넌트역할
GeneratorsOpenAIGenerator, AnthropicGeneratorLLM 텍스트 생성
RetrieversInMemoryBM25Retriever, PineconeRetriever문서 검색
RoutersConditionalRouter, MetadataRouter조건부 라우팅
ConvertersHTMLToDocument, PyPDFToDocument파일 변환
PreprocessorsDocumentCleaner, DocumentSplitter문서 전처리
WritersDocumentWriter문서 스토어에 저장
EvaluatorsFaithfulnessEvaluator, AnswerExactMatch품질 평가

파이프라인: 방향성 멀티그래프

Haystack의 파이프라인은 방향성 멀티그래프(Directed Multigraph)로 구현됩니다. 두 컴포넌트 사이에 여러 개의 연결이 가능하며, 각 연결은 명시적으로 소켓 이름이 지정됩니다.

기본 파이프라인

basic_pipeline.py
python
from haystack import Pipeline
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders import PromptBuilder
 
# 컴포넌트 생성
prompt_builder = PromptBuilder(
    template="""
다음 컨텍스트를 바탕으로 질문에 답변해주세요.
 
컨텍스트:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
 
질문: {{ question }}
답변:
"""
)
generator = OpenAIGenerator(model="gpt-4o")
 
# 파이프라인 구성
pipeline = Pipeline()
pipeline.add_component("prompt_builder", prompt_builder)
pipeline.add_component("generator", generator)
 
# 연결: prompt_builder의 출력 -> generator의 입력
pipeline.connect("prompt_builder", "generator")
 
# 실행
result = pipeline.run(
    {
        "prompt_builder": {
            "documents": documents,
            "question": "Haystack의 핵심 개념은?",
        }
    }
)
print(result["generator"]["replies"][0])

RAG 파이프라인

rag_pipeline.py
python
from haystack import Pipeline
from haystack.components.retrievers.in_memory import (
    InMemoryBM25Retriever,
)
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders import PromptBuilder
from haystack.document_stores.in_memory import InMemoryDocumentStore
 
# 문서 스토어 설정
document_store = InMemoryDocumentStore()
 
# 파이프라인 구성
rag_pipeline = Pipeline()
rag_pipeline.add_component(
    "retriever",
    InMemoryBM25Retriever(document_store=document_store),
)
rag_pipeline.add_component(
    "prompt_builder",
    PromptBuilder(template=rag_template),
)
rag_pipeline.add_component(
    "generator",
    OpenAIGenerator(model="gpt-4o"),
)
 
# 연결
rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "generator")
 
# 실행
result = rag_pipeline.run(
    {
        "retriever": {"query": "Haystack 파이프라인이란?"},
        "prompt_builder": {"question": "Haystack 파이프라인이란?"},
    }
)
Info

Haystack 파이프라인의 연결은 소켓 단위로 이루어집니다. "retriever.documents"처럼 컴포넌트명.소켓명 형식으로 정확한 입출력을 지정합니다. 이를 통해 어떤 데이터가 어디로 흐르는지 명확하게 파악할 수 있습니다.


AsyncPipeline: 비동기 병렬 실행

Haystack 2.x는 AsyncPipeline을 통해 독립적인 컴포넌트를 병렬로 실행합니다.

async_pipeline.py
python
from haystack import AsyncPipeline
 
# AsyncPipeline으로 전환
async_pipeline = AsyncPipeline()
async_pipeline.add_component("bm25_retriever", bm25_retriever)
async_pipeline.add_component("embedding_retriever", embedding_retriever)
async_pipeline.add_component("joiner", DocumentJoiner())
async_pipeline.add_component("prompt_builder", prompt_builder)
async_pipeline.add_component("generator", generator)
 
# bm25와 embedding 검색은 독립적 -> 병렬 실행
async_pipeline.connect("bm25_retriever.documents", "joiner.documents")
async_pipeline.connect("embedding_retriever.documents", "joiner.documents")
async_pipeline.connect("joiner.documents", "prompt_builder.documents")
async_pipeline.connect("prompt_builder", "generator")
 
# 비동기 실행
result = await async_pipeline.run_async(
    {
        "bm25_retriever": {"query": "검색어"},
        "embedding_retriever": {"query": "검색어"},
        "prompt_builder": {"question": "검색어"},
    }
)

BM25 검색과 임베딩 검색이 서로 의존하지 않으므로 동시에 실행됩니다. Joiner 컴포넌트가 두 결과를 합친 후 다음 단계로 전달합니다.


라우터와 분기

ConditionalRouter

조건에 따라 파이프라인의 흐름을 다르게 분기합니다.

conditional_router.py
python
from haystack.components.routers import ConditionalRouter
 
routes = [
    {
        "condition": "{{ query|length > 100 }}",
        "output": "{{ query }}",
        "output_name": "long_query",
        "output_type": str,
    },
    {
        "condition": "{{ query|length <= 100 }}",
        "output": "{{ query }}",
        "output_name": "short_query",
        "output_type": str,
    },
]
 
router = ConditionalRouter(routes=routes)
 
pipeline = Pipeline()
pipeline.add_component("router", router)
pipeline.add_component("detailed_retriever", detailed_retriever)
pipeline.add_component("simple_retriever", simple_retriever)
 
pipeline.connect("router.long_query", "detailed_retriever.query")
pipeline.connect("router.short_query", "simple_retriever.query")

루프 패턴

응답 품질이 기준에 미달할 경우 재시도하는 루프를 구현합니다.

loop_pattern.py
python
@component
class QualityChecker:
    """응답 품질을 검증하는 컴포넌트"""
 
    @component.output_types(
        approved=str,
        rejected=str,
    )
    def run(self, reply: str) -> dict:
        # 품질 검증 로직
        if self.is_quality_sufficient(reply):
            return {"approved": reply}
        return {"rejected": reply}
 
    def is_quality_sufficient(self, reply: str) -> bool:
        return len(reply) > 100 and "?" not in reply
 
pipeline = Pipeline()
pipeline.add_component("generator", generator)
pipeline.add_component("checker", QualityChecker())
pipeline.add_component("retry_prompt", retry_prompt_builder)
 
pipeline.connect("generator.replies", "checker.reply")
pipeline.connect("checker.rejected", "retry_prompt.previous_reply")
pipeline.connect("retry_prompt", "generator")  # 루프
# checker.approved는 파이프라인의 최종 출력
Warning

루프 패턴을 사용할 때는 반드시 최대 반복 횟수를 설정하세요. Haystack 파이프라인은 max_runs_per_component 파라미터로 무한 루프를 방지합니다. 기본값은 100이지만, 프로덕션에서는 5~10 정도로 제한하는 것이 안전합니다.


문서 스토어

Haystack은 다양한 문서 스토어(Document Store)를 지원합니다. 문서 스토어는 문서의 저장, 검색, 필터링을 담당합니다.

document_store.py
python
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack import Document
 
# 인메모리 스토어 (개발/테스트용)
store = InMemoryDocumentStore()
 
# 문서 추가
documents = [
    Document(
        content="Haystack은 오픈소스 AI 프레임워크입니다.",
        meta={"source": "docs", "language": "ko"},
    ),
    Document(
        content="파이프라인은 컴포넌트의 조합입니다.",
        meta={"source": "tutorial", "language": "ko"},
    ),
]
store.write_documents(documents)
 
# 필터링 검색
from haystack.document_stores.types import FilterPolicy
 
results = store.filter_documents(
    filters={
        "operator": "AND",
        "conditions": [
            {"field": "meta.source", "operator": "==", "value": "docs"},
            {"field": "meta.language", "operator": "==", "value": "ko"},
        ],
    }
)

외부 벡터 스토어 연동

external_store.py
python
# Pinecone
from haystack_integrations.document_stores.pinecone import (
    PineconeDocumentStore,
)
 
pinecone_store = PineconeDocumentStore(
    index="my-index",
    namespace="production",
)
 
# Weaviate
from haystack_integrations.document_stores.weaviate import (
    WeaviateDocumentStore,
)
 
weaviate_store = WeaviateDocumentStore(url="http://localhost:8080")
 
# Qdrant
from haystack_integrations.document_stores.qdrant import (
    QdrantDocumentStore,
)
 
qdrant_store = QdrantDocumentStore(
    url="http://localhost:6333",
    index="my-collection",
)

인제스천 파이프라인

문서를 전처리하고 인덱싱하는 인제스천(Ingestion) 파이프라인도 동일한 파이프라인 구조로 구성합니다.

ingestion_pipeline.py
python
from haystack import Pipeline
from haystack.components.converters import PyPDFToDocument
from haystack.components.preprocessors import (
    DocumentCleaner,
    DocumentSplitter,
)
from haystack.components.embedders import (
    OpenAIDocumentEmbedder,
)
from haystack.components.writers import DocumentWriter
 
# 인제스천 파이프라인
ingestion = Pipeline()
ingestion.add_component("converter", PyPDFToDocument())
ingestion.add_component("cleaner", DocumentCleaner())
ingestion.add_component(
    "splitter",
    DocumentSplitter(split_by="sentence", split_length=5),
)
ingestion.add_component(
    "embedder",
    OpenAIDocumentEmbedder(model="text-embedding-3-small"),
)
ingestion.add_component(
    "writer",
    DocumentWriter(document_store=document_store),
)
 
ingestion.connect("converter", "cleaner")
ingestion.connect("cleaner", "splitter")
ingestion.connect("splitter", "embedder")
ingestion.connect("embedder", "writer")
 
# PDF 파일 처리
ingestion.run(
    {"converter": {"sources": ["./report.pdf", "./guide.pdf"]}}
)

파이프라인 직렬화

Haystack 파이프라인은 YAML로 직렬화하여 저장하고 공유할 수 있습니다.

serialization.py
python
# 파이프라인을 YAML로 저장
with open("rag_pipeline.yaml", "w") as f:
    pipeline.dump(f)
 
# YAML에서 파이프라인 로드
with open("rag_pipeline.yaml", "r") as f:
    loaded_pipeline = Pipeline.load(f)
 
result = loaded_pipeline.run(...)
Tip

YAML 직렬화는 파이프라인 버전 관리, 팀 간 공유, CI/CD 파이프라인에서의 자동 배포에 유용합니다. 파이프라인 정의를 코드와 분리하여 설정 파일로 관리할 수 있습니다.


Haystack이 적합한 경우

상황이유
지연 시간에 민감한 서비스5.9ms의 최소 오버헤드
명시적 제어가 필요한 경우모든 데이터 흐름이 명시적으로 선언됨
검색/NLP 파이프라인 중심deepset의 NLP 전문성이 반영된 설계
토큰 비용 최적화1.57k의 최소 토큰 사용량
경량 프레임워크 선호과도한 추상화 없이 필요한 것만 제공

핵심 요약

  • Haystack 2.x는 명시적 제어를 강조하는 모듈러 파이프라인 프레임워크입니다.
  • 컴포넌트는 타입이 지정된 입출력 소켓을 가지며, 파이프라인은 방향성 멀티그래프로 구성됩니다.
  • AsyncPipeline으로 독립적인 컴포넌트를 자동으로 병렬 실행합니다.
  • ConditionalRouter와 루프 패턴으로 복잡한 분기와 재시도 로직을 구현합니다.
  • 프레임워크 오버헤드 5.9ms, 토큰 사용량 1.57k로 주요 프레임워크 중 가장 효율적입니다.
  • YAML 직렬화로 파이프라인을 코드와 분리하여 관리할 수 있습니다.

다음 장 예고

7장에서는 프레임워크를 넘어서는 공통 패턴인 체이닝과 라우팅을 다룹니다. 순차/병렬 체이닝, 조건부/시맨틱 라우팅, 폴백 체인을 각 프레임워크별로 비교 구현하며, 실전에서 자주 사용되는 패턴을 정리합니다.

이 글이 도움이 되셨나요?

관련 주제 더 보기

#orchestration#ai-framework#ai#llm

관련 글

AI / ML

7장: 체이닝과 라우팅 패턴

순차/병렬 체이닝, 조건부/시맨틱 라우팅, 폴백 체인을 각 프레임워크별로 비교 구현하며 실전 패턴을 정리합니다.

2026년 2월 14일·15분
AI / ML

5장: Semantic Kernel -- 엔터프라이즈 AI 오케스트레이션

Microsoft Semantic Kernel의 멀티 언어 아키텍처, 플러그인 시스템, 플래너, Azure 통합, 엔터프라이즈 보안과 거버넌스를 분석합니다.

2026년 2월 10일·14분
AI / ML

8장: 메모리 관리와 상태 유지

대화 메모리, 장기 메모리, 벡터 메모리, 구조화된 상태를 각 프레임워크별로 비교하고 프로덕션 메모리 전략을 정리합니다.

2026년 2월 16일·16분
이전 글5장: Semantic Kernel -- 엔터프라이즈 AI 오케스트레이션
다음 글7장: 체이닝과 라우팅 패턴

댓글

목차

약 14분 남음
  • 이 장에서 배우는 것
  • 최소주의 설계 철학
  • 컴포넌트: 파이프라인의 빌딩 블록
    • 컴포넌트 인터페이스
    • 기본 제공 컴포넌트
  • 파이프라인: 방향성 멀티그래프
    • 기본 파이프라인
    • RAG 파이프라인
  • AsyncPipeline: 비동기 병렬 실행
  • 라우터와 분기
    • ConditionalRouter
    • 루프 패턴
  • 문서 스토어
    • 외부 벡터 스토어 연동
  • 인제스천 파이프라인
  • 파이프라인 직렬화
  • Haystack이 적합한 경우
  • 핵심 요약
  • 다음 장 예고