vLLM의 스트리밍 입력, Continuous Batching, 시맨틱 캐싱, 추론 라우터, 멀티모달 실시간 처리 등 백엔드 추론 파이프라인의 핵심 아키텍처를 다룹니다.
지금까지 프로토콜과 클라이언트 처리를 다루었습니다. 이번 장에서는 "LLM이 토큰을 생성하는 바로 그 지점" — 추론 서버의 내부 아키텍처를 살펴봅니다.
각 구성요소가 스트리밍 아키텍처에서 어떤 역할을 하는지 하나씩 살펴보겠습니다.
vLLM은 LLM 추론을 위한 고성능 서빙 엔진으로, 2024년 이후 사실상 표준의 위치를 차지하고 있습니다. 2026년 초에 도입된 스트리밍 입력(Streaming Input) 기능은 추론 파이프라인의 패러다임을 바꾸었습니다.
[전체 프롬프트 수신 완료] ──> [프리필 시작] ──> [디코딩 시작] ──> [토큰 출력]
시간: ========= 대기 =========|=== 프리필 ===|=== 디코딩 ====>[프롬프트 청크 1 도착] ──> [부분 프리필 시작]
[프롬프트 청크 2 도착] ──> [프리필 이어서]
[프롬프트 청크 3 도착] ──> [프리필 완료] ──> [디코딩 시작] ──> [토큰 출력]
시간: = 수신+프리필 병렬 =|= 디코딩 ====>스트리밍 입력의 핵심은 프롬프트의 일부분이 도착하는 즉시 프리필(입력 처리)을 시작한다는 것입니다. 긴 시스템 프롬프트나 RAG 컨텍스트를 포함한 요청에서 TTFT를 크게 줄입니다.
from vllm import AsyncLLMEngine, SamplingParams
engine = AsyncLLMEngine.from_engine_args(engine_args)
async def stream_inference(prompt_chunks: AsyncIterator[str]):
"""프롬프트를 점진적으로 수신하면서 프리필 진행"""
sampling_params = SamplingParams(
temperature=0.7,
max_tokens=1024,
stream=True,
)
request_id = generate_request_id()
# 스트리밍 입력: 청크 단위로 추가
async for chunk in prompt_chunks:
await engine.add_request_chunk(
request_id=request_id,
prompt_chunk=chunk,
)
# 입력 완료 신호
await engine.finalize_request(request_id, sampling_params)
# 출력 토큰 스트리밍
async for output in engine.generate_stream(request_id):
yield output.outputs[0].text스트리밍 입력은 특히 음성-텍스트-LLM 파이프라인에서 위력을 발휘합니다. 음성 인식(STT) 결과가 단어 단위로 도착할 때, LLM은 대기하지 않고 즉시 이해를 시작할 수 있습니다.
전통적인 배치 추론은 여러 요청을 모아서 한꺼번에 처리합니다. 문제는 가장 긴 응답이 끝날 때까지 다른 모든 요청이 대기해야 한다는 것입니다.
**Continuous Batching(연속 배칭)**은 이 문제를 해결합니다. 요청이 완료되는 즉시 새 요청을 배치에 추가합니다.
[정적 배칭]
배치 1: [요청A(100토큰), 요청B(10토큰), 요청C(50토큰)]
─── 요청A 완료까지 대기 ───> 배치 2 시작
[연속 배칭]
시점 1: [요청A, 요청B, 요청C] 처리 중
시점 2: 요청B 완료 → [요청A, 요청D, 요청C] (요청D 즉시 추가)
시점 3: 요청C 완료 → [요청A, 요청D, 요청E] (요청E 즉시 추가)Continuous Batching은 스트리밍과 결합될 때 최대 효과를 발휘합니다.
| 지표 | 정적 배칭 | 연속 배칭 | 연속 배칭 + 스트리밍 |
|---|---|---|---|
| GPU 사용률 | 60-70% | 85-95% | 85-95% |
| 평균 대기 시간 | 높음 | 낮음 | 낮음 |
| TTFT | 배치 크기에 비례 | 요청 도착 즉시 | 요청 도착 즉시 |
| 사용자 체감 | 나쁨 | 보통 | 우수 |
GPU 사용률이 높아진다는 것은 같은 하드웨어로 더 많은 요청을 처리할 수 있다는 의미이며, 이는 곧 비용 효율성으로 이어집니다.
**시맨틱 캐싱(Semantic Caching)**은 동일하거나 유사한 쿼리에 대해 이전 응답을 재활용하는 전략입니다. 정확한 문자열 매칭이 아닌, 의미적 유사도를 기반으로 캐시 히트를 판단합니다.
import numpy as np
from redis import Redis
from sentence_transformers import SentenceTransformer
class SemanticCache:
def __init__(
self,
similarity_threshold: float = 0.95,
ttl_seconds: int = 3600,
):
self.model = SentenceTransformer("all-MiniLM-L6-v2")
self.redis = Redis()
self.threshold = similarity_threshold
self.ttl = ttl_seconds
def get(self, query: str) -> str | None:
"""의미적으로 유사한 캐시 엔트리 검색"""
query_embedding = self.model.encode(query)
# Redis에서 벡터 검색 (RediSearch 모듈 활용)
results = self.redis.ft("cache_idx").search(
f"@embedding:[VECTOR_RANGE $radius $vec]",
query_params={
"radius": 1 - self.threshold,
"vec": query_embedding.tobytes(),
},
)
if results.total > 0:
return results.docs[0].response
return None
def put(self, query: str, response: str):
"""쿼리-응답 쌍을 캐시에 저장"""
embedding = self.model.encode(query)
cache_key = f"cache:{hash(query)}"
self.redis.hset(cache_key, mapping={
"query": query,
"response": response,
"embedding": embedding.tobytes(),
})
self.redis.expire(cache_key, self.ttl)| 시나리오 | 캐시 없음 | 시맨틱 캐싱 적용 |
|---|---|---|
| "파이썬에서 리스트 정렬 방법" | 2-5초 | 10-50ms |
| "Python list 정렬하는 법" (유사 쿼리) | 2-5초 | 10-50ms (캐시 히트) |
| 고객 지원 FAQ | 매번 추론 | 80%+ 캐시 히트율 |
시맨틱 캐싱의 유사도 임계값은 신중하게 설정해야 합니다. 너무 낮으면 다른 의도의 쿼리에 잘못된 응답을 반환하고, 너무 높으면 캐시 히트율이 떨어집니다. 도메인별 테스트가 필수입니다.
여러 모델이나 여러 GPU 서버를 운영할 때, 요청을 어디로 보낼지 결정하는 **추론 라우터(Inference Router)**가 필요합니다.
interface InferenceRequest {
model: string;
prompt: string;
maxTokens: number;
priority: "low" | "normal" | "high";
}
interface BackendServer {
id: string;
models: string[];
currentLoad: number; // 0-1
gpuMemoryFree: number; // bytes
avgLatency: number; // ms
queueDepth: number;
}
type RoutingStrategy =
| "least-connections"
| "least-latency"
| "model-affinity"
| "cost-aware";
function routeRequest(
request: InferenceRequest,
backends: BackendServer[],
strategy: RoutingStrategy
): BackendServer {
// 해당 모델을 지원하는 서버만 필터링
const eligible = backends.filter(
(b) => b.models.includes(request.model) && b.currentLoad < 0.95
);
if (eligible.length === 0) {
throw new Error("사용 가능한 추론 서버 없음");
}
switch (strategy) {
case "least-connections":
return eligible.reduce((min, b) =>
b.queueDepth < min.queueDepth ? b : min
);
case "least-latency":
return eligible.reduce((min, b) =>
b.avgLatency < min.avgLatency ? b : min
);
case "model-affinity":
// KV 캐시가 이미 로드된 서버 우선
return (
eligible.find((b) => b.currentLoad > 0) ??
eligible[0]
);
case "cost-aware":
// 이미 실행 중인 서버 우선 (새 서버 기동 비용 회피)
return eligible.reduce((best, b) =>
b.currentLoad > 0 && b.currentLoad < best.currentLoad
? b
: best
);
}
}2026년의 AI 시스템은 텍스트를 넘어 오디오와 비디오를 실시간으로 처리합니다. 이는 스트리밍 아키텍처에 새로운 도전을 제기합니다.
이 파이프라인에서 각 단계는 이전 단계의 출력을 스트리밍으로 받습니다. 전체 음성이 끝나기를 기다리지 않고, 인식되는 단어부터 LLM에 전달하며, LLM의 출력도 생성되는 즉시 TTS에 전달합니다.
import asyncio
from dataclasses import dataclass
@dataclass
class AudioFrame:
data: bytes
sample_rate: int
channels: int
timestamp_ms: int
class RealtimeAudioPipeline:
"""오디오 실시간 처리 파이프라인"""
def __init__(self):
self.vad = VoiceActivityDetector()
self.stt = StreamingSTT()
self.llm = StreamingLLM()
self.tts = StreamingTTS()
async def process(
self,
audio_stream: AsyncIterator[AudioFrame],
) -> AsyncIterator[AudioFrame]:
"""입력 오디오 스트림을 처리하여 응답 오디오 스트림 생성"""
# 1단계: VAD로 음성 구간 감지
speech_segments = self.vad.process_stream(audio_stream)
# 2단계: 음성을 텍스트로 변환 (스트리밍)
text_stream = self.stt.transcribe_stream(speech_segments)
# 3단계: LLM 추론 (스트리밍 입력 + 스트리밍 출력)
response_text = self.llm.generate_stream(text_stream)
# 4단계: 텍스트를 음성으로 변환 (스트리밍)
response_audio = self.tts.synthesize_stream(response_text)
async for frame in response_audio:
yield framePipecat은 실시간 멀티모달 AI 파이프라인을 구축하기 위한 프레임워크입니다. 각 처리 단계를 파이프라인 노드로 추상화하여 조합할 수 있게 합니다.
from pipecat.pipeline import Pipeline
from pipecat.services.openai import OpenAILLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.websocket import WebSocketTransport
# 파이프라인 구성: 각 노드가 스트리밍으로 연결
pipeline = Pipeline([
WebSocketTransport(port=8765), # 오디오 입력
DeepgramSTTService(), # 실시간 음성 인식
OpenAILLMService(model="gpt-4o"), # LLM 추론
ElevenLabsTTSService(), # 음성 합성
WebSocketTransport(port=8765), # 오디오 출력
])
# 파이프라인 실행 — 각 노드 간 데이터가 스트리밍으로 흐름
await pipeline.run()Pipecat의 강점은 노드 교체가 자유롭다는 것입니다. STT를 Deepgram에서 Whisper로, TTS를 ElevenLabs에서 다른 서비스로 바꿔도 파이프라인의 나머지 부분은 영향을 받지 않습니다.
비디오 스트림에서 프레임 단위로 AI 분석을 수행하는 아키텍처입니다.
class VideoAnalysisPipeline:
"""비디오 프레임 실시간 분석 파이프라인"""
def __init__(self, target_fps: int = 5):
self.target_fps = target_fps
self.frame_interval = 1.0 / target_fps
self.vision_model = load_vision_model()
async def analyze_stream(
self,
video_frames: AsyncIterator[VideoFrame],
) -> AsyncIterator[AnalysisResult]:
"""비디오 프레임을 실시간으로 분석"""
last_analysis_time = 0
async for frame in video_frames:
current_time = frame.timestamp_ms / 1000
# 프레임 샘플링: 모든 프레임을 분석하면 GPU 과부하
if current_time - last_analysis_time < self.frame_interval:
continue
last_analysis_time = current_time
# 비동기 분석 — 이전 프레임 분석 완료를 기다리지 않음
result = await self.vision_model.analyze(frame)
yield AnalysisResult(
timestamp=frame.timestamp_ms,
objects=result.detected_objects,
description=result.scene_description,
)이번 장에서는 백엔드 추론 파이프라인의 핵심 구성요소를 살펴보았습니다.
다음 장에서는 아키텍처 패턴 수준으로 시야를 넓혀, **이벤트 소싱(Event Sourcing)**과 CQRS 패턴이 AI 시스템에서 어떻게 활용되는지 살펴보겠습니다. 대화 이력 관리, 에이전트 상태 추적, 시간 여행 디버깅 등의 실용적 패턴을 다룹니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
이벤트 소싱과 CQRS 패턴의 원리를 살펴보고, AI 시스템에서의 적용 사례를 다룹니다. 대화 이력 관리, 에이전트 상태 추적, 시간 여행 디버깅, Kafka와 EventStoreDB 활용을 포함합니다.
OpenAI, Anthropic, Google의 스트리밍 API 차이를 비교하고, 구조화된 출력의 파셜 파싱, React 스트리밍 UI 렌더링, Vercel AI SDK 활용법을 다룹니다.
생산자-소비자 속도 불일치를 관리하는 백프레셔의 원리, 버퍼링/드롭/속도 제한 전략, LLM API 레이트 리미팅, 토큰 버킷 알고리즘, 큐 깊이 모니터링을 다룹니다.