본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout

내비게이션

  • Tech
  • Projects
  • Books
  • About
  • Tags

카테고리

  • AI / ML
  • 웹 개발
  • 프로그래밍
  • 개발 도구

연결

  • GitHub
  • Email
  • RSS
© 2026 Kreath Archive. All rights reserved.Built with Next.js + MDX
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 6장: 스트리밍 응답 인터페이스 설계
2026년 2월 14일·아키텍처·

6장: 스트리밍 응답 인터페이스 설계

SSE 기반 토큰 스트리밍 프로토콜, OpenAI 호환 스트리밍 형식, 에러 처리, 클라이언트 취소, 프론트엔드 통합 패턴을 학습합니다.

15분1,273자9개 섹션
api-designgraphqlarchitecture
공유
api-design6 / 11
1234567891011
이전5장: AI 서비스 API 설계 패턴다음7장: API 버전 관리와 하위 호환성

학습 목표

  • SSE 프로토콜의 동작 원리와 AI 스트리밍에의 적용을 이해합니다
  • OpenAI 호환 스트리밍 형식을 학습하고 구현합니다
  • 스트리밍 중 에러 처리와 클라이언트 취소 메커니즘을 익힙니다
  • 프론트엔드에서 스트리밍 응답을 소비하는 패턴을 실습합니다

왜 스트리밍인가

LLM의 응답 생성에는 수 초가 소요됩니다. 전체 응답이 완성될 때까지 사용자를 기다리게 하면 체감 지연이 극심합니다. 토큰이 생성되는 즉시 전달하는 스트리밍은 첫 토큰까지의 시간(TTFT, Time To First Token)을 수십 밀리초로 줄여 사용자 경험을 극적으로 개선합니다.

지표동기 응답스트리밍 응답
첫 글자 표시까지3-10초100-500ms
체감 대기 시간매우 김거의 없음
취소 가능성완료 전까지 불가언제든 가능
중간 결과 활용불가가능

SSE 프로토콜 기초

SSE(Server-Sent Events)는 HTTP 기반의 단방향 서버-클라이언트 스트리밍 프로토콜입니다. WebSocket과 달리 일반 HTTP를 사용하므로 기존 인프라(프록시, CDN, 로드밸런서)와 호환성이 우수합니다.

SSE 메시지 형식

sse-format.txt
text
event: message
data: {"key": "value"}
id: msg-001
retry: 3000
 
event: message
data: {"key": "value2"}
id: msg-002
 
event: done
data: [DONE]

SSE 메시지는 네 가지 필드로 구성됩니다.

  • data — 메시지 본문 (필수). 여러 줄은 여러 data: 라인으로 표현합니다
  • event — 이벤트 타입 (선택). 클라이언트에서 타입별로 다른 핸들러를 등록할 수 있습니다
  • id — 메시지 ID (선택). 재연결 시 Last-Event-ID로 전송됩니다
  • retry — 재연결 대기 시간 밀리초 (선택)

서버 구현

sse_server.py
python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import json
import asyncio
 
 
@app.post("/api/v1/chat/completions")
async def create_chat_completion(
    request: CompletionRequest,
    http_request: Request,
):
    if not request.stream:
        return await sync_completion(request)
    
    return StreamingResponse(
        stream_completion(request, http_request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Nginx 버퍼링 비활성화
        },
    )
 
 
async def stream_completion(
    request: CompletionRequest,
    http_request: Request,
):
    request_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
    total_prompt_tokens = count_tokens(request.messages)
    total_completion_tokens = 0
    
    try:
        async for token in inference_engine.stream(request):
            # 클라이언트 연결 확인
            if await http_request.is_disconnected():
                break
            
            total_completion_tokens += 1
            
            chunk = {
                "id": request_id,
                "object": "chat.completion.chunk",
                "created": int(time.time()),
                "model": request.model,
                "choices": [{
                    "index": 0,
                    "delta": {"content": token},
                    "finish_reason": None,
                }],
            }
            
            yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
        
        # 완료 청크
        final_chunk = {
            "id": request_id,
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": request.model,
            "choices": [{
                "index": 0,
                "delta": {},
                "finish_reason": "stop",
            }],
            "usage": {
                "prompt_tokens": total_prompt_tokens,
                "completion_tokens": total_completion_tokens,
                "total_tokens": total_prompt_tokens + total_completion_tokens,
            },
        }
        yield f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n"
        
        # 종료 신호
        yield "data: [DONE]\n\n"
        
    except Exception as e:
        error_chunk = {
            "error": {
                "type": "server_error",
                "message": str(e),
                "code": "inference_error",
            }
        }
        yield f"data: {json.dumps(error_chunk)}\n\n"
        yield "data: [DONE]\n\n"

OpenAI 호환 스트리밍 형식

