본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout

내비게이션

  • Tech
  • Projects
  • Books
  • About
  • Tags

카테고리

  • AI / ML
  • 웹 개발
  • 프로그래밍
  • 개발 도구

연결

  • GitHub
  • Email
  • RSS
© 2026 Kreath Archive. All rights reserved.Built with Next.js + MDX
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 10장: 실전 프로젝트 — 스트리밍 AI 시스템 구축
2026년 4월 5일·아키텍처·

10장: 실전 프로젝트 — 스트리밍 AI 시스템 구축

SSE, gRPC, WebSocket을 결합한 하이브리드 스트리밍 AI 시스템을 설계하고 구현합니다. 프로토콜 선택 의사결정 트리, 엔드투엔드 구현, 성능 최적화, 운영 체크리스트를 다룹니다.

21분1,467자9개 섹션
streamingai
공유
streaming-ai10 / 10
12345678910
이전9장: 프로덕션 스트리밍 인프라

학습 목표

  • SSE + gRPC + WebSocket을 결합한 하이브리드 아키텍처를 설계합니다
  • 프로토콜 선택을 위한 의사결정 트리를 활용합니다
  • 프론트엔드부터 추론 서버까지 엔드투엔드로 구현합니다
  • 성능 최적화 포인트를 점검합니다
  • 프로덕션 운영을 위한 체크리스트를 정리합니다

프로젝트 개요: AI 어시스턴트 플랫폼

이 장에서는 시리즈 전체에서 학습한 내용을 종합하여, 프로덕션 수준의 AI 어시스턴트 플랫폼을 설계하고 구현합니다.

요구사항

기능설명
AI 채팅LLM 기반 텍스트 대화, 토큰 스트리밍
생성 중단사용자가 AI 응답 중 중단 가능
도구 호출AI가 외부 API/DB를 호출하여 정보 수집
실시간 협업여러 사용자가 같은 대화를 관찰/참여
파일 분석업로드된 문서에 대한 AI 분석
사용량 추적토큰 사용량, 비용 실시간 모니터링

하이브리드 아키텍처 설계

요구사항을 분석하면, 단일 프로토콜로는 모든 요구를 충족할 수 없습니다. 각 기능에 최적의 프로토콜을 매핑합니다.

프로토콜-기능 매핑

기능프로토콜근거
토큰 스트리밍SSE단방향, 무상태 스케일링
생성 중단SSE (AbortController)연결 종료로 충분
실시간 협업WebSocket양방향, 다수 참여자
서비스 간 통신gRPC저지연, 타입 안전
파일 업로드REST표준 멀티파트 업로드
사용량 조회REST단순 요청-응답

프로토콜 선택 의사결정 트리

새로운 기능을 추가할 때, 어떤 프로토콜을 선택해야 할지 판단하는 의사결정 트리입니다.

엔드투엔드 구현

1. SSE 토큰 스트리밍 엔드포인트

app/api/chat/stream/route.ts
typescript
import { NextRequest } from "next/server";
 
