순차/병렬 체이닝, 조건부/시맨틱 라우팅, 폴백 체인을 각 프레임워크별로 비교 구현하며 실전 패턴을 정리합니다.
지금까지 개별 프레임워크를 깊이 살펴보았습니다. 이번 장부터는 프레임워크를 관통하는 공통 패턴에 집중합니다. 어떤 프레임워크를 사용하든 반드시 마주하는 두 가지 기본 패턴이 있습니다.
가장 기본적인 패턴입니다. 하나의 단계가 완료되면 그 결과가 다음 단계의 입력으로 전달됩니다.
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o")
# 1단계: 주제 분석
analyze_prompt = ChatPromptTemplate.from_template(
"다음 텍스트의 주요 주제 3가지를 추출하세요:\n{text}"
)
# 2단계: 각 주제 설명
explain_prompt = ChatPromptTemplate.from_template(
"다음 주제들에 대해 각각 한 문장으로 설명하세요:\n{topics}"
)
# 3단계: 최종 요약
summarize_prompt = ChatPromptTemplate.from_template(
"다음 분석 결과를 하나의 단락으로 요약하세요:\n{explanations}"
)
# LCEL 파이프로 체이닝
chain = (
analyze_prompt | model | StrOutputParser()
| (lambda topics: {"topics": topics})
| explain_prompt | model | StrOutputParser()
| (lambda explanations: {"explanations": explanations})
| summarize_prompt | model | StrOutputParser()
)
result = chain.invoke({"text": "분석할 텍스트..."})from haystack import Pipeline
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders import PromptBuilder
pipeline = Pipeline()
pipeline.add_component(
"analyze",
PromptBuilder(template="주제 3가지를 추출하세요:\n{{ text }}"),
)
pipeline.add_component("analyze_llm", OpenAIGenerator(model="gpt-4o"))
pipeline.add_component(
"explain",
PromptBuilder(template="각 주제를 설명하세요:\n{{ topics }}"),
)
pipeline.add_component("explain_llm", OpenAIGenerator(model="gpt-4o"))
pipeline.add_component(
"summarize",
PromptBuilder(template="요약하세요:\n{{ explanations }}"),
)
pipeline.add_component("summarize_llm", OpenAIGenerator(model="gpt-4o"))
pipeline.connect("analyze", "analyze_llm")
pipeline.connect("analyze_llm.replies", "explain.topics")
pipeline.connect("explain", "explain_llm")
pipeline.connect("explain_llm.replies", "summarize.explanations")
pipeline.connect("summarize", "summarize_llm")from llama_index.core.workflow import Workflow, step, StartEvent, StopEvent, Event
class AnalyzedEvent(Event):
topics: str
class ExplainedEvent(Event):
explanations: str
class SequentialWorkflow(Workflow):
@step
async def analyze(self, ev: StartEvent) -> AnalyzedEvent:
topics = await self.llm.acomplete(
f"주제 3가지를 추출하세요:\n{ev.text}"
)
return AnalyzedEvent(topics=str(topics))
@step
async def explain(self, ev: AnalyzedEvent) -> ExplainedEvent:
explanations = await self.llm.acomplete(
f"각 주제를 설명하세요:\n{ev.topics}"
)
return ExplainedEvent(explanations=str(explanations))
@step
async def summarize(self, ev: ExplainedEvent) -> StopEvent:
summary = await self.llm.acomplete(
f"요약하세요:\n{ev.explanations}"
)
return StopEvent(result=str(summary))독립적인 처리를 동시에 실행하여 전체 지연 시간을 줄이는 패턴입니다.
from langchain_core.runnables import RunnableParallel
# 세 분석을 동시에 실행
parallel_chain = RunnableParallel(
sentiment=sentiment_prompt | model | StrOutputParser(),
entities=entity_prompt | model | StrOutputParser(),
summary=summary_prompt | model | StrOutputParser(),
)
# 결과 합성
combine_prompt = ChatPromptTemplate.from_template("""
감성 분석: {sentiment}
엔티티 추출: {entities}
요약: {summary}
위 분석 결과를 종합 보고서로 작성하세요.
""")
full_chain = parallel_chain | combine_prompt | model | StrOutputParser()
result = full_chain.invoke({"text": "분석할 텍스트..."})from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class AnalysisState(TypedDict):
text: str
sentiment: str
entities: str
summary: str
report: str
graph = StateGraph(AnalysisState)
# 각 분석 노드
graph.add_node("sentiment", analyze_sentiment)
graph.add_node("entities", extract_entities)
graph.add_node("summary", create_summary)
graph.add_node("combine", combine_results)
# 팬아웃: 3개 노드에 동시 전달
graph.add_edge(START, "sentiment")
graph.add_edge(START, "entities")
graph.add_edge(START, "summary")
# 팬인: 3개 결과를 하나로 합침
graph.add_edge("sentiment", "combine")
graph.add_edge("entities", "combine")
graph.add_edge("summary", "combine")
graph.add_edge("combine", END)
app = graph.compile()# Haystack은 독립 컴포넌트를 자동으로 병렬 실행
pipeline = AsyncPipeline()
pipeline.add_component("sentiment_prompt", sentiment_builder)
pipeline.add_component("entity_prompt", entity_builder)
pipeline.add_component("summary_prompt", summary_builder)
pipeline.add_component("sentiment_llm", OpenAIGenerator(model="gpt-4o"))
pipeline.add_component("entity_llm", OpenAIGenerator(model="gpt-4o"))
pipeline.add_component("summary_llm", OpenAIGenerator(model="gpt-4o"))
pipeline.add_component("joiner", ResultJoiner())
# 독립적인 경로 -> AsyncPipeline이 자동 병렬화
pipeline.connect("sentiment_prompt", "sentiment_llm")
pipeline.connect("entity_prompt", "entity_llm")
pipeline.connect("summary_prompt", "summary_llm")
pipeline.connect("sentiment_llm.replies", "joiner.sentiment")
pipeline.connect("entity_llm.replies", "joiner.entities")
pipeline.connect("summary_llm.replies", "joiner.summary")병렬 체이닝의 효과는 각 단계의 실행 시간이 비슷할 때 극대화됩니다. 한 단계가 나머지보다 훨씬 느리면, 전체 지연 시간은 가장 느린 단계에 의해 결정됩니다. 이 경우 느린 단계의 최적화가 병렬화보다 효과적일 수 있습니다.
입력의 특성을 분석하여 적절한 처리 경로를 선택하는 패턴입니다.
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableBranch, RunnableLambda
# 의도 분류기
classifier_prompt = ChatPromptTemplate.from_template("""
다음 질문의 유형을 분류하세요.
가능한 유형: "technical", "creative", "analytical"
질문만 보고 유형 하나만 답하세요.
질문: {question}
유형:""")
classifier = classifier_prompt | model | StrOutputParser()
# 유형별 전문 체인
technical_chain = technical_prompt | model | StrOutputParser()
creative_chain = creative_prompt | model | StrOutputParser()
analytical_chain = analytical_prompt | model | StrOutputParser()
# 라우팅
def route(info: dict) -> str:
intent = info["intent"].strip().lower()
if "technical" in intent:
return technical_chain.invoke(info)
elif "creative" in intent:
return creative_chain.invoke(info)
else:
return analytical_chain.invoke(info)
full_chain = (
RunnableParallel(
intent=classifier,
question=lambda x: x["question"],
)
| RunnableLambda(route)
)from semantic_kernel.functions import kernel_function
class RouterPlugin:
@kernel_function(description="질문 유형에 따라 적절한 전문가를 선택합니다")
async def route_question(
self,
question: str,
kernel: "Kernel",
) -> str:
# LLM으로 의도 분류
intent = await kernel.invoke_prompt(
f"질문 유형 분류 (technical/creative/analytical): {question}"
)
intent_str = str(intent).strip().lower()
if "technical" in intent_str:
result = await kernel.invoke(
plugin_name="Experts",
function_name="technical_expert",
question=question,
)
elif "creative" in intent_str:
result = await kernel.invoke(
plugin_name="Experts",
function_name="creative_expert",
question=question,
)
else:
result = await kernel.invoke(
plugin_name="Experts",
function_name="analytical_expert",
question=question,
)
return str(result)LLM 호출 없이 임베딩 유사도만으로 경로를 결정하는 효율적인 라우팅 방법입니다.
import numpy as np
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
# 각 경로를 대표하는 설명을 미리 임베딩
route_descriptions = {
"technical": "기술적인 질문, 코드 관련, 시스템 설계, 아키텍처",
"creative": "창작, 글쓰기, 아이디어 생성, 브레인스토밍",
"analytical": "데이터 분석, 통계, 수치 해석, 비교 분석",
}
# 경로 임베딩 사전 계산
route_embeddings = {}
for route, desc in route_descriptions.items():
route_embeddings[route] = embeddings.embed_query(desc)
def semantic_route(query: str) -> str:
"""임베딩 유사도로 경로 결정"""
query_embedding = embeddings.embed_query(query)
similarities = {}
for route, route_emb in route_embeddings.items():
similarity = np.dot(query_embedding, route_emb) / (
np.linalg.norm(query_embedding) * np.linalg.norm(route_emb)
)
similarities[route] = similarity
return max(similarities, key=similarities.get)
# 사용
route = semantic_route("Python에서 메모리 누수를 디버깅하는 방법")
# -> "technical"시맨틱 라우팅은 LLM 호출이 필요 없으므로 지연 시간과 비용 면에서 훨씬 효율적입니다. 경로 임베딩을 사전 계산해두면 라우팅 자체는 밀리초 단위로 완료됩니다. 다만, 복잡한 의도 분류에는 LLM 기반 라우팅이 더 정확할 수 있습니다.
주 경로가 실패했을 때 대체 경로로 전환하는 안정성 패턴입니다.
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
# 주 모델
primary = ChatOpenAI(model="gpt-4o", request_timeout=10)
# 폴백 모델들 (비용이 낮은 순서)
fallback_1 = ChatOpenAI(model="gpt-4o-mini", request_timeout=15)
fallback_2 = ChatAnthropic(model="claude-3-5-haiku-20241022", request_timeout=20)
# 폴백 체인 구성
robust_model = primary.with_fallbacks(
[fallback_1, fallback_2],
exceptions_to_handle=(Exception,),
)
# 재시도 + 폴백 조합
chain = (
prompt
| robust_model.with_retry(stop_after_attempt=2)
| StrOutputParser()
)from typing import Callable, TypeVar
import asyncio
T = TypeVar("T")
async def with_fallback(
primary: Callable[..., T],
fallbacks: list[Callable[..., T]],
*args,
**kwargs,
) -> T:
"""범용 폴백 패턴"""
all_handlers = [primary] + fallbacks
last_error = None
for handler in all_handlers:
try:
return await handler(*args, **kwargs)
except Exception as e:
last_error = e
continue
raise last_error
# 사용
result = await with_fallback(
primary=gpt4_handler,
fallbacks=[gpt4_mini_handler, claude_handler],
query="질문 내용",
)| 패턴 | LangChain | LangGraph | LlamaIndex | Haystack |
|---|---|---|---|---|
| 순차 체이닝 | | 파이프 | 순차 엣지 | 이벤트 체인 | connect() |
| 병렬 체이닝 | RunnableParallel | 팬아웃 엣지 | send_event() | 자동 병렬화 |
| 조건부 라우팅 | RunnableBranch | add_conditional_edges | 이벤트 타입 분기 | ConditionalRouter |
| 폴백 | .with_fallbacks() | 조건부 엣지 | try-except | 커스텀 컴포넌트 |
| 루프 | 지원 안 함 | 사이클 엣지 | 이벤트 재발행 | 컴포넌트 재연결 |
패턴 선택의 핵심 원칙은 "필요한 만큼만"입니다. 단순한 순차 처리에 LangGraph의 StateGraph를 사용할 필요는 없고, 반대로 복잡한 사이클이 필요한 곳에 LCEL만으로 해결하려 하면 코드가 복잡해집니다. 워크플로우의 복잡도에 맞는 도구를 선택하세요.
여러 패턴을 조합한 실전 예제입니다.
from langchain_core.runnables import (
RunnableParallel,
RunnableLambda,
RunnablePassthrough,
)
# 1단계: 입력 전처리 + 의도 분류 (병렬)
preprocessing = RunnableParallel(
cleaned_text=RunnableLambda(clean_text),
intent=classifier_chain,
original=RunnablePassthrough(),
)
# 2단계: 의도 기반 라우팅
def route_by_intent(data: dict):
intent = data["intent"].strip()
text = data["cleaned_text"]
routes = {
"technical": technical_chain,
"creative": creative_chain,
"analytical": analytical_chain,
}
chain = routes.get(intent, analytical_chain)
return chain.invoke({"text": text})
# 3단계: 품질 검증 + 폴백
quality_check = (
quality_prompt
| model.with_fallbacks([fallback_model])
| StrOutputParser()
)
# 전체 파이프라인 조합
full_pipeline = (
preprocessing
| RunnableLambda(route_by_intent)
| (lambda result: {"analysis": result})
| quality_check
)
result = full_pipeline.invoke({"text": "분석할 텍스트..."})8장에서는 LLM 애플리케이션의 또 다른 핵심 요소인 메모리 관리와 상태 유지를 다룹니다. 대화 메모리, 장기 메모리, 벡터 메모리, 구조화된 상태를 각 프레임워크별로 비교하고, 프로덕션 메모리 전략을 정리합니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
대화 메모리, 장기 메모리, 벡터 메모리, 구조화된 상태를 각 프레임워크별로 비교하고 프로덕션 메모리 전략을 정리합니다.
deepset Haystack 2.x의 컴포넌트와 파이프라인 개념, 방향성 멀티그래프, AsyncPipeline, 라우터, 문서 스토어를 분석합니다.
SSE/WebSocket, 토큰/이벤트 스트리밍, 구조화된 출력 스트리밍을 각 프레임워크별로 비교하고 프론트엔드 통합을 다룹니다.