OpenAI가 정의한 스트리밍 형식은 사실상 업계 표준이 되었습니다. 이 형식을 따르면 기존 OpenAI SDK와 도구 생태계를 그대로 활용할 수 있습니다.

스트리밍 청크 구조

streaming-chunk-types.ts
typescript
// 일반 텍스트 스트리밍 청크
interface StreamChunk {
  id: string;
  object: "chat.completion.chunk";
  created: number;
  model: string;
  choices: StreamChoice[];
  usage?: TokenUsage;  // 마지막 청크에서만 포함
}
 
interface StreamChoice {
  index: number;
  delta: MessageDelta;
  finish_reason: string | null;
}
 
interface MessageDelta {
  role?: string;     // 첫 번째 청크에서만
  content?: string;  // 토큰 내용
  tool_calls?: ToolCallDelta[];
}
 
// 도구 호출 스트리밍
interface ToolCallDelta {
  index: number;
  id?: string;       // 첫 번째 청크에서만
  type?: string;     // 첫 번째 청크에서만
  function?: {
    name?: string;   // 첫 번째 청크에서만
    arguments?: string;  // 점진적으로 누적
  };
}

스트리밍 시퀀스 예시

streaming-sequence.txt
text
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1711234567,"model":"claude-4","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}
 
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1711234567,"model":"claude-4","choices":[{"index":0,"delta":{"content":"API"},"finish_reason":null}]}
 
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1711234567,"model":"claude-4","choices":[{"index":0,"delta":{"content":" 설계"},"finish_reason":null}]}
 
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1711234567,"model":"claude-4","choices":[{"index":0,"delta":{"content":"에서"},"finish_reason":null}]}
 
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1711234567,"model":"claude-4","choices":[{"index":0,"delta":{},"finish_reason":"stop"}],"usage":{"prompt_tokens":25,"completion_tokens":8,"total_tokens":33}}
 
data: [DONE]
Info

첫 번째 청크에서 role: "assistant"를 보내고, 이후 청크에서는 content만 점진적으로 전송합니다. 마지막 청크에서 finish_reason과 usage 정보를 포함하여 스트리밍의 완료와 비용을 알립니다.


도구 호출 스트리밍

도구 호출 시에도 스트리밍이 적용됩니다. 함수명과 인자가 점진적으로 생성됩니다.

tool-call-streaming.txt
text
data: {"id":"chatcmpl-def","choices":[{"index":0,"delta":{"role":"assistant","content":null,"tool_calls":[{"index":0,"id":"call_xyz","type":"function","function":{"name":"get_weather","arguments":""}}]},"finish_reason":null}]}
 
data: {"id":"chatcmpl-def","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\"ci"}}]},"finish_reason":null}]}
 
data: {"id":"chatcmpl-def","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"ty\":"}}]},"finish_reason":null}]}
 
data: {"id":"chatcmpl-def","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":" \"서울\"}"}}]},"finish_reason":null}]}
 
