생산자-소비자 속도 불일치를 관리하는 백프레셔의 원리, 버퍼링/드롭/속도 제한 전략, LLM API 레이트 리미팅, 토큰 버킷 알고리즘, 큐 깊이 모니터링을 다룹니다.
물이 파이프를 통해 흐를 때, 하류에서 흐름이 막히면 상류에 압력이 가해집니다. 이것이 **백프레셔(Backpressure, 역압)**입니다. 소프트웨어 시스템에서도 동일한 현상이 발생합니다.
[정상 상태]
생산자 (100 req/s) ──> 큐 ──> 소비자 (120 req/s)
[여유]
[백프레셔 발생]
생산자 (100 req/s) ──> 큐 ──> 소비자 (50 req/s)
[넘침!]AI 시스템에서 백프레셔는 매우 흔하게 발생합니다. 그 이유는 다음과 같습니다.
백프레셔를 관리하지 않으면 메모리 폭주, 타임아웃, 전체 시스템 장애로 이어집니다.
AI 스트리밍 시스템에서 발생하는 대표적인 속도 불일치 시나리오를 살펴보겠습니다.
GPU 서버가 토큰을 빠르게 생성하지만, 네트워크 전송이나 클라이언트 렌더링이 따라가지 못하는 경우입니다.
갑작스러운 트래픽 스파이크로 추론 큐가 넘치는 경우입니다.
시간 | 요청 유입 | 처리 완료 | 큐 깊이
12:00 | 10 | 10 | 0
12:01 | 50 | 10 | 40
12:02 | 100 | 10 | 130
12:03 | 200 | 10 | 320 ← 위험!외부 LLM API(OpenAI, Anthropic 등)가 분당 요청 수를 제한하는 경우입니다. 429 응답이 반복되면 서비스 품질이 급격히 저하됩니다.
백프레셔에 대응하는 전략은 크게 네 가지로 나뉩니다.
속도 차이를 큐(버퍼)로 흡수하는 가장 기본적인 전략입니다.
class BoundedBuffer<T> {
private buffer: T[] = [];
private readonly maxSize: number;
private readonly highWaterMark: number;
private readonly lowWaterMark: number;
private isPaused = false;
constructor(options: {
maxSize: number;
highWaterMark?: number;
lowWaterMark?: number;
}) {
this.maxSize = options.maxSize;
this.highWaterMark = options.highWaterMark ?? options.maxSize * 0.8;
this.lowWaterMark = options.lowWaterMark ?? options.maxSize * 0.2;
}
push(item: T): "ok" | "pause" | "reject" {
if (this.buffer.length >= this.maxSize) {
return "reject"; // 버퍼 가득 참
}
this.buffer.push(item);
if (this.buffer.length >= this.highWaterMark) {
this.isPaused = true;
return "pause"; // 생산자에게 속도 줄이라는 신호
}
return "ok";
}
pull(): T | undefined {
const item = this.buffer.shift();
if (
this.isPaused &&
this.buffer.length <= this.lowWaterMark
) {
this.isPaused = false;
// 생산자에게 다시 보내도 된다는 신호
}
return item;
}
get depth(): number {
return this.buffer.length;
}
}무한 버퍼는 결국 메모리를 소진시킵니다. 반드시 상한선을 설정하고, 상한에 도달했을 때의 행동(거부, 드롭, 대기)을 명시적으로 정의해야 합니다.
처리할 수 없는 데이터를 폐기하는 전략입니다. 실시간 비디오 분석처럼 최신 데이터가 중요한 경우에 적합합니다.
type DropStrategy = "drop-oldest" | "drop-newest" | "drop-random";
class DroppingBuffer<T> {
private buffer: T[] = [];
private dropped = 0;
constructor(
private maxSize: number,
private strategy: DropStrategy
) {}
push(item: T) {
if (this.buffer.length < this.maxSize) {
this.buffer.push(item);
return;
}
// 버퍼가 가득 찬 경우
this.dropped++;
switch (this.strategy) {
case "drop-oldest":
// 가장 오래된 항목 제거, 새 항목 추가
this.buffer.shift();
this.buffer.push(item);
break;
case "drop-newest":
// 새 항목을 폐기 (버퍼 유지)
break;
case "drop-random":
// 랜덤 위치의 항목을 교체
const idx = Math.floor(Math.random() * this.maxSize);
this.buffer[idx] = item;
break;
}
}
getDropCount(): number {
return this.dropped;
}
}생산자의 속도를 직접 제한하는 전략입니다.
시스템이 한계에 도달했을 때 새로운 요청을 거부하는 전략입니다.
class LoadShedder {
private currentLoad = 0;
private readonly capacity: number;
private readonly shedThreshold: number;
constructor(capacity: number, shedThreshold = 0.9) {
this.capacity = capacity;
this.shedThreshold = shedThreshold;
}
shouldAccept(priority: "low" | "normal" | "high"): boolean {
const utilization = this.currentLoad / this.capacity;
// 여유 있으면 모두 수용
if (utilization < this.shedThreshold) return true;
// 과부하 시 우선순위에 따라 차단
switch (priority) {
case "high":
return utilization < 0.98; // 98%까지 수용
case "normal":
return utilization < 0.95; // 95%까지 수용
case "low":
return false; // 즉시 차단
}
}
async execute<T>(
priority: "low" | "normal" | "high",
fn: () => Promise<T>
): Promise<T> {
if (!this.shouldAccept(priority)) {
throw new Error("Service overloaded. Please retry later.");
}
this.currentLoad++;
try {
return await fn();
} finally {
this.currentLoad--;
}
}
}**토큰 버킷(Token Bucket)**은 레이트 리미팅에서 가장 널리 사용되는 알고리즘입니다. LLM API 호출 제한에 특히 적합합니다.
원리는 간단합니다.
class TokenBucket {
private tokens: number;
private lastRefill: number;
constructor(
private readonly capacity: number,
private readonly refillRate: number // tokens per second
) {
this.tokens = capacity;
this.lastRefill = Date.now();
}
private refill() {
const now = Date.now();
const elapsed = (now - this.lastRefill) / 1000;
this.tokens = Math.min(
this.capacity,
this.tokens + elapsed * this.refillRate
);
this.lastRefill = now;
}
tryConsume(count: number = 1): boolean {
this.refill();
if (this.tokens >= count) {
this.tokens -= count;
return true;
}
return false;
}
async waitAndConsume(count: number = 1): Promise<void> {
while (!this.tryConsume(count)) {
// 필요한 토큰이 채워질 때까지 대기
const deficit = count - this.tokens;
const waitMs = (deficit / this.refillRate) * 1000;
await new Promise((resolve) =>
setTimeout(resolve, Math.max(waitMs, 10))
);
}
}
get availableTokens(): number {
this.refill();
return Math.floor(this.tokens);
}
}실제 LLM API에는 분당 요청 수(RPM)와 분당 토큰 수(TPM) 두 가지 제한이 있습니다. 이를 동시에 관리하는 레이트 리미터입니다.
class LLMRateLimiter {
private requestBucket: TokenBucket;
private tokenBucket: TokenBucket;
private queue: Array<{
resolve: () => void;
tokensNeeded: number;
}> = [];
constructor(config: {
requestsPerMinute: number;
tokensPerMinute: number;
burstMultiplier?: number;
}) {
const burst = config.burstMultiplier ?? 1.2;
this.requestBucket = new TokenBucket(
Math.ceil(config.requestsPerMinute * burst),
config.requestsPerMinute / 60
);
this.tokenBucket = new TokenBucket(
Math.ceil(config.tokensPerMinute * burst),
config.tokensPerMinute / 60
);
}
async acquire(estimatedTokens: number): Promise<void> {
// 요청 토큰 소비
await this.requestBucket.waitAndConsume(1);
// 토큰 소비 (입력 토큰 기준 추정)
await this.tokenBucket.waitAndConsume(estimatedTokens);
}
reportActualUsage(actualTokens: number, estimatedTokens: number) {
// 추정치와 실제 사용량의 차이를 보정
const diff = actualTokens - estimatedTokens;
if (diff > 0) {
// 실제가 더 많았으면 추가 소비
this.tokenBucket.tryConsume(diff);
}
// 실제가 더 적었으면 자연 보충에 맡김
}
getStatus(): {
availableRequests: number;
availableTokens: number;
} {
return {
availableRequests: this.requestBucket.availableTokens,
availableTokens: this.tokenBucket.availableTokens,
};
}
}LLM API의 토큰 사용량은 사전에 정확히 알 수 없습니다. 프롬프트의 토큰 수는 추정 가능하지만, 응답 토큰 수는 완료되어야 확인됩니다. reportActualUsage로 사후 보정하는 패턴이 실용적입니다.
백프레셔를 효과적으로 관리하려면, 시스템의 각 지점에서 큐 깊이를 실시간으로 모니터링해야 합니다.
interface QueueMetrics {
depth: number;
enqueueRate: number; // items/sec
dequeueRate: number; // items/sec
avgWaitTime: number; // ms
maxWaitTime: number; // ms
dropCount: number;
}
class QueueMonitor {
private metrics: Map<string, QueueMetrics> = new Map();
private alerts: Array<{
queueName: string;
condition: (m: QueueMetrics) => boolean;
action: (m: QueueMetrics) => void;
}> = [];
registerAlert(
queueName: string,
condition: (m: QueueMetrics) => boolean,
action: (m: QueueMetrics) => void
) {
this.alerts.push({ queueName, condition, action });
}
update(queueName: string, metrics: QueueMetrics) {
this.metrics.set(queueName, metrics);
// 알림 조건 확인
for (const alert of this.alerts) {
if (
alert.queueName === queueName &&
alert.condition(metrics)
) {
alert.action(metrics);
}
}
}
}
// 사용 예시
const monitor = new QueueMonitor();
// 추론 큐 깊이가 100을 초과하면 경고
monitor.registerAlert(
"inference-queue",
(m) => m.depth > 100,
(m) => {
console.warn(
`추론 큐 깊이 경고: ${m.depth}, 평균 대기: ${m.avgWaitTime}ms`
);
// 자동 스케일 아웃 트리거
triggerAutoScale("inference-servers");
}
);
// 평균 대기 시간이 10초를 초과하면 부하 차단 활성화
monitor.registerAlert(
"inference-queue",
(m) => m.avgWaitTime > 10000,
() => {
enableLoadShedding("low-priority");
}
);프로덕션 AI 시스템에서는 백프레셔가 여러 계층에서 동시에 작동해야 합니다.
각 계층의 역할은 다음과 같습니다.
| 계층 | 역할 | 전략 |
|---|---|---|
| API Gateway | 클라이언트별 요청 제한 | 토큰 버킷 + IP 기반 제한 |
| 추론 큐 | 요청 버퍼링과 우선순위 지정 | 바운드 버퍼 + 우선순위 큐 |
| 모델 서버 | GPU 자원 보호 | 동시 요청 수 제한 |
| 외부 API | API 제공자 제한 준수 | RPM/TPM 토큰 버킷 |
백프레셔 신호는 하류에서 상류로 전파되어야 합니다. 외부 API가 429를 반환하면, 그 신호가 모델 서버 to 추론 큐 to API Gateway까지 올라가서 새 요청의 유입을 제한해야 합니다. 이 전파가 끊어지면 중간 큐에 요청이 쌓여 메모리 폭주가 발생합니다.
class OverloadProtection {
private rateLimiter: LLMRateLimiter;
private loadShedder: LoadShedder;
private queueMonitor: QueueMonitor;
private circuitBreaker: CircuitBreaker;
async handleRequest(
request: InferenceRequest
): Promise<InferenceResponse> {
// 1. 서킷 브레이커 확인
if (this.circuitBreaker.isOpen()) {
throw new Error("Service temporarily unavailable");
}
// 2. 부하 차단 확인
if (!this.loadShedder.shouldAccept(request.priority)) {
throw new Error("Server overloaded, try again later");
}
// 3. 레이트 리미팅
await this.rateLimiter.acquire(request.estimatedTokens);
try {
const response = await this.executeInference(request);
this.circuitBreaker.recordSuccess();
return response;
} catch (error) {
this.circuitBreaker.recordFailure();
throw error;
}
}
}
// 서킷 브레이커: 연속 실패 시 일정 시간 요청 차단
class CircuitBreaker {
private failures = 0;
private lastFailure = 0;
private state: "closed" | "open" | "half-open" = "closed";
constructor(
private threshold: number = 5,
private resetTimeMs: number = 30000
) {}
isOpen(): boolean {
if (this.state === "open") {
// 리셋 시간이 지났으면 half-open으로 전환
if (Date.now() - this.lastFailure > this.resetTimeMs) {
this.state = "half-open";
return false;
}
return true;
}
return false;
}
recordSuccess() {
this.failures = 0;
this.state = "closed";
}
recordFailure() {
this.failures++;
this.lastFailure = Date.now();
if (this.failures >= this.threshold) {
this.state = "open";
}
}
}이번 장에서는 스트리밍 시스템의 안정성을 좌우하는 백프레셔와 흐름 제어를 살펴보았습니다.
다음 장에서는 이 모든 것을 프로덕션에 배포하기 위한 인프라 구성을 다룹니다. 로드밸런서의 WebSocket 업그레이드 처리, Kubernetes에서의 스트리밍 서비스 운영, HTTP/3와 WebTransport의 미래 전망을 살펴보겠습니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
로드밸런서의 WebSocket 업그레이드, CDN과 스트리밍, Kubernetes에서의 스트리밍 서비스 운영, 모니터링 전략, HTTP/3(QUIC)과 WebTransport의 미래를 다룹니다.
이벤트 소싱과 CQRS 패턴의 원리를 살펴보고, AI 시스템에서의 적용 사례를 다룹니다. 대화 이력 관리, 에이전트 상태 추적, 시간 여행 디버깅, Kafka와 EventStoreDB 활용을 포함합니다.
SSE, gRPC, WebSocket을 결합한 하이브리드 스트리밍 AI 시스템을 설계하고 구현합니다. 프로토콜 선택 의사결정 트리, 엔드투엔드 구현, 성능 최적화, 운영 체크리스트를 다룹니다.