export async function POST(request: NextRequest) {
  const { messages, sessionId } = await request.json();
 
  const encoder = new TextEncoder();
 
  const stream = new ReadableStream({
    async start(controller) {
      const tracker = createStreamTracker(sessionId);
 
      try {
        // 이벤트 기록: 스트리밍 시작
        await recordEvent({
          type: "AssistantStreamStarted",
          sessionId,
          timestamp: Date.now(),
        });
 
        // 시맨틱 캐시 확인
        const cached = await semanticCache.get(
          messages[messages.length - 1].content
        );
 
        if (cached) {
          // 캐시 히트: 즉시 응답 (토큰 단위로 분할하여 자연스러운 스트리밍)
          const tokens = splitIntoTokens(cached);
          for (const token of tokens) {
            controller.enqueue(
              encoder.encode(
                `data: ${JSON.stringify({ token })}\n\n`
              )
            );
            await delay(20); // 자연스러운 타이핑 효과
          }
        } else {
          // 캐시 미스: LLM 추론 (gRPC로 추론 서버 호출)
          const inferenceStream =
            await inferenceClient.streamPredict({
              messages,
              maxTokens: 2048,
              temperature: 0.7,
            });
 
          const fullResponse: string[] = [];
 
          for await (const token of inferenceStream) {
            fullResponse.push(token.text);
            tracker.onToken();
 
            controller.enqueue(
              encoder.encode(
                `data: ${JSON.stringify({ token: token.text })}\n\n`
              )
            );
 
            // 도구 호출 감지
            if (token.toolCall) {
              controller.enqueue(
                encoder.encode(
                  `event: tool_call\ndata: ${JSON.stringify(token.toolCall)}\n\n`
                )
              );
 
              // 도구 실행
              const toolResult = await executeToolCall(
                token.toolCall
              );
              controller.enqueue(
                encoder.encode(
                  `event: tool_result\ndata: ${JSON.stringify(toolResult)}\n\n`
                )
              );
            }
          }
 
          // 캐시 저장
          await semanticCache.put(
            messages[messages.length - 1].content,
            fullResponse.join("")
          );
        }
 
        // 완료 이벤트
        controller.enqueue(
          encoder.encode("data: [DONE]\n\n")
        );
        tracker.onComplete("success");
 
        // 이벤트 기록: 스트리밍 완료
        await recordEvent({
          type: "AssistantStreamCompleted",
          sessionId,
          timestamp: Date.now(),
        });
      } catch (error) {
        tracker.onComplete("error");
 
        controller.enqueue(
          encoder.encode(
            `event: error\ndata: ${JSON.stringify({
              message: "스트리밍 중 오류가 발생했습니다",
            })}\n\n`
          )
        );
      } finally {
        controller.close();
      }
    },
  });
 
  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}

2. WebSocket 협업 서버

ws-server/collaboration.ts
typescript
import { WebSocketServer, WebSocket } from "ws";
import { createClient } from "redis";
 
const pub = createClient();
const sub = createClient();
await pub.connect();
await sub.connect();
 
interface Session {
  id: string;
  participants: Map<string, WebSocket>;
  isStreaming: boolean;
}
 
const sessions = new Map<string, Session>();
 
const wss = new WebSocketServer({ port: 8080 });
 
wss.on("connection", (ws, req) => {
  const sessionId = extractSessionId(req);
  const userId = extractUserId(req);
 
  // 세션 참가
  let session = sessions.get(sessionId);
  if (!session) {
    session = {
      id: sessionId,
      participants: new Map(),
      isStreaming: false,
    };
    sessions.set(sessionId, session);
 
    // Redis 구독: 다른 서버의 메시지 수신
    sub.subscribe(`session:${sessionId}`, (message) => {
      broadcastToSession(sessionId, message);
    });
  }
 
  session.participants.set(userId, ws);
 
  // 참여자 입장 알림
  broadcastToSession(
    sessionId,
    JSON.stringify({
      type: "participant-joined",
      userId,
      participantCount: session.participants.size,
    })
  );
 
  ws.on("message", async (data) => {
    const msg = JSON.parse(data.toString());
 
    switch (msg.type) {
      case "chat-message":
        // 모든 참여자에게 브로드캐스트
        await pub.publish(
          `session:${sessionId}`,
          JSON.stringify({
            type: "user-message",
            userId,
            content: msg.content,
            timestamp: Date.now(),
          })
        );
        break;
 
      case "typing-indicator":
        await pub.publish(
          `session:${sessionId}`,
          JSON.stringify({
            type: "typing",
            userId,
            isTyping: msg.isTyping,
          })
        );
        break;
 
      case "cursor-position":
        // 협업 편집 시 커서 위치 공유
        await pub.publish(
          `session:${sessionId}`,
          JSON.stringify({
            type: "cursor",
            userId,
            position: msg.position,
          })
        );
        break;
    }
  });
 
  ws.on("close", () => {
    session?.participants.delete(userId);
    broadcastToSession(
      sessionId,
      JSON.stringify({
        type: "participant-left",
        userId,
        participantCount: session?.participants.size ?? 0,
      })
    );
 
    // 마지막 참여자가 떠나면 세션 정리
    if (session?.participants.size === 0) {
      sub.unsubscribe(`session:${sessionId}`);
      sessions.delete(sessionId);
    }
  });
});
 
