SSE 기반 토큰 스트리밍 프로토콜, OpenAI 호환 스트리밍 형식, 에러 처리, 클라이언트 취소, 프론트엔드 통합 패턴을 학습합니다.
LLM의 응답 생성에는 수 초가 소요됩니다. 전체 응답이 완성될 때까지 사용자를 기다리게 하면 체감 지연이 극심합니다. 토큰이 생성되는 즉시 전달하는 스트리밍은 첫 토큰까지의 시간(TTFT, Time To First Token)을 수십 밀리초로 줄여 사용자 경험을 극적으로 개선합니다.
| 지표 | 동기 응답 | 스트리밍 응답 |
|---|---|---|
| 첫 글자 표시까지 | 3-10초 | 100-500ms |
| 체감 대기 시간 | 매우 김 | 거의 없음 |
| 취소 가능성 | 완료 전까지 불가 | 언제든 가능 |
| 중간 결과 활용 | 불가 | 가능 |
SSE(Server-Sent Events)는 HTTP 기반의 단방향 서버-클라이언트 스트리밍 프로토콜입니다. WebSocket과 달리 일반 HTTP를 사용하므로 기존 인프라(프록시, CDN, 로드밸런서)와 호환성이 우수합니다.
event: message
data: {"key": "value"}
id: msg-001
retry: 3000
event: message
data: {"key": "value2"}
id: msg-002
event: done
data: [DONE]SSE 메시지는 네 가지 필드로 구성됩니다.
data — 메시지 본문 (필수). 여러 줄은 여러 data: 라인으로 표현합니다event — 이벤트 타입 (선택). 클라이언트에서 타입별로 다른 핸들러를 등록할 수 있습니다id — 메시지 ID (선택). 재연결 시 Last-Event-ID로 전송됩니다retry — 재연결 대기 시간 밀리초 (선택)from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import json
import asyncio
@app.post("/api/v1/chat/completions")
async def create_chat_completion(
request: CompletionRequest,
http_request: Request,
):
if not request.stream:
return await sync_completion(request)
return StreamingResponse(
stream_completion(request, http_request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Nginx 버퍼링 비활성화
},
)
async def stream_completion(
request: CompletionRequest,
http_request: Request,
):
request_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
total_prompt_tokens = count_tokens(request.messages)
total_completion_tokens = 0
try:
async for token in inference_engine.stream(request):
# 클라이언트 연결 확인
if await http_request.is_disconnected():
break
total_completion_tokens += 1
chunk = {
"id": request_id,
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": request.model,
"choices": [{
"index": 0,
"delta": {"content": token},
"finish_reason": None,
}],
}
yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
# 완료 청크
final_chunk = {
"id": request_id,
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": request.model,
"choices": [{
"index": 0,
"delta": {},
"finish_reason": "stop",
}],
"usage": {
"prompt_tokens": total_prompt_tokens,
"completion_tokens": total_completion_tokens,
"total_tokens": total_prompt_tokens + total_completion_tokens,
},
}
yield f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n"
# 종료 신호
yield "data: [DONE]\n\n"
except Exception as e:
error_chunk = {
"error": {
"type": "server_error",
"message": str(e),
"code": "inference_error",
}
}
yield f"data: {json.dumps(error_chunk)}\n\n"
yield "data: [DONE]\n\n"OpenAI가 정의한 스트리밍 형식은 사실상 업계 표준이 되었습니다. 이 형식을 따르면 기존 OpenAI SDK와 도구 생태계를 그대로 활용할 수 있습니다.
// 일반 텍스트 스트리밍 청크
interface StreamChunk {
id: string;
object: "chat.completion.chunk";
created: number;
model: string;
choices: StreamChoice[];
usage?: TokenUsage; // 마지막 청크에서만 포함
}
interface StreamChoice {
index: number;
delta: MessageDelta;
finish_reason: string | null;
}
interface MessageDelta {
role?: string; // 첫 번째 청크에서만
content?: string; // 토큰 내용
tool_calls?: ToolCallDelta[];
}
// 도구 호출 스트리밍
interface ToolCallDelta {
index: number;
id?: string; // 첫 번째 청크에서만
type?: string; // 첫 번째 청크에서만
function?: {
name?: string; // 첫 번째 청크에서만
arguments?: string; // 점진적으로 누적
};
}data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1711234567,"model":"claude-4","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1711234567,"model":"claude-4","choices":[{"index":0,"delta":{"content":"API"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1711234567,"model":"claude-4","choices":[{"index":0,"delta":{"content":" 설계"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1711234567,"model":"claude-4","choices":[{"index":0,"delta":{"content":"에서"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","created":1711234567,"model":"claude-4","choices":[{"index":0,"delta":{},"finish_reason":"stop"}],"usage":{"prompt_tokens":25,"completion_tokens":8,"total_tokens":33}}
data: [DONE]첫 번째 청크에서 role: "assistant"를 보내고, 이후 청크에서는 content만 점진적으로 전송합니다. 마지막 청크에서 finish_reason과 usage 정보를 포함하여 스트리밍의 완료와 비용을 알립니다.
도구 호출 시에도 스트리밍이 적용됩니다. 함수명과 인자가 점진적으로 생성됩니다.
data: {"id":"chatcmpl-def","choices":[{"index":0,"delta":{"role":"assistant","content":null,"tool_calls":[{"index":0,"id":"call_xyz","type":"function","function":{"name":"get_weather","arguments":""}}]},"finish_reason":null}]}
data: {"id":"chatcmpl-def","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\"ci"}}]},"finish_reason":null}]}
data: {"id":"chatcmpl-def","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"ty\":"}}]},"finish_reason":null}]}
data: {"id":"chatcmpl-def","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":" \"서울\"}"}}]},"finish_reason":null}]}
data: {"id":"chatcmpl-def","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}
data: [DONE]클라이언트는 arguments 문자열을 누적하여 전체 JSON을 조합해야 합니다.
class ToolCallAccumulator {
private toolCalls: Map<number, {
id: string;
type: string;
function: { name: string; arguments: string };
}> = new Map();
processDelta(delta: MessageDelta): void {
if (!delta.tool_calls) return;
for (const tc of delta.tool_calls) {
if (!this.toolCalls.has(tc.index)) {
this.toolCalls.set(tc.index, {
id: tc.id ?? "",
type: tc.type ?? "function",
function: {
name: tc.function?.name ?? "",
arguments: "",
},
});
}
const existing = this.toolCalls.get(tc.index)!;
if (tc.function?.arguments) {
existing.function.arguments += tc.function.arguments;
}
}
}
getToolCalls(): ToolCall[] {
return Array.from(this.toolCalls.values()).map((tc) => ({
...tc,
function: {
...tc.function,
arguments: JSON.parse(tc.function.arguments),
},
}));
}
}스트리밍 중 사용자가 응답을 취소하거나, 네트워크 오류가 발생하는 상황을 처리해야 합니다.
async def stream_completion(
request: CompletionRequest,
http_request: Request,
):
"""클라이언트 연결 해제 시 추론을 즉시 중단합니다."""
inference_task = None
try:
async for token in inference_engine.stream(request):
# 주기적으로 클라이언트 연결 확인
if await http_request.is_disconnected():
logger.info(
f"클라이언트 연결 해제: {request.model}, "
f"생성된 토큰: {token_count}"
)
# 추론 리소스 즉시 해제
await inference_engine.cancel(request_id)
return
yield format_chunk(token)
except asyncio.CancelledError:
logger.info("스트리밍 취소됨")
await inference_engine.cancel(request_id)
except Exception as e:
yield format_error_chunk(e)
finally:
# 사용량 기록 (부분 응답도 과금)
await usage_tracker.record(
request_id=request_id,
prompt_tokens=prompt_tokens,
completion_tokens=token_count,
)class StreamingClient {
private controller: AbortController | null = null;
async streamCompletion(
request: CompletionRequest,
onToken: (token: string) => void,
onComplete: (usage: TokenUsage) => void,
onError: (error: Error) => void,
): Promise<void> {
this.controller = new AbortController();
const signal = this.controller.signal;
// 타임아웃 설정 (60초)
const timeoutId = setTimeout(() => this.cancel(), 60000);
try {
const response = await fetch("/api/v1/chat/completions", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ ...request, stream: true }),
signal,
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}`);
}
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() ?? "";
for (const line of lines) {
if (!line.startsWith("data: ")) continue;
const data = line.slice(6);
if (data === "[DONE]") {
return;
}
const chunk = JSON.parse(data);
const delta = chunk.choices[0]?.delta;
if (delta?.content) {
onToken(delta.content);
}
if (chunk.usage) {
onComplete(chunk.usage);
}
}
}
} catch (error) {
if (signal.aborted) return; // 의도적 취소
onError(error as Error);
} finally {
clearTimeout(timeoutId);
this.controller = null;
}
}
cancel(): void {
this.controller?.abort();
}
}클라이언트가 스트리밍을 취소하더라도, 이미 생성된 토큰은 과금됩니다. 서버는 취소 시점까지의 사용량을 반드시 기록해야 합니다. 취소가 빈번한 서비스라면, 클라이언트에게 취소 시점의 사용량을 알려주는 것이 투명한 비용 관리에 도움이 됩니다.
SSE 프로토콜은 연결이 끊어지면 브라우저가 자동으로 재연결을 시도합니다. Last-Event-ID 헤더를 통해 마지막으로 수신한 이벤트 이후부터 재개할 수 있습니다.
@app.post("/api/v1/chat/completions")
async def create_streaming_completion(
request: CompletionRequest,
http_request: Request,
):
last_event_id = http_request.headers.get("Last-Event-ID")
if last_event_id:
# 이전 스트리밍 세션 이어서 전송
return StreamingResponse(
resume_stream(request, last_event_id),
media_type="text/event-stream",
)
return StreamingResponse(
stream_completion(request, http_request),
media_type="text/event-stream",
)
async def stream_completion(request, http_request):
event_id = 0
async for token in inference_engine.stream(request):
event_id += 1
# 각 이벤트에 ID를 부여하여 재개 가능하게
chunk = format_chunk(token, request_id)
yield f"id: {event_id}\ndata: {json.dumps(chunk)}\n\n"
# 스트리밍 상태를 캐시에 저장
await cache.set(
f"stream:{request_id}:{event_id}",
chunk,
ttl=300, # 5분간 보관
)import { useState, useCallback, useRef } from "react";
interface UseStreamingOptions {
onToken?: (token: string) => void;
onComplete?: (usage: TokenUsage) => void;
onError?: (error: Error) => void;
timeout?: number;
}
function useStreaming(options: UseStreamingOptions = {}) {
const [content, setContent] = useState("");
const [isStreaming, setIsStreaming] = useState(false);
const [usage, setUsage] = useState<TokenUsage | null>(null);
const [error, setError] = useState<Error | null>(null);
const controllerRef = useRef<AbortController | null>(null);
const startStream = useCallback(
async (request: CompletionRequest) => {
setContent("");
setIsStreaming(true);
setUsage(null);
setError(null);
controllerRef.current = new AbortController();
try {
const response = await fetch("/api/v1/chat/completions", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ ...request, stream: true }),
signal: controllerRef.current.signal,
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() ?? "";
for (const line of lines) {
if (!line.startsWith("data: ")) continue;
const data = line.slice(6);
if (data === "[DONE]") break;
const chunk = JSON.parse(data);
const token = chunk.choices[0]?.delta?.content;
if (token) {
setContent((prev) => prev + token);
options.onToken?.(token);
}
if (chunk.usage) {
setUsage(chunk.usage);
options.onComplete?.(chunk.usage);
}
}
}
} catch (err) {
if (controllerRef.current?.signal.aborted) return;
const error = err as Error;
setError(error);
options.onError?.(error);
} finally {
setIsStreaming(false);
controllerRef.current = null;
}
},
[options]
);
const cancel = useCallback(() => {
controllerRef.current?.abort();
setIsStreaming(false);
}, []);
return { content, isStreaming, usage, error, startStream, cancel };
}function ChatComponent() {
const { content, isStreaming, usage, startStream, cancel } =
useStreaming({
onComplete: (usage) => {
console.log(`토큰 사용: ${usage.total_tokens}`);
},
});
const handleSubmit = async (message: string) => {
await startStream({
model: "claude-4",
messages: [{ role: "user", content: message }],
});
};
return (
<div>
<div className="prose">{content}</div>
{isStreaming && (
<button onClick={cancel}>응답 중지</button>
)}
{usage && (
<span className="text-sm text-muted">
{usage.total_tokens} 토큰
</span>
)}
</div>
);
}이 장에서는 AI 서비스의 핵심 사용자 경험 요소인 스트리밍 응답 인터페이스를 설계했습니다. SSE 프로토콜의 기초부터 OpenAI 호환 스트리밍 형식, 도구 호출 스트리밍, 클라이언트 취소, 에러 복구, 그리고 React 프론트엔드 통합 패턴까지 다루었습니다.
스트리밍은 단순히 기술적 구현이 아니라, 사용자가 AI와 상호작용하는 근본적인 방식을 결정합니다. TTFT(Time To First Token)를 최소화하고, 취소 메커니즘을 제공하며, 부분 응답에 대한 비용을 투명하게 보고하는 것이 핵심입니다.
7장에서는 API 버전 관리와 하위 호환성을 다룹니다. URL 경로, 헤더, 쿼리 파라미터 방식의 버전 관리 전략, AI 서비스에서의 모델 버전과 API 버전 분리, 프롬프트 버전 관리, 그리고 안전한 폐기(Deprecation) 정책을 살펴봅니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
URL 경로, 헤더, 쿼리 파라미터 버전 관리 전략과 AI 서비스에서의 모델 버전 분리, 프롬프트 버전 관리, 폐기 정책을 학습합니다.
비동기 작업 패턴, 멀티모달 입력 처리, Function Calling 인터페이스, 배치 API, 구조화된 출력 등 AI 서비스 고유의 API 설계 패턴을 학습합니다.
토큰 기반 레이트 리미팅, 토큰 버킷과 슬라이딩 윈도우 알고리즘, 사용자별 한도 설정, 비용 캡, Redis 기반 구현을 학습합니다.