본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout

내비게이션

  • Tech
  • Projects
  • Books
  • About
  • Tags

카테고리

  • AI / ML
  • 웹 개발
  • 프로그래밍
  • 개발 도구

연결

  • GitHub
  • Email
  • RSS
© 2026 Kreath Archive. All rights reserved.Built with Next.js + MDX
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 9장: 스트리밍과 실시간 처리 패턴
2026년 2월 18일·AI / ML·

9장: 스트리밍과 실시간 처리 패턴

SSE/WebSocket, 토큰/이벤트 스트리밍, 구조화된 출력 스트리밍을 각 프레임워크별로 비교하고 프론트엔드 통합을 다룹니다.

13분909자10개 섹션
orchestrationai-frameworkaillm
공유
ai-orchestration9 / 11
1234567891011
이전8장: 메모리 관리와 상태 유지다음10장: 에러 처리, 폴백, 관측 가능성

이 장에서 배우는 것

  • 스트리밍이 사용자 경험에 미치는 영향
  • SSE(Server-Sent Events)와 WebSocket의 차이와 선택 기준
  • 토큰 스트리밍과 이벤트 스트리밍의 구현
  • 구조화된 출력의 스트리밍 처리
  • 각 프레임워크의 스트리밍 구현 비교
  • 프론트엔드 통합 패턴

사용자는 기다리지 않는다

GPT-4o 수준의 모델이 복잡한 질문에 답변하는 데 걸리는 시간은 5~15초입니다. 아무런 피드백 없이 15초를 기다리게 하면 대부분의 사용자는 이탈합니다. 스트리밍은 이 문제를 해결하는 핵심 기술입니다.

스트리밍의 효과는 심리적입니다. 실제 완료 시간이 동일하더라도, 토큰이 하나씩 나타나는 것을 보는 사용자는 응답이 더 빠르다고 느낍니다. ChatGPT가 처음부터 스트리밍을 채택한 것도 이 때문입니다.


전송 프로토콜 선택

SSE (Server-Sent Events)

서버에서 클라이언트로의 단방향 스트리밍에 적합합니다.

sse_server.py
python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
 
app = FastAPI()
 
@app.get("/chat/stream")
async def stream_chat(question: str):
    model = ChatOpenAI(model="gpt-4o", streaming=True)
 
    async def event_generator():
        async for chunk in model.astream(question):
            if chunk.content:
                yield f"data: {chunk.content}\n\n"
        yield "data: [DONE]\n\n"
 
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )

WebSocket

양방향 통신이 필요한 경우에 사용합니다.

websocket_server.py
python
from fastapi import FastAPI, WebSocket
 
app = FastAPI()
 
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await websocket.accept()
 
    while True:
        data = await websocket.receive_json()
        question = data["question"]
 
        model = ChatOpenAI(model="gpt-4o", streaming=True)
 
        async for chunk in model.astream(question):
            if chunk.content:
                await websocket.send_json({
                    "type": "token",
                    "content": chunk.content,
                })
 
        await websocket.send_json({"type": "done"})

선택 기준

기준SSEWebSocket
통신 방향단방향 (서버 -> 클라이언트)양방향
구현 복잡도낮음높음
재연결자동 지원수동 구현 필요
HTTP 호환성HTTP/2 호환별도 프로토콜
적합한 경우단순 토큰 스트리밍실시간 대화, 동시 편집
Tip

대부분의 LLM 챗봇 인터페이스에는 SSE가 충분합니다. WebSocket은 사용자가 스트리밍 중에 추가 입력을 보내거나, 서버가 이벤트를 능동적으로 푸시해야 하는 경우에 사용하세요.


토큰 스트리밍

LangChain

token_streaming_langchain.py
python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
 
model = ChatOpenAI(model="gpt-4o", streaming=True)
prompt = ChatPromptTemplate.from_template("{question}")
chain = prompt | model
 
# 동기 스트리밍
for chunk in chain.stream({"question": "Python의 장점은?"}):
    print(chunk.content, end="", flush=True)
 
# 비동기 스트리밍
async for chunk in chain.astream({"question": "Python의 장점은?"}):
    print(chunk.content, end="", flush=True)

LangGraph

token_streaming_langgraph.py
python
config = {"configurable": {"thread_id": "user-123"}}
 
# 노드별 스트리밍
async for event in app.astream(
    {"messages": [HumanMessage("분석해줘")]},
    config=config,
    stream_mode="values",
):
    # 각 노드 완료 시 전체 상태 수신
    messages = event.get("messages", [])
    if messages:
        print(f"Latest: {messages[-1].content}")
 
# 토큰 단위 스트리밍
async for event in app.astream(
    {"messages": [HumanMessage("분석해줘")]},
    config=config,
    stream_mode="messages",
):
    # 개별 토큰 수신
    message, metadata = event
    if message.content:
        print(message.content, end="", flush=True)

LlamaIndex

token_streaming_llamaindex.py
python
from llama_index.core import VectorStoreIndex
 
index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine(streaming=True)
 