function broadcastToSession(sessionId: string, message: string) {
  const session = sessions.get(sessionId);
  if (!session) return;
 
  for (const [, ws] of session.participants) {
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(message);
    }
  }
}

3. gRPC 추론 라우터

inference_router.py
python
import grpc
from concurrent import futures
import asyncio
from dataclasses import dataclass
 
import inference_pb2
import inference_pb2_grpc
 
 
@dataclass
class BackendHealth:
    address: str
    queue_depth: int
    gpu_utilization: float
    avg_latency_ms: float
    is_healthy: bool
 
 
class InferenceRouter(inference_pb2_grpc.InferenceServiceServicer):
    def __init__(self, backends: list[str]):
        self.backends = {}
        for addr in backends:
            channel = grpc.aio.insecure_channel(addr)
            stub = inference_pb2_grpc.InferenceServiceStub(channel)
            self.backends[addr] = {
                "stub": stub,
                "health": BackendHealth(
                    address=addr,
                    queue_depth=0,
                    gpu_utilization=0.0,
                    avg_latency_ms=0.0,
                    is_healthy=True,
                ),
            }
 
    def _select_backend(self) -> inference_pb2_grpc.InferenceServiceStub:
        """최소 큐 깊이 기반 백엔드 선택"""
        healthy = [
            b for b in self.backends.values()
            if b["health"].is_healthy
        ]
 
        if not healthy:
            raise grpc.RpcError("No healthy backends available")
 
        best = min(healthy, key=lambda b: b["health"].queue_depth)
        return best["stub"]
 
    async def StreamPredict(self, request, context):
        """요청을 최적의 백엔드로 라우팅하고 응답을 중계"""
        backend = self._select_backend()
 
        try:
            async for token in backend.StreamPredict(request):
                # 클라이언트가 취소했는지 확인
                if context.cancelled():
                    return
 
                yield token
 
        except grpc.RpcError as e:
            # 백엔드 장애 시 다른 백엔드로 폴백
            if e.code() == grpc.StatusCode.UNAVAILABLE:
                self._mark_unhealthy(backend)
                fallback = self._select_backend()
                async for token in fallback.StreamPredict(request):
                    yield token
            else:
                raise
 
 
async def serve():
    server = grpc.aio.server(
        futures.ThreadPoolExecutor(max_workers=20),
        options=[
            ("grpc.keepalive_time_ms", 30000),
            ("grpc.keepalive_timeout_ms", 10000),
            ("grpc.max_send_message_length", 100 * 1024 * 1024),
        ],
    )
 
    router = InferenceRouter(
        backends=[
            "gpu-server-1:50051",
            "gpu-server-2:50051",
        ]
    )
 
    inference_pb2_grpc.add_InferenceServiceServicer_to_server(
        router, server
    )
    server.add_insecure_port("[::]:50050")
    await server.start()
    await server.wait_for_termination()

4. React 프론트엔드 통합

components/AIAssistant.tsx
tsx
import { useState, useCallback, useRef, useEffect } from "react";
 
interface Message {
  id: string;
  role: "user" | "assistant";
  content: string;
  isStreaming?: boolean;
  toolCalls?: ToolCall[];
}
 