data: {"id":"chatcmpl-def","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}
 
data: [DONE]

클라이언트는 arguments 문자열을 누적하여 전체 JSON을 조합해야 합니다.

tool-call-accumulator.ts
typescript
class ToolCallAccumulator {
  private toolCalls: Map<number, {
    id: string;
    type: string;
    function: { name: string; arguments: string };
  }> = new Map();
 
  processDelta(delta: MessageDelta): void {
    if (!delta.tool_calls) return;
 
    for (const tc of delta.tool_calls) {
      if (!this.toolCalls.has(tc.index)) {
        this.toolCalls.set(tc.index, {
          id: tc.id ?? "",
          type: tc.type ?? "function",
          function: {
            name: tc.function?.name ?? "",
            arguments: "",
          },
        });
      }
 
      const existing = this.toolCalls.get(tc.index)!;
      if (tc.function?.arguments) {
        existing.function.arguments += tc.function.arguments;
      }
    }
  }
 
  getToolCalls(): ToolCall[] {
    return Array.from(this.toolCalls.values()).map((tc) => ({
      ...tc,
      function: {
        ...tc.function,
        arguments: JSON.parse(tc.function.arguments),
      },
    }));
  }
}

클라이언트 취소와 타임아웃

스트리밍 중 사용자가 응답을 취소하거나, 네트워크 오류가 발생하는 상황을 처리해야 합니다.

서버 측 취소 감지

cancellation_server.py
python
async def stream_completion(
    request: CompletionRequest,
    http_request: Request,
):
    """클라이언트 연결 해제 시 추론을 즉시 중단합니다."""
    inference_task = None
    
    try:
        async for token in inference_engine.stream(request):
            # 주기적으로 클라이언트 연결 확인
            if await http_request.is_disconnected():
                logger.info(
                    f"클라이언트 연결 해제: {request.model}, "
                    f"생성된 토큰: {token_count}"
                )
                # 추론 리소스 즉시 해제
                await inference_engine.cancel(request_id)
                return
            
            yield format_chunk(token)
            
    except asyncio.CancelledError:
        logger.info("스트리밍 취소됨")
        await inference_engine.cancel(request_id)
    except Exception as e:
        yield format_error_chunk(e)
    finally:
        # 사용량 기록 (부분 응답도 과금)
        await usage_tracker.record(
            request_id=request_id,
            prompt_tokens=prompt_tokens,
            completion_tokens=token_count,
        )

클라이언트 측 취소

cancellation-client.ts
typescript
class StreamingClient {
  private controller: AbortController | null = null;
 
  async streamCompletion(
    request: CompletionRequest,
    onToken: (token: string) => void,
    onComplete: (usage: TokenUsage) => void,
    onError: (error: Error) => void,
  ): Promise<void> {
    this.controller = new AbortController();
    const signal = this.controller.signal;
 
    // 타임아웃 설정 (60초)
    const timeoutId = setTimeout(() => this.cancel(), 60000);
 
    try {
      const response = await fetch("/api/v1/chat/completions", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ ...request, stream: true }),
        signal,
      });
 
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }
 
      const reader = response.body!.getReader();
      const decoder = new TextDecoder();
      let buffer = "";
 
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
 
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split("\n");
        buffer = lines.pop() ?? "";
 
        for (const line of lines) {
          if (!line.startsWith("data: ")) continue;
          const data = line.slice(6);
 
          if (data === "[DONE]") {
            return;
          }
 
          const chunk = JSON.parse(data);
          const delta = chunk.choices[0]?.delta;
 
          if (delta?.content) {
            onToken(delta.content);
          }
 
          if (chunk.usage) {
            onComplete(chunk.usage);
          }
        }
      }
    } catch (error) {
      if (signal.aborted) return; // 의도적 취소
      onError(error as Error);
    } finally {
      clearTimeout(timeoutId);
      this.controller = null;
    }
  }
 
  cancel(): void {
    this.controller?.abort();
  }
}
Warning

클라이언트가 스트리밍을 취소하더라도, 이미 생성된 토큰은 과금됩니다. 서버는 취소 시점까지의 사용량을 반드시 기록해야 합니다. 취소가 빈번한 서비스라면, 클라이언트에게 취소 시점의 사용량을 알려주는 것이 투명한 비용 관리에 도움이 됩니다.


재시도와 에러 복구

SSE 자동 재연결

SSE 프로토콜은 연결이 끊어지면 브라우저가 자동으로 재연결을 시도합니다. Last-Event-ID 헤더를 통해 마지막으로 수신한 이벤트 이후부터 재개할 수 있습니다.

resumable_streaming.py
python
@app.post("/api/v1/chat/completions")
async def create_streaming_completion(
    request: CompletionRequest,
    http_request: Request,
):
    last_event_id = http_request.headers.get("Last-Event-ID")
    
    if last_event_id:
        # 이전 스트리밍 세션 이어서 전송
        return StreamingResponse(
            resume_stream(request, last_event_id),
            media_type="text/event-stream",
        )
    
    return StreamingResponse(
        stream_completion(request, http_request),
        media_type="text/event-stream",
    )
 
 
async def stream_completion(request, http_request):
    event_id = 0
    
    async for token in inference_engine.stream(request):
        event_id += 1
        # 각 이벤트에 ID를 부여하여 재개 가능하게
        chunk = format_chunk(token, request_id)
        yield f"id: {event_id}\ndata: {json.dumps(chunk)}\n\n"
        
        # 스트리밍 상태를 캐시에 저장
        await cache.set(
            f"stream:{request_id}:{event_id}",
            chunk,
            ttl=300,  # 5분간 보관
        )

프론트엔드 통합: React 패턴

useStreaming.ts
typescript
import { useState, useCallback, useRef } from "react";
 
interface UseStreamingOptions {
  onToken?: (token: string) => void;
  onComplete?: (usage: TokenUsage) => void;
  onError?: (error: Error) => void;
  timeout?: number;
}
 