# 스트리밍 응답
streaming_response = query_engine.query("LlamaIndex의 특징은?")
 
for token in streaming_response.response_gen:
    print(token, end="", flush=True)

Haystack

token_streaming_haystack.py
python
from haystack.components.generators import OpenAIGenerator
 
generator = OpenAIGenerator(
    model="gpt-4o",
    streaming_callback=lambda chunk: print(chunk.content, end=""),
)
 
# 스트리밍 콜백 방식
result = generator.run(prompt="Haystack의 특징은?")

이벤트 스트리밍

토큰 스트리밍보다 풍부한 정보를 제공하는 이벤트 스트리밍입니다. 어떤 노드가 실행 중인지, 도구가 호출되었는지 등의 메타 정보를 포함합니다.

LangChain astream_events

event_streaming.py
python
async for event in chain.astream_events(
    {"question": "Python의 GIL에 대해 설명해줘"},
    version="v2",
):
    kind = event["event"]
    name = event["name"]
 
    if kind == "on_chat_model_start":
        print(f"\n--- 모델 호출 시작: {name} ---")
 
    elif kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            print(content, end="", flush=True)
 
    elif kind == "on_chat_model_end":
        usage = event["data"]["output"].usage_metadata
        print(f"\n--- 모델 호출 완료 ---")
        print(f"토큰 사용: 입력 {usage['input_tokens']}, "
              f"출력 {usage['output_tokens']}")
 
    elif kind == "on_tool_start":
        print(f"\n[도구 호출] {event['data']['input']}")
 
    elif kind == "on_tool_end":
        print(f"[도구 결과] {event['data']['output']}")

LangGraph 이벤트 스트리밍

langgraph_events.py
python
async for event in app.astream_events(
    {"messages": [HumanMessage("날씨를 확인하고 옷차림을 추천해줘")]},
    config=config,
    version="v2",
):
    if event["event"] == "on_chain_start":
        if event["metadata"].get("langgraph_node"):
            node = event["metadata"]["langgraph_node"]
            print(f"\n=== 노드 시작: {node} ===")
 
    elif event["event"] == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            print(content, end="", flush=True)

구조화된 출력 스트리밍

JSON이나 Pydantic 모델 같은 구조화된 출력을 스트리밍하는 것은 독특한 도전입니다. 부분적인 JSON은 파싱할 수 없기 때문입니다.

부분 파싱 접근

structured_streaming.py
python
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
 
class AnalysisResult(BaseModel):
    summary: str
    key_points: list[str]
    sentiment: str
    confidence: float
 
model = ChatOpenAI(model="gpt-4o")
structured_model = model.with_structured_output(
    AnalysisResult,
    method="json_schema",
)
 
# 구조화된 출력 스트리밍
# 각 청크가 부분적으로 채워진 Pydantic 모델로 전달됨
async for partial in structured_model.astream(
    "이 기사를 분석해주세요: ..."
):
    if partial.summary:
        print(f"요약 (진행 중): {partial.summary}")
    if partial.key_points:
        print(f"핵심 포인트: {len(partial.key_points)}개 발견")
Warning

구조화된 출력 스트리밍에서 각 청크의 부분 객체는 불완전할 수 있습니다. 예를 들어, key_points 리스트의 마지막 항목이 중간에 잘려 있을 수 있습니다. UI에서는 이를 고려하여 "생성 중..." 표시를 적절히 활용해야 합니다.


프론트엔드 통합

React + SSE

useStreamChat.ts
typescript
import { useState, useCallback } from 'react'
 
interface StreamState {
  content: string
  isStreaming: boolean
  error: string | null
}
 
function useStreamChat() {
  const [state, setState] = useState<StreamState>({
    content: '',
    isStreaming: false,
    error: null,
  })
 
  const sendMessage = useCallback(async (question: string) => {
    setState({ content: '', isStreaming: true, error: null })
 
    try {
      const response = await fetch(
        `/api/chat/stream?question=${encodeURIComponent(question)}`
      )
 
      const reader = response.body?.getReader()
      const decoder = new TextDecoder()
 
      if (!reader) throw new Error('Stream not available')
 
      while (true) {
        const { done, value } = await reader.read()
        if (done) break
 
        const text = decoder.decode(value)
        const lines = text.split('\n')
 
        for (const line of lines) {
          if (line.startsWith('data: ')) {
            const data = line.slice(6)
            if (data === '[DONE]') break
 
            setState(prev => ({
              ...prev,
              content: prev.content + data,
            }))
          }
        }
      }
    } catch (error) {
      setState(prev => ({
        ...prev,
        error: error instanceof Error ? error.message : 'Unknown error',
      }))
    } finally {
      setState(prev => ({ ...prev, isStreaming: false }))
    }
  }, [])
 
  return { ...state, sendMessage }
}

Next.js API Route + AI SDK

app/api/chat/route.ts
typescript
import { streamText } from 'ai'
import { openai } from '@ai-sdk/openai'
 
export async function POST(request: Request) {
  const { messages } = await request.json()
 
  const result = streamText({
    model: openai('gpt-4o'),
    messages,
  })
 
  return result.toDataStreamResponse()
}
app/chat/page.tsx
typescript
'use client'
 