function AIAssistant({ sessionId }: { sessionId: string }) {
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState("");
  const [isStreaming, setIsStreaming] = useState(false);
  const abortRef = useRef<AbortController | null>(null);
  const wsRef = useRef<WebSocket | null>(null);
  const [participants, setParticipants] = useState<string[]>([]);
 
  // WebSocket 연결 (협업)
  useEffect(() => {
    const ws = new WebSocket(
      `wss://api.example.com/ws?session=${sessionId}`
    );
 
    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      switch (data.type) {
        case "participant-joined":
          setParticipants((prev) => [...prev, data.userId]);
          break;
        case "participant-left":
          setParticipants((prev) =>
            prev.filter((p) => p !== data.userId)
          );
          break;
        case "user-message":
          // 다른 참여자의 메시지
          if (data.userId !== getCurrentUserId()) {
            setMessages((prev) => [
              ...prev,
              {
                id: crypto.randomUUID(),
                role: "user",
                content: data.content,
              },
            ]);
          }
          break;
      }
    };
 
    wsRef.current = ws;
    return () => ws.close();
  }, [sessionId]);
 
  // SSE 스트리밍 채팅
  const sendMessage = useCallback(async () => {
    if (!input.trim() || isStreaming) return;
 
    const userMessage: Message = {
      id: crypto.randomUUID(),
      role: "user",
      content: input,
    };
 
    const assistantMessage: Message = {
      id: crypto.randomUUID(),
      role: "assistant",
      content: "",
      isStreaming: true,
    };
 
    setMessages((prev) => [...prev, userMessage, assistantMessage]);
    setInput("");
    setIsStreaming(true);
 
    // WebSocket으로 다른 참여자에게 알림
    wsRef.current?.send(
      JSON.stringify({ type: "chat-message", content: input })
    );
 
    const controller = new AbortController();
    abortRef.current = controller;
 
    try {
      const response = await fetch("/api/chat/stream", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          messages: [...messages, userMessage].map((m) => ({
            role: m.role,
            content: m.content,
          })),
          sessionId,
        }),
        signal: controller.signal,
      });
 
      const reader = response.body?.getReader();
      const decoder = new TextDecoder();
 
      if (!reader) return;
 
      let buffer = "";
      let fullContent = "";
 
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
 
        buffer += decoder.decode(value, { stream: true });
        const events = buffer.split("\n\n");
        buffer = events.pop() ?? "";
 
        for (const event of events) {
          const lines = event.split("\n");
          const eventType =
            lines
              .find((l) => l.startsWith("event: "))
              ?.slice(7) ?? "message";
          const dataLine = lines.find((l) =>
            l.startsWith("data: ")
          );
 
          if (!dataLine) continue;
          const data = dataLine.slice(6);
 
          if (data === "[DONE]") continue;
 
          if (eventType === "message") {
            const parsed = JSON.parse(data);
            fullContent += parsed.token;
            setMessages((prev) =>
              prev.map((m) =>
                m.id === assistantMessage.id
                  ? { ...m, content: fullContent }
                  : m
              )
            );
          } else if (eventType === "tool_call") {
            // 도구 호출 UI 표시
            const toolCall = JSON.parse(data);
            setMessages((prev) =>
              prev.map((m) =>
                m.id === assistantMessage.id
                  ? {
                      ...m,
                      toolCalls: [
                        ...(m.toolCalls ?? []),
                        toolCall,
                      ],
                    }
                  : m
              )
            );
          }
        }
      }
    } catch (error) {
      if (
        error instanceof DOMException &&
        error.name === "AbortError"
      ) {
        // 사용자 중단
      } else {
        console.error("스트리밍 오류:", error);
      }
    } finally {
      setIsStreaming(false);
      setMessages((prev) =>
        prev.map((m) =>
          m.id === assistantMessage.id
            ? { ...m, isStreaming: false }
            : m
        )
      );
    }
  }, [input, isStreaming, messages, sessionId]);
 
  const handleStop = () => {
    abortRef.current?.abort();
  };
 
  return (
    <div className="ai-assistant">
      {/* 참여자 표시 */}
      <div className="participants">
        {participants.length > 0 && (
          <span>{participants.length}명 참여 중</span>
        )}
      </div>
 
      {/* 메시지 목록 */}
      <div className="messages">
        {messages.map((msg) => (
          <div key={msg.id} className={`message ${msg.role}`}>
            <div className="content">
              {msg.content}
              {msg.isStreaming && (
                <span className="cursor-blink">|</span>
              )}
            </div>
            {msg.toolCalls?.map((tc, i) => (
              <div key={i} className="tool-call">
                {tc.name}: {tc.status}
              </div>
            ))}
          </div>
        ))}
      </div>
 
      {/* 입력 영역 */}
      <div className="input-area">
        <input
          value={input}
          onChange={(e) => setInput(e.target.value)}
          onKeyDown={(e) => e.key === "Enter" && sendMessage()}
          placeholder="메시지를 입력하세요"
          disabled={isStreaming}
        />
        {isStreaming ? (
          <button onClick={handleStop}>중단</button>
        ) : (
          <button onClick={sendMessage}>전송</button>
        )}
      </div>
    </div>
  );
}