function useStreaming(options: UseStreamingOptions = {}) {
  const [content, setContent] = useState("");
  const [isStreaming, setIsStreaming] = useState(false);
  const [usage, setUsage] = useState<TokenUsage | null>(null);
  const [error, setError] = useState<Error | null>(null);
  const controllerRef = useRef<AbortController | null>(null);
 
  const startStream = useCallback(
    async (request: CompletionRequest) => {
      setContent("");
      setIsStreaming(true);
      setUsage(null);
      setError(null);
 
      controllerRef.current = new AbortController();
 
      try {
        const response = await fetch("/api/v1/chat/completions", {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify({ ...request, stream: true }),
          signal: controllerRef.current.signal,
        });
 
        const reader = response.body!.getReader();
        const decoder = new TextDecoder();
        let buffer = "";
 
        while (true) {
          const { done, value } = await reader.read();
          if (done) break;
 
          buffer += decoder.decode(value, { stream: true });
          const lines = buffer.split("\n");
          buffer = lines.pop() ?? "";
 
          for (const line of lines) {
            if (!line.startsWith("data: ")) continue;
            const data = line.slice(6);
            if (data === "[DONE]") break;
 
            const chunk = JSON.parse(data);
            const token = chunk.choices[0]?.delta?.content;
 
            if (token) {
              setContent((prev) => prev + token);
              options.onToken?.(token);
            }
 
            if (chunk.usage) {
              setUsage(chunk.usage);
              options.onComplete?.(chunk.usage);
            }
          }
        }
      } catch (err) {
        if (controllerRef.current?.signal.aborted) return;
        const error = err as Error;
        setError(error);
        options.onError?.(error);
      } finally {
        setIsStreaming(false);
        controllerRef.current = null;
      }
    },
    [options]
  );
 
  const cancel = useCallback(() => {
    controllerRef.current?.abort();
    setIsStreaming(false);
  }, []);
 
  return { content, isStreaming, usage, error, startStream, cancel };
}

사용 예시

ChatComponent.tsx
tsx
function ChatComponent() {
  const { content, isStreaming, usage, startStream, cancel } =
    useStreaming({
      onComplete: (usage) => {
        console.log(`토큰 사용: ${usage.total_tokens}`);
      },
    });
 
  const handleSubmit = async (message: string) => {
    await startStream({
      model: "claude-4",
      messages: [{ role: "user", content: message }],
    });
  };
 
  return (
    <div>
      <div className="prose">{content}</div>
      {isStreaming && (
        <button onClick={cancel}>응답 중지</button>
      )}
      {usage && (
        <span className="text-sm text-muted">
          {usage.total_tokens} 토큰
        </span>
      )}
    </div>
  );
}

정리

이 장에서는 AI 서비스의 핵심 사용자 경험 요소인 스트리밍 응답 인터페이스를 설계했습니다. SSE 프로토콜의 기초부터 OpenAI 호환 스트리밍 형식, 도구 호출 스트리밍, 클라이언트 취소, 에러 복구, 그리고 React 프론트엔드 통합 패턴까지 다루었습니다.

스트리밍은 단순히 기술적 구현이 아니라, 사용자가 AI와 상호작용하는 근본적인 방식을 결정합니다. TTFT(Time To First Token)를 최소화하고, 취소 메커니즘을 제공하며, 부분 응답에 대한 비용을 투명하게 보고하는 것이 핵심입니다.

다음 장 미리보기

7장에서는 API 버전 관리와 하위 호환성을 다룹니다. URL 경로, 헤더, 쿼리 파라미터 방식의 버전 관리 전략, AI 서비스에서의 모델 버전과 API 버전 분리, 프롬프트 버전 관리, 그리고 안전한 폐기(Deprecation) 정책을 살펴봅니다.

이 글이 도움이 되셨나요?

관련 주제 더 보기

#api-design#graphql#architecture

관련 글

아키텍처

7장: API 버전 관리와 하위 호환성

URL 경로, 헤더, 쿼리 파라미터 버전 관리 전략과 AI 서비스에서의 모델 버전 분리, 프롬프트 버전 관리, 폐기 정책을 학습합니다.

2026년 2월 16일·16분
아키텍처

5장: AI 서비스 API 설계 패턴

비동기 작업 패턴, 멀티모달 입력 처리, Function Calling 인터페이스, 배치 API, 구조화된 출력 등 AI 서비스 고유의 API 설계 패턴을 학습합니다.

2026년 2월 12일·17분
아키텍처

8장: 레이트 리미팅과 비용 제어

토큰 기반 레이트 리미팅, 토큰 버킷과 슬라이딩 윈도우 알고리즘, 사용자별 한도 설정, 비용 캡, Redis 기반 구현을 학습합니다.

2026년 2월 18일·16분
이전 글5장: AI 서비스 API 설계 패턴
다음 글7장: API 버전 관리와 하위 호환성

댓글

목차

약 15분 남음
  • 학습 목표
  • 왜 스트리밍인가
  • SSE 프로토콜 기초
    • SSE 메시지 형식
    • 서버 구현
  • OpenAI 호환 스트리밍 형식
    • 스트리밍 청크 구조
    • 스트리밍 시퀀스 예시
  • 도구 호출 스트리밍
  • 클라이언트 취소와 타임아웃
    • 서버 측 취소 감지
    • 클라이언트 측 취소
  • 재시도와 에러 복구
    • SSE 자동 재연결
  • 프론트엔드 통합: React 패턴
    • 사용 예시
  • 정리
    • 다음 장 미리보기