SSE/WebSocket, 토큰/이벤트 스트리밍, 구조화된 출력 스트리밍을 각 프레임워크별로 비교하고 프론트엔드 통합을 다룹니다.
GPT-4o 수준의 모델이 복잡한 질문에 답변하는 데 걸리는 시간은 5~15초입니다. 아무런 피드백 없이 15초를 기다리게 하면 대부분의 사용자는 이탈합니다. 스트리밍은 이 문제를 해결하는 핵심 기술입니다.
스트리밍의 효과는 심리적입니다. 실제 완료 시간이 동일하더라도, 토큰이 하나씩 나타나는 것을 보는 사용자는 응답이 더 빠르다고 느낍니다. ChatGPT가 처음부터 스트리밍을 채택한 것도 이 때문입니다.
서버에서 클라이언트로의 단방향 스트리밍에 적합합니다.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
app = FastAPI()
@app.get("/chat/stream")
async def stream_chat(question: str):
model = ChatOpenAI(model="gpt-4o", streaming=True)
async def event_generator():
async for chunk in model.astream(question):
if chunk.content:
yield f"data: {chunk.content}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)양방향 통신이 필요한 경우에 사용합니다.
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_json()
question = data["question"]
model = ChatOpenAI(model="gpt-4o", streaming=True)
async for chunk in model.astream(question):
if chunk.content:
await websocket.send_json({
"type": "token",
"content": chunk.content,
})
await websocket.send_json({"type": "done"})| 기준 | SSE | WebSocket |
|---|---|---|
| 통신 방향 | 단방향 (서버 -> 클라이언트) | 양방향 |
| 구현 복잡도 | 낮음 | 높음 |
| 재연결 | 자동 지원 | 수동 구현 필요 |
| HTTP 호환성 | HTTP/2 호환 | 별도 프로토콜 |
| 적합한 경우 | 단순 토큰 스트리밍 | 실시간 대화, 동시 편집 |
대부분의 LLM 챗봇 인터페이스에는 SSE가 충분합니다. WebSocket은 사용자가 스트리밍 중에 추가 입력을 보내거나, 서버가 이벤트를 능동적으로 푸시해야 하는 경우에 사용하세요.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
model = ChatOpenAI(model="gpt-4o", streaming=True)
prompt = ChatPromptTemplate.from_template("{question}")
chain = prompt | model
# 동기 스트리밍
for chunk in chain.stream({"question": "Python의 장점은?"}):
print(chunk.content, end="", flush=True)
# 비동기 스트리밍
async for chunk in chain.astream({"question": "Python의 장점은?"}):
print(chunk.content, end="", flush=True)config = {"configurable": {"thread_id": "user-123"}}
# 노드별 스트리밍
async for event in app.astream(
{"messages": [HumanMessage("분석해줘")]},
config=config,
stream_mode="values",
):
# 각 노드 완료 시 전체 상태 수신
messages = event.get("messages", [])
if messages:
print(f"Latest: {messages[-1].content}")
# 토큰 단위 스트리밍
async for event in app.astream(
{"messages": [HumanMessage("분석해줘")]},
config=config,
stream_mode="messages",
):
# 개별 토큰 수신
message, metadata = event
if message.content:
print(message.content, end="", flush=True)from llama_index.core import VectorStoreIndex
index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine(streaming=True)
# 스트리밍 응답
streaming_response = query_engine.query("LlamaIndex의 특징은?")
for token in streaming_response.response_gen:
print(token, end="", flush=True)from haystack.components.generators import OpenAIGenerator
generator = OpenAIGenerator(
model="gpt-4o",
streaming_callback=lambda chunk: print(chunk.content, end=""),
)
# 스트리밍 콜백 방식
result = generator.run(prompt="Haystack의 특징은?")토큰 스트리밍보다 풍부한 정보를 제공하는 이벤트 스트리밍입니다. 어떤 노드가 실행 중인지, 도구가 호출되었는지 등의 메타 정보를 포함합니다.
async for event in chain.astream_events(
{"question": "Python의 GIL에 대해 설명해줘"},
version="v2",
):
kind = event["event"]
name = event["name"]
if kind == "on_chat_model_start":
print(f"\n--- 모델 호출 시작: {name} ---")
elif kind == "on_chat_model_stream":
content = event["data"]["chunk"].content
if content:
print(content, end="", flush=True)
elif kind == "on_chat_model_end":
usage = event["data"]["output"].usage_metadata
print(f"\n--- 모델 호출 완료 ---")
print(f"토큰 사용: 입력 {usage['input_tokens']}, "
f"출력 {usage['output_tokens']}")
elif kind == "on_tool_start":
print(f"\n[도구 호출] {event['data']['input']}")
elif kind == "on_tool_end":
print(f"[도구 결과] {event['data']['output']}")async for event in app.astream_events(
{"messages": [HumanMessage("날씨를 확인하고 옷차림을 추천해줘")]},
config=config,
version="v2",
):
if event["event"] == "on_chain_start":
if event["metadata"].get("langgraph_node"):
node = event["metadata"]["langgraph_node"]
print(f"\n=== 노드 시작: {node} ===")
elif event["event"] == "on_chat_model_stream":
content = event["data"]["chunk"].content
if content:
print(content, end="", flush=True)JSON이나 Pydantic 모델 같은 구조화된 출력을 스트리밍하는 것은 독특한 도전입니다. 부분적인 JSON은 파싱할 수 없기 때문입니다.
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
class AnalysisResult(BaseModel):
summary: str
key_points: list[str]
sentiment: str
confidence: float
model = ChatOpenAI(model="gpt-4o")
structured_model = model.with_structured_output(
AnalysisResult,
method="json_schema",
)
# 구조화된 출력 스트리밍
# 각 청크가 부분적으로 채워진 Pydantic 모델로 전달됨
async for partial in structured_model.astream(
"이 기사를 분석해주세요: ..."
):
if partial.summary:
print(f"요약 (진행 중): {partial.summary}")
if partial.key_points:
print(f"핵심 포인트: {len(partial.key_points)}개 발견")구조화된 출력 스트리밍에서 각 청크의 부분 객체는 불완전할 수 있습니다. 예를 들어, key_points 리스트의 마지막 항목이 중간에 잘려 있을 수 있습니다. UI에서는 이를 고려하여 "생성 중..." 표시를 적절히 활용해야 합니다.
import { useState, useCallback } from 'react'
interface StreamState {
content: string
isStreaming: boolean
error: string | null
}
function useStreamChat() {
const [state, setState] = useState<StreamState>({
content: '',
isStreaming: false,
error: null,
})
const sendMessage = useCallback(async (question: string) => {
setState({ content: '', isStreaming: true, error: null })
try {
const response = await fetch(
`/api/chat/stream?question=${encodeURIComponent(question)}`
)
const reader = response.body?.getReader()
const decoder = new TextDecoder()
if (!reader) throw new Error('Stream not available')
while (true) {
const { done, value } = await reader.read()
if (done) break
const text = decoder.decode(value)
const lines = text.split('\n')
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6)
if (data === '[DONE]') break
setState(prev => ({
...prev,
content: prev.content + data,
}))
}
}
}
} catch (error) {
setState(prev => ({
...prev,
error: error instanceof Error ? error.message : 'Unknown error',
}))
} finally {
setState(prev => ({ ...prev, isStreaming: false }))
}
}, [])
return { ...state, sendMessage }
}import { streamText } from 'ai'
import { openai } from '@ai-sdk/openai'
export async function POST(request: Request) {
const { messages } = await request.json()
const result = streamText({
model: openai('gpt-4o'),
messages,
})
return result.toDataStreamResponse()
}'use client'
import { useChat } from 'ai/react'
export default function ChatPage() {
const { messages, input, handleInputChange, handleSubmit, isLoading } =
useChat({
api: '/api/chat',
})
return (
<div>
{messages.map(m => (
<div key={m.id}>
<strong>{m.role}:</strong> {m.content}
</div>
))}
<form onSubmit={handleSubmit}>
<input
value={input}
onChange={handleInputChange}
placeholder="메시지를 입력하세요..."
disabled={isLoading}
/>
</form>
</div>
)
}Vercel의 AI SDK는 프론트엔드 스트리밍 통합을 크게 단순화합니다. useChat 훅 하나로 메시지 관리, 스트리밍 처리, 로딩 상태까지 모두 처리됩니다. LangChain, LlamaIndex 등 다양한 백엔드와 호환됩니다.
import asyncio
async def buffered_stream(chain, input_data, buffer_size: int = 5):
"""토큰을 버퍼링하여 네트워크 효율성 개선"""
buffer = []
async for chunk in chain.astream(input_data):
if chunk.content:
buffer.append(chunk.content)
if len(buffer) >= buffer_size:
yield "".join(buffer)
buffer = []
# 남은 버퍼 전송
if buffer:
yield "".join(buffer)async def stream_with_timeout(chain, input_data, timeout: float = 30.0):
"""스트리밍에 타임아웃 적용"""
try:
async with asyncio.timeout(timeout):
async for chunk in chain.astream(input_data):
yield chunk
except asyncio.TimeoutError:
yield {"type": "error", "message": "응답 시간이 초과되었습니다."}10장에서는 프로덕션 운영의 핵심인 에러 처리, 폴백, 관측 가능성을 다룹니다. 재시도 전략, 서킷 브레이커, OpenTelemetry 통합, 비용 추적, 그리고 프로덕션 모니터링 대시보드 구성까지 살펴보겠습니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
재시도 전략, 서킷 브레이커, OpenTelemetry 통합, 비용 추적, 프로덕션 모니터링까지 프로덕션 안정성 패턴을 다룹니다.
대화 메모리, 장기 메모리, 벡터 메모리, 구조화된 상태를 각 프레임워크별로 비교하고 프로덕션 메모리 전략을 정리합니다.
5대 프레임워크 종합 비교, 의사결정 트리, 하이브리드 아키텍처, 마이그레이션 가이드, 프레임워크 독립적 설계 원칙을 다룹니다.