성능 최적화 체크리스트

영역최적화 포인트기대 효과
TTFT시맨틱 캐싱캐시 히트 시 10-50ms로 단축
TTFTvLLM 스트리밍 입력프리필 시간 30-50% 감소
처리량Continuous BatchingGPU 사용률 85-95%
네트워크HTTP/3 (QUIC) 지원패킷 손실 시 지연 감소
렌더링requestAnimationFrame 배치클라이언트 프레임 드롭 방지
비용레이트 리미팅 + 캐싱API 비용 30-50% 절감
안정성다계층 백프레셔과부하 시 시스템 보호
가용성서킷 브레이커 + 폴백부분 장애 시에도 서비스 유지

프로덕션 운영 체크리스트

배포 전 반드시 확인해야 할 항목들입니다.

네트워크 및 인프라

  • Nginx/ALB에서 proxy_buffering off 설정 확인
  • WebSocket 업그레이드 헤더 중계 설정
  • SSE 연결 타임아웃 충분히 설정 (최소 30분)
  • CDN에서 스트리밍 경로 바이패스 또는 캐시 비활성화
  • CORS 헤더 올바르게 설정
  • TLS/SSL 인증서 유효성 확인

스케일링 및 가용성

  • HPA 설정 (연결 수 기반 + CPU 보조)
  • Pod terminationGracePeriodSeconds 충분히 설정
  • PreStop 훅으로 연결 드레이닝 구현
  • Redis Pub/Sub로 WebSocket 멀티 서버 브로드캐스트
  • 추론 서버 헬스체크 및 자동 장애 격리

안정성 및 보호

  • API 레이트 리미팅 (사용자별/IP별)
  • LLM API 토큰 버킷 (RPM + TPM)
  • 추론 큐 상한선 설정 (바운드 버퍼)
  • 서킷 브레이커 설정 (연속 실패 시 차단)
  • 부하 차단 (우선순위 기반)

모니터링 및 관찰성

  • TTFT / TPOT 히스토그램 메트릭
  • 활성 연결 수 게이지
  • 스트리밍 완료율 / 중단율 카운터
  • 큐 깊이 실시간 모니터링
  • GPU 사용률 대시보드
  • 알림 규칙 설정 (P99 지연, 에러율, 큐 깊이)

이벤트 소싱 및 감사

  • 대화 이벤트 기록 (시작, 토큰, 완료, 중단)
  • 에이전트 행동 추적 (도구 호출, 결정 과정)
  • 사용량 집계 프로젝션
  • 이벤트 보존 정책 설정

보안

  • WebSocket 연결 인증 (토큰 검증)
  • SSE 엔드포인트 인증 (Authorization 헤더)
  • 입력 검증 및 프롬프트 인젝션 방어
  • 레이트 리미팅으로 남용 방지
  • 민감 데이터 마스킹 (로그/이벤트)
Tip

이 체크리스트를 팀의 배포 파이프라인에 통합하세요. 모든 항목이 자동화된 테스트로 검증되는 것이 이상적이지만, 최소한 배포 전 수동 확인이라도 반드시 수행해야 합니다.

시리즈를 마치며

10장에 걸쳐 스트리밍 아키텍처와 실시간 AI 시스템의 전체 스펙트럼을 다루었습니다.