import { useChat } from 'ai/react'
 
export default function ChatPage() {
  const { messages, input, handleInputChange, handleSubmit, isLoading } =
    useChat({
      api: '/api/chat',
    })
 
  return (
    <div>
      {messages.map(m => (
        <div key={m.id}>
          <strong>{m.role}:</strong> {m.content}
        </div>
      ))}
      <form onSubmit={handleSubmit}>
        <input
          value={input}
          onChange={handleInputChange}
          placeholder="메시지를 입력하세요..."
          disabled={isLoading}
        />
      </form>
    </div>
  )
}
Info

Vercel의 AI SDK는 프론트엔드 스트리밍 통합을 크게 단순화합니다. useChat 훅 하나로 메시지 관리, 스트리밍 처리, 로딩 상태까지 모두 처리됩니다. LangChain, LlamaIndex 등 다양한 백엔드와 호환됩니다.


스트리밍 성능 최적화

청크 크기 최적화

chunk_optimization.py
python
import asyncio
 
async def buffered_stream(chain, input_data, buffer_size: int = 5):
    """토큰을 버퍼링하여 네트워크 효율성 개선"""
    buffer = []
 
    async for chunk in chain.astream(input_data):
        if chunk.content:
            buffer.append(chunk.content)
 
            if len(buffer) >= buffer_size:
                yield "".join(buffer)
                buffer = []
 
    # 남은 버퍼 전송
    if buffer:
        yield "".join(buffer)

타임아웃 처리

streaming_timeout.py
python
async def stream_with_timeout(chain, input_data, timeout: float = 30.0):
    """스트리밍에 타임아웃 적용"""
    try:
        async with asyncio.timeout(timeout):
            async for chunk in chain.astream(input_data):
                yield chunk
    except asyncio.TimeoutError:
        yield {"type": "error", "message": "응답 시간이 초과되었습니다."}

핵심 요약

  • 스트리밍은 LLM 애플리케이션의 체감 응답 속도를 극적으로 개선합니다.
  • SSE는 단방향 스트리밍에 적합하며, 대부분의 챗봇에 충분합니다. WebSocket은 양방향 통신이 필요할 때 선택합니다.
  • 토큰 스트리밍은 기본 수준의 실시간 피드백을, 이벤트 스트리밍은 노드 상태, 도구 호출 등 풍부한 메타 정보를 제공합니다.
  • 구조화된 출력 스트리밍은 부분 파싱이 필요하며, UI에서 불완전한 상태를 적절히 처리해야 합니다.
  • Vercel AI SDK를 사용하면 프론트엔드 통합을 크게 단순화할 수 있습니다.
  • 프로덕션에서는 청크 버퍼링, 타임아웃, 에러 처리를 반드시 구현해야 합니다.

다음 장 예고

10장에서는 프로덕션 운영의 핵심인 에러 처리, 폴백, 관측 가능성을 다룹니다. 재시도 전략, 서킷 브레이커, OpenTelemetry 통합, 비용 추적, 그리고 프로덕션 모니터링 대시보드 구성까지 살펴보겠습니다.

이 글이 도움이 되셨나요?

관련 주제 더 보기

#orchestration#ai-framework#ai#llm

관련 글

AI / ML

10장: 에러 처리, 폴백, 관측 가능성

재시도 전략, 서킷 브레이커, OpenTelemetry 통합, 비용 추적, 프로덕션 모니터링까지 프로덕션 안정성 패턴을 다룹니다.

2026년 2월 20일·15분
AI / ML

8장: 메모리 관리와 상태 유지

대화 메모리, 장기 메모리, 벡터 메모리, 구조화된 상태를 각 프레임워크별로 비교하고 프로덕션 메모리 전략을 정리합니다.

2026년 2월 16일·16분
AI / ML

11장: 프레임워크 선택 기준과 마이그레이션 전략

5대 프레임워크 종합 비교, 의사결정 트리, 하이브리드 아키텍처, 마이그레이션 가이드, 프레임워크 독립적 설계 원칙을 다룹니다.

2026년 2월 22일·20분
이전 글8장: 메모리 관리와 상태 유지
다음 글10장: 에러 처리, 폴백, 관측 가능성

댓글

목차

약 13분 남음
  • 이 장에서 배우는 것
  • 사용자는 기다리지 않는다
  • 전송 프로토콜 선택
    • SSE (Server-Sent Events)
    • WebSocket
    • 선택 기준
  • 토큰 스트리밍
    • LangChain
    • LangGraph
    • LlamaIndex
    • Haystack
  • 이벤트 스트리밍
    • LangChain astream_events
    • LangGraph 이벤트 스트리밍
  • 구조화된 출력 스트리밍
    • 부분 파싱 접근
  • 프론트엔드 통합
    • React + SSE
    • Next.js API Route + AI SDK
  • 스트리밍 성능 최적화
    • 청크 크기 최적화
    • 타임아웃 처리
  • 핵심 요약
  • 다음 장 예고