SSE, gRPC, WebSocket을 결합한 하이브리드 스트리밍 AI 시스템을 설계하고 구현합니다. 프로토콜 선택 의사결정 트리, 엔드투엔드 구현, 성능 최적화, 운영 체크리스트를 다룹니다.
이 장에서는 시리즈 전체에서 학습한 내용을 종합하여, 프로덕션 수준의 AI 어시스턴트 플랫폼을 설계하고 구현합니다.
| 기능 | 설명 |
|---|---|
| AI 채팅 | LLM 기반 텍스트 대화, 토큰 스트리밍 |
| 생성 중단 | 사용자가 AI 응답 중 중단 가능 |
| 도구 호출 | AI가 외부 API/DB를 호출하여 정보 수집 |
| 실시간 협업 | 여러 사용자가 같은 대화를 관찰/참여 |
| 파일 분석 | 업로드된 문서에 대한 AI 분석 |
| 사용량 추적 | 토큰 사용량, 비용 실시간 모니터링 |
요구사항을 분석하면, 단일 프로토콜로는 모든 요구를 충족할 수 없습니다. 각 기능에 최적의 프로토콜을 매핑합니다.
| 기능 | 프로토콜 | 근거 |
|---|---|---|
| 토큰 스트리밍 | SSE | 단방향, 무상태 스케일링 |
| 생성 중단 | SSE (AbortController) | 연결 종료로 충분 |
| 실시간 협업 | WebSocket | 양방향, 다수 참여자 |
| 서비스 간 통신 | gRPC | 저지연, 타입 안전 |
| 파일 업로드 | REST | 표준 멀티파트 업로드 |
| 사용량 조회 | REST | 단순 요청-응답 |
새로운 기능을 추가할 때, 어떤 프로토콜을 선택해야 할지 판단하는 의사결정 트리입니다.
import { NextRequest } from "next/server";
export async function POST(request: NextRequest) {
const { messages, sessionId } = await request.json();
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
const tracker = createStreamTracker(sessionId);
try {
// 이벤트 기록: 스트리밍 시작
await recordEvent({
type: "AssistantStreamStarted",
sessionId,
timestamp: Date.now(),
});
// 시맨틱 캐시 확인
const cached = await semanticCache.get(
messages[messages.length - 1].content
);
if (cached) {
// 캐시 히트: 즉시 응답 (토큰 단위로 분할하여 자연스러운 스트리밍)
const tokens = splitIntoTokens(cached);
for (const token of tokens) {
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ token })}\n\n`
)
);
await delay(20); // 자연스러운 타이핑 효과
}
} else {
// 캐시 미스: LLM 추론 (gRPC로 추론 서버 호출)
const inferenceStream =
await inferenceClient.streamPredict({
messages,
maxTokens: 2048,
temperature: 0.7,
});
const fullResponse: string[] = [];
for await (const token of inferenceStream) {
fullResponse.push(token.text);
tracker.onToken();
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ token: token.text })}\n\n`
)
);
// 도구 호출 감지
if (token.toolCall) {
controller.enqueue(
encoder.encode(
`event: tool_call\ndata: ${JSON.stringify(token.toolCall)}\n\n`
)
);
// 도구 실행
const toolResult = await executeToolCall(
token.toolCall
);
controller.enqueue(
encoder.encode(
`event: tool_result\ndata: ${JSON.stringify(toolResult)}\n\n`
)
);
}
}
// 캐시 저장
await semanticCache.put(
messages[messages.length - 1].content,
fullResponse.join("")
);
}
// 완료 이벤트
controller.enqueue(
encoder.encode("data: [DONE]\n\n")
);
tracker.onComplete("success");
// 이벤트 기록: 스트리밍 완료
await recordEvent({
type: "AssistantStreamCompleted",
sessionId,
timestamp: Date.now(),
});
} catch (error) {
tracker.onComplete("error");
controller.enqueue(
encoder.encode(
`event: error\ndata: ${JSON.stringify({
message: "스트리밍 중 오류가 발생했습니다",
})}\n\n`
)
);
} finally {
controller.close();
}
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}import { WebSocketServer, WebSocket } from "ws";
import { createClient } from "redis";
const pub = createClient();
const sub = createClient();
await pub.connect();
await sub.connect();
interface Session {
id: string;
participants: Map<string, WebSocket>;
isStreaming: boolean;
}
const sessions = new Map<string, Session>();
const wss = new WebSocketServer({ port: 8080 });
wss.on("connection", (ws, req) => {
const sessionId = extractSessionId(req);
const userId = extractUserId(req);
// 세션 참가
let session = sessions.get(sessionId);
if (!session) {
session = {
id: sessionId,
participants: new Map(),
isStreaming: false,
};
sessions.set(sessionId, session);
// Redis 구독: 다른 서버의 메시지 수신
sub.subscribe(`session:${sessionId}`, (message) => {
broadcastToSession(sessionId, message);
});
}
session.participants.set(userId, ws);
// 참여자 입장 알림
broadcastToSession(
sessionId,
JSON.stringify({
type: "participant-joined",
userId,
participantCount: session.participants.size,
})
);
ws.on("message", async (data) => {
const msg = JSON.parse(data.toString());
switch (msg.type) {
case "chat-message":
// 모든 참여자에게 브로드캐스트
await pub.publish(
`session:${sessionId}`,
JSON.stringify({
type: "user-message",
userId,
content: msg.content,
timestamp: Date.now(),
})
);
break;
case "typing-indicator":
await pub.publish(
`session:${sessionId}`,
JSON.stringify({
type: "typing",
userId,
isTyping: msg.isTyping,
})
);
break;
case "cursor-position":
// 협업 편집 시 커서 위치 공유
await pub.publish(
`session:${sessionId}`,
JSON.stringify({
type: "cursor",
userId,
position: msg.position,
})
);
break;
}
});
ws.on("close", () => {
session?.participants.delete(userId);
broadcastToSession(
sessionId,
JSON.stringify({
type: "participant-left",
userId,
participantCount: session?.participants.size ?? 0,
})
);
// 마지막 참여자가 떠나면 세션 정리
if (session?.participants.size === 0) {
sub.unsubscribe(`session:${sessionId}`);
sessions.delete(sessionId);
}
});
});
function broadcastToSession(sessionId: string, message: string) {
const session = sessions.get(sessionId);
if (!session) return;
for (const [, ws] of session.participants) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(message);
}
}
}import grpc
from concurrent import futures
import asyncio
from dataclasses import dataclass
import inference_pb2
import inference_pb2_grpc
@dataclass
class BackendHealth:
address: str
queue_depth: int
gpu_utilization: float
avg_latency_ms: float
is_healthy: bool
class InferenceRouter(inference_pb2_grpc.InferenceServiceServicer):
def __init__(self, backends: list[str]):
self.backends = {}
for addr in backends:
channel = grpc.aio.insecure_channel(addr)
stub = inference_pb2_grpc.InferenceServiceStub(channel)
self.backends[addr] = {
"stub": stub,
"health": BackendHealth(
address=addr,
queue_depth=0,
gpu_utilization=0.0,
avg_latency_ms=0.0,
is_healthy=True,
),
}
def _select_backend(self) -> inference_pb2_grpc.InferenceServiceStub:
"""최소 큐 깊이 기반 백엔드 선택"""
healthy = [
b for b in self.backends.values()
if b["health"].is_healthy
]
if not healthy:
raise grpc.RpcError("No healthy backends available")
best = min(healthy, key=lambda b: b["health"].queue_depth)
return best["stub"]
async def StreamPredict(self, request, context):
"""요청을 최적의 백엔드로 라우팅하고 응답을 중계"""
backend = self._select_backend()
try:
async for token in backend.StreamPredict(request):
# 클라이언트가 취소했는지 확인
if context.cancelled():
return
yield token
except grpc.RpcError as e:
# 백엔드 장애 시 다른 백엔드로 폴백
if e.code() == grpc.StatusCode.UNAVAILABLE:
self._mark_unhealthy(backend)
fallback = self._select_backend()
async for token in fallback.StreamPredict(request):
yield token
else:
raise
async def serve():
server = grpc.aio.server(
futures.ThreadPoolExecutor(max_workers=20),
options=[
("grpc.keepalive_time_ms", 30000),
("grpc.keepalive_timeout_ms", 10000),
("grpc.max_send_message_length", 100 * 1024 * 1024),
],
)
router = InferenceRouter(
backends=[
"gpu-server-1:50051",
"gpu-server-2:50051",
]
)
inference_pb2_grpc.add_InferenceServiceServicer_to_server(
router, server
)
server.add_insecure_port("[::]:50050")
await server.start()
await server.wait_for_termination()import { useState, useCallback, useRef, useEffect } from "react";
interface Message {
id: string;
role: "user" | "assistant";
content: string;
isStreaming?: boolean;
toolCalls?: ToolCall[];
}
function AIAssistant({ sessionId }: { sessionId: string }) {
const [messages, setMessages] = useState<Message[]>([]);
const [input, setInput] = useState("");
const [isStreaming, setIsStreaming] = useState(false);
const abortRef = useRef<AbortController | null>(null);
const wsRef = useRef<WebSocket | null>(null);
const [participants, setParticipants] = useState<string[]>([]);
// WebSocket 연결 (협업)
useEffect(() => {
const ws = new WebSocket(
`wss://api.example.com/ws?session=${sessionId}`
);
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case "participant-joined":
setParticipants((prev) => [...prev, data.userId]);
break;
case "participant-left":
setParticipants((prev) =>
prev.filter((p) => p !== data.userId)
);
break;
case "user-message":
// 다른 참여자의 메시지
if (data.userId !== getCurrentUserId()) {
setMessages((prev) => [
...prev,
{
id: crypto.randomUUID(),
role: "user",
content: data.content,
},
]);
}
break;
}
};
wsRef.current = ws;
return () => ws.close();
}, [sessionId]);
// SSE 스트리밍 채팅
const sendMessage = useCallback(async () => {
if (!input.trim() || isStreaming) return;
const userMessage: Message = {
id: crypto.randomUUID(),
role: "user",
content: input,
};
const assistantMessage: Message = {
id: crypto.randomUUID(),
role: "assistant",
content: "",
isStreaming: true,
};
setMessages((prev) => [...prev, userMessage, assistantMessage]);
setInput("");
setIsStreaming(true);
// WebSocket으로 다른 참여자에게 알림
wsRef.current?.send(
JSON.stringify({ type: "chat-message", content: input })
);
const controller = new AbortController();
abortRef.current = controller;
try {
const response = await fetch("/api/chat/stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
messages: [...messages, userMessage].map((m) => ({
role: m.role,
content: m.content,
})),
sessionId,
}),
signal: controller.signal,
});
const reader = response.body?.getReader();
const decoder = new TextDecoder();
if (!reader) return;
let buffer = "";
let fullContent = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const events = buffer.split("\n\n");
buffer = events.pop() ?? "";
for (const event of events) {
const lines = event.split("\n");
const eventType =
lines
.find((l) => l.startsWith("event: "))
?.slice(7) ?? "message";
const dataLine = lines.find((l) =>
l.startsWith("data: ")
);
if (!dataLine) continue;
const data = dataLine.slice(6);
if (data === "[DONE]") continue;
if (eventType === "message") {
const parsed = JSON.parse(data);
fullContent += parsed.token;
setMessages((prev) =>
prev.map((m) =>
m.id === assistantMessage.id
? { ...m, content: fullContent }
: m
)
);
} else if (eventType === "tool_call") {
// 도구 호출 UI 표시
const toolCall = JSON.parse(data);
setMessages((prev) =>
prev.map((m) =>
m.id === assistantMessage.id
? {
...m,
toolCalls: [
...(m.toolCalls ?? []),
toolCall,
],
}
: m
)
);
}
}
}
} catch (error) {
if (
error instanceof DOMException &&
error.name === "AbortError"
) {
// 사용자 중단
} else {
console.error("스트리밍 오류:", error);
}
} finally {
setIsStreaming(false);
setMessages((prev) =>
prev.map((m) =>
m.id === assistantMessage.id
? { ...m, isStreaming: false }
: m
)
);
}
}, [input, isStreaming, messages, sessionId]);
const handleStop = () => {
abortRef.current?.abort();
};
return (
<div className="ai-assistant">
{/* 참여자 표시 */}
<div className="participants">
{participants.length > 0 && (
<span>{participants.length}명 참여 중</span>
)}
</div>
{/* 메시지 목록 */}
<div className="messages">
{messages.map((msg) => (
<div key={msg.id} className={`message ${msg.role}`}>
<div className="content">
{msg.content}
{msg.isStreaming && (
<span className="cursor-blink">|</span>
)}
</div>
{msg.toolCalls?.map((tc, i) => (
<div key={i} className="tool-call">
{tc.name}: {tc.status}
</div>
))}
</div>
))}
</div>
{/* 입력 영역 */}
<div className="input-area">
<input
value={input}
onChange={(e) => setInput(e.target.value)}
onKeyDown={(e) => e.key === "Enter" && sendMessage()}
placeholder="메시지를 입력하세요"
disabled={isStreaming}
/>
{isStreaming ? (
<button onClick={handleStop}>중단</button>
) : (
<button onClick={sendMessage}>전송</button>
)}
</div>
</div>
);
}| 영역 | 최적화 포인트 | 기대 효과 |
|---|---|---|
| TTFT | 시맨틱 캐싱 | 캐시 히트 시 10-50ms로 단축 |
| TTFT | vLLM 스트리밍 입력 | 프리필 시간 30-50% 감소 |
| 처리량 | Continuous Batching | GPU 사용률 85-95% |
| 네트워크 | HTTP/3 (QUIC) 지원 | 패킷 손실 시 지연 감소 |
| 렌더링 | requestAnimationFrame 배치 | 클라이언트 프레임 드롭 방지 |
| 비용 | 레이트 리미팅 + 캐싱 | API 비용 30-50% 절감 |
| 안정성 | 다계층 백프레셔 | 과부하 시 시스템 보호 |
| 가용성 | 서킷 브레이커 + 폴백 | 부분 장애 시에도 서비스 유지 |
배포 전 반드시 확인해야 할 항목들입니다.
proxy_buffering off 설정 확인이 체크리스트를 팀의 배포 파이프라인에 통합하세요. 모든 항목이 자동화된 테스트로 검증되는 것이 이상적이지만, 최소한 배포 전 수동 확인이라도 반드시 수행해야 합니다.
10장에 걸쳐 스트리밍 아키텍처와 실시간 AI 시스템의 전체 스펙트럼을 다루었습니다.
| 장 | 핵심 학습 |
|---|---|
| 1장 | 스트리밍의 필요성, TTFT/TPOT, 프로토콜 생태계 |
| 2장 | SSE — LLM 스트리밍의 사실상 표준, 무상태의 힘 |
| 3장 | WebSocket — 양방향 통신, 스케일링의 복잡성 |
| 4장 | gRPC — 백엔드 고성능 통신, Protobuf |
| 5장 | LLM API 차이 통합, 파셜 JSON 파싱, Vercel AI SDK |
| 6장 | 추론 파이프라인, vLLM, 시맨틱 캐싱, 멀티모달 |
| 7장 | 이벤트 소싱, CQRS, 시간 여행 디버깅 |
| 8장 | 백프레셔, 토큰 버킷, 다계층 흐름 제어 |
| 9장 | 프로덕션 인프라, Kubernetes, HTTP/3 |
| 10장 | 하이브리드 아키텍처, 엔드투엔드 구현 |
이번 장에서는 시리즈 전체의 학습 내용을 종합하여 프로덕션 수준의 스트리밍 AI 시스템을 설계하고 구현했습니다.
스트리밍은 단순한 기술적 선택이 아니라, AI 시대의 사용자 경험을 좌우하는 아키텍처 결정입니다. 이 시리즈가 여러분의 실시간 AI 시스템 구축에 실질적인 도움이 되었기를 바랍니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
로드밸런서의 WebSocket 업그레이드, CDN과 스트리밍, Kubernetes에서의 스트리밍 서비스 운영, 모니터링 전략, HTTP/3(QUIC)과 WebTransport의 미래를 다룹니다.
생산자-소비자 속도 불일치를 관리하는 백프레셔의 원리, 버퍼링/드롭/속도 제한 전략, LLM API 레이트 리미팅, 토큰 버킷 알고리즘, 큐 깊이 모니터링을 다룹니다.
이벤트 소싱과 CQRS 패턴의 원리를 살펴보고, AI 시스템에서의 적용 사례를 다룹니다. 대화 이력 관리, 에이전트 상태 추적, 시간 여행 디버깅, Kafka와 EventStoreDB 활용을 포함합니다.