장핵심 학습
1장스트리밍의 필요성, TTFT/TPOT, 프로토콜 생태계
2장SSE — LLM 스트리밍의 사실상 표준, 무상태의 힘
3장WebSocket — 양방향 통신, 스케일링의 복잡성
4장gRPC — 백엔드 고성능 통신, Protobuf
5장LLM API 차이 통합, 파셜 JSON 파싱, Vercel AI SDK
6장추론 파이프라인, vLLM, 시맨틱 캐싱, 멀티모달
7장이벤트 소싱, CQRS, 시간 여행 디버깅
8장백프레셔, 토큰 버킷, 다계층 흐름 제어
9장프로덕션 인프라, Kubernetes, HTTP/3
10장하이브리드 아키텍처, 엔드투엔드 구현

정리

이번 장에서는 시리즈 전체의 학습 내용을 종합하여 프로덕션 수준의 스트리밍 AI 시스템을 설계하고 구현했습니다.

  • SSE(토큰 스트리밍) + WebSocket(실시간 협업) + gRPC(서비스 간 통신)의 하이브리드 접근이 실무에서 가장 효과적입니다
  • 프로토콜 선택은 의사결정 트리를 따라 체계적으로 판단합니다
  • 프론트엔드에서 추론 서버까지 엔드투엔드 스트리밍 파이프라인을 구성합니다
  • 시맨틱 캐싱, Continuous Batching, 스트리밍 입력으로 성능을 최적화합니다
  • 백프레셔, 서킷 브레이커, 부하 차단으로 시스템 안정성을 확보합니다
  • 이벤트 소싱으로 모든 상호작용을 추적하고 감사합니다

스트리밍은 단순한 기술적 선택이 아니라, AI 시대의 사용자 경험을 좌우하는 아키텍처 결정입니다. 이 시리즈가 여러분의 실시간 AI 시스템 구축에 실질적인 도움이 되었기를 바랍니다.

이 글이 도움이 되셨나요?

관련 주제 더 보기

#streaming#ai

관련 글

아키텍처

9장: 프로덕션 스트리밍 인프라

로드밸런서의 WebSocket 업그레이드, CDN과 스트리밍, Kubernetes에서의 스트리밍 서비스 운영, 모니터링 전략, HTTP/3(QUIC)과 WebTransport의 미래를 다룹니다.

2026년 4월 3일·17분
아키텍처

8장: 백프레셔와 흐름 제어

생산자-소비자 속도 불일치를 관리하는 백프레셔의 원리, 버퍼링/드롭/속도 제한 전략, LLM API 레이트 리미팅, 토큰 버킷 알고리즘, 큐 깊이 모니터링을 다룹니다.

2026년 4월 1일·18분
아키텍처

7장: 이벤트 소싱과 CQRS 패턴

이벤트 소싱과 CQRS 패턴의 원리를 살펴보고, AI 시스템에서의 적용 사례를 다룹니다. 대화 이력 관리, 에이전트 상태 추적, 시간 여행 디버깅, Kafka와 EventStoreDB 활용을 포함합니다.

2026년 3월 30일·15분
이전 글9장: 프로덕션 스트리밍 인프라

댓글

목차

약 21분 남음
  • 학습 목표
  • 프로젝트 개요: AI 어시스턴트 플랫폼
    • 요구사항
  • 하이브리드 아키텍처 설계
    • 프로토콜-기능 매핑
  • 프로토콜 선택 의사결정 트리
  • 엔드투엔드 구현
    • 1. SSE 토큰 스트리밍 엔드포인트
    • 2. WebSocket 협업 서버
    • 3. gRPC 추론 라우터
    • 4. React 프론트엔드 통합
  • 성능 최적화 체크리스트
  • 프로덕션 운영 체크리스트
    • 네트워크 및 인프라
    • 스케일링 및 가용성
    • 안정성 및 보호
    • 모니터링 및 관찰성
    • 이벤트 소싱 및 감사
    • 보안
  • 시리즈를 마치며
  • 정리