이벤트 소싱과 CQRS 패턴의 원리를 살펴보고, AI 시스템에서의 적용 사례를 다룹니다. 대화 이력 관리, 에이전트 상태 추적, 시간 여행 디버깅, Kafka와 EventStoreDB 활용을 포함합니다.
지금까지의 장에서는 데이터가 "흐르는 방식"(프로토콜, 스트리밍)에 집중했습니다. 이번 장에서는 데이터를 "기록하는 방식"으로 시야를 넓힙니다.
**이벤트 소싱(Event Sourcing)**은 시스템의 상태를 직접 저장하는 대신, 상태를 변경시킨 모든 이벤트를 순서대로 기록하는 패턴입니다.
[전통적 CRUD]
사용자 테이블: {id: 1, name: "홍길동", email: "hong@mail.com"}
→ 이메일 변경 → {id: 1, name: "홍길동", email: "new@mail.com"}
→ 이전 이메일은 사라짐
[이벤트 소싱]
이벤트 1: UserCreated {name: "홍길동", email: "hong@mail.com"}
이벤트 2: EmailChanged {oldEmail: "hong@mail.com", newEmail: "new@mail.com"}
→ 모든 변경 이력이 보존됨핵심 원칙은 다음과 같습니다.
MessageSent, TokenGenerated 처럼 이미 일어난 사실을 기록합니다AI 시스템은 이벤트 소싱의 이점이 극대화되는 영역입니다. 그 이유를 구체적으로 살펴보겠습니다.
AI 챗봇의 대화를 이벤트로 기록하면, 단순한 메시지 로그를 넘어선 풍부한 맥락을 보존할 수 있습니다.
// 대화 이벤트 타입 정의
type ConversationEvent =
| {
type: "ConversationStarted";
timestamp: number;
userId: string;
sessionId: string;
model: string;
}
| {
type: "UserMessageSent";
timestamp: number;
content: string;
attachments?: string[];
}
| {
type: "AssistantStreamStarted";
timestamp: number;
model: string;
promptTokens: number;
}
| {
type: "AssistantTokenGenerated";
timestamp: number;
token: string;
index: number;
}
| {
type: "AssistantStreamCompleted";
timestamp: number;
totalTokens: number;
finishReason: string;
latencyMs: number;
}
| {
type: "GenerationCancelled";
timestamp: number;
tokensGenerated: number;
reason: "user" | "timeout" | "error";
}
| {
type: "FeedbackGiven";
timestamp: number;
messageIndex: number;
rating: "positive" | "negative";
comment?: string;
}
| {
type: "ToolCallRequested";
timestamp: number;
toolName: string;
arguments: Record<string, unknown>;
}
| {
type: "ToolCallCompleted";
timestamp: number;
toolName: string;
result: unknown;
durationMs: number;
};이벤트 스트림을 재생하여 대화의 현재 상태를 복원하는 함수입니다.
interface ConversationState {
sessionId: string;
messages: Array<{
role: "user" | "assistant";
content: string;
metadata: Record<string, unknown>;
}>;
isStreaming: boolean;
currentStreamBuffer: string;
totalTokensUsed: number;
toolCalls: Array<{
name: string;
status: "pending" | "completed";
}>;
}
function projectConversation(
events: ConversationEvent[]
): ConversationState {
const state: ConversationState = {
sessionId: "",
messages: [],
isStreaming: false,
currentStreamBuffer: "",
totalTokensUsed: 0,
toolCalls: [],
};
for (const event of events) {
switch (event.type) {
case "ConversationStarted":
state.sessionId = event.sessionId;
break;
case "UserMessageSent":
state.messages.push({
role: "user",
content: event.content,
metadata: { attachments: event.attachments },
});
break;
case "AssistantStreamStarted":
state.isStreaming = true;
state.currentStreamBuffer = "";
break;
case "AssistantTokenGenerated":
state.currentStreamBuffer += event.token;
break;
case "AssistantStreamCompleted":
state.isStreaming = false;
state.messages.push({
role: "assistant",
content: state.currentStreamBuffer,
metadata: {
tokens: event.totalTokens,
latencyMs: event.latencyMs,
},
});
state.currentStreamBuffer = "";
state.totalTokensUsed += event.totalTokens;
break;
case "GenerationCancelled":
state.isStreaming = false;
if (state.currentStreamBuffer) {
state.messages.push({
role: "assistant",
content: state.currentStreamBuffer + " [중단됨]",
metadata: { cancelled: true },
});
}
state.currentStreamBuffer = "";
break;
case "ToolCallRequested":
state.toolCalls.push({
name: event.toolName,
status: "pending",
});
break;
case "ToolCallCompleted": {
const call = state.toolCalls.find(
(c) =>
c.name === event.toolName && c.status === "pending"
);
if (call) call.status = "completed";
break;
}
}
}
return state;
}**CQRS(Command Query Responsibility Segregation, 명령 쿼리 책임 분리)**는 데이터의 쓰기(명령)와 읽기(쿼리)를 서로 다른 모델로 분리하는 패턴입니다. 이벤트 소싱과 자주 함께 사용됩니다.
AI 시스템에서 CQRS가 특히 유용한 이유는 읽기와 쓰기의 패턴이 극단적으로 다르기 때문입니다.
| 명령 (쓰기) | 쿼리 (읽기) |
|---|---|
| 토큰 하나 생성 (초당 수십 회) | 전체 대화 이력 조회 |
| 도구 호출 결과 기록 | 사용량 통계 집계 |
| 피드백 저장 | 유사 대화 검색 |
| 에이전트 상태 변경 | 에이전트 행동 분석 |
쓰기는 고빈도 단순 연산이고, 읽기는 저빈도 복잡 연산입니다. CQRS는 이 두 가지를 독립적으로 최적화할 수 있게 합니다.
AI 에이전트가 여러 단계를 거쳐 작업을 수행할 때, 이벤트 소싱으로 각 단계를 기록하면 디버깅과 재현이 가능해집니다.
type AgentEvent =
| {
type: "TaskReceived";
timestamp: number;
taskDescription: string;
constraints: string[];
}
| {
type: "PlanCreated";
timestamp: number;
steps: string[];
estimatedDuration: number;
}
| {
type: "StepStarted";
timestamp: number;
stepIndex: number;
description: string;
}
| {
type: "ToolInvoked";
timestamp: number;
tool: string;
input: Record<string, unknown>;
}
| {
type: "ToolResultReceived";
timestamp: number;
tool: string;
output: unknown;
success: boolean;
}
| {
type: "ReplanningTriggered";
timestamp: number;
reason: string;
newSteps: string[];
}
| {
type: "TaskCompleted";
timestamp: number;
result: unknown;
totalSteps: number;
totalDurationMs: number;
};
// 에이전트 실행 추적기
class AgentTracer {
private events: AgentEvent[] = [];
private eventStore: EventStore;
constructor(
private agentId: string,
eventStore: EventStore
) {
this.eventStore = eventStore;
}
async record(event: AgentEvent) {
this.events.push(event);
await this.eventStore.append(
`agent-${this.agentId}`,
event
);
}
// 특정 시점의 에이전트 상태 조회
getStateAt(timestamp: number): AgentState {
const relevantEvents = this.events.filter(
(e) => e.timestamp <= timestamp
);
return projectAgentState(relevantEvents);
}
}이벤트 소싱의 가장 강력한 이점 중 하나는 **시간 여행 디버깅(Time-Travel Debugging)**입니다. 모든 상태 변경이 이벤트로 기록되어 있으므로, 임의의 시점으로 되돌아가 당시의 시스템 상태를 정확히 재현할 수 있습니다.
class TimeTravelDebugger {
constructor(private eventStore: EventStore) {}
/**
* 특정 시점의 대화 상태를 재현합니다.
* AI가 왜 특정 응답을 했는지 분석할 때 사용합니다.
*/
async replayTo(
streamId: string,
targetTimestamp: number
): Promise<ConversationState> {
const allEvents = await this.eventStore.readStream(streamId);
// 목표 시점까지의 이벤트만 선택
const eventsUntil = allEvents.filter(
(e) => e.timestamp <= targetTimestamp
);
// 이벤트를 순서대로 재생하여 상태 복원
return projectConversation(eventsUntil);
}
/**
* 두 시점 사이에 발생한 이벤트를 조회합니다.
* "이 시간대에 무슨 일이 있었는지" 분석할 때 사용합니다.
*/
async getEventsBetween(
streamId: string,
from: number,
to: number
): Promise<ConversationEvent[]> {
const allEvents = await this.eventStore.readStream(streamId);
return allEvents.filter(
(e) => e.timestamp >= from && e.timestamp <= to
);
}
/**
* 에이전트의 의사결정 과정을 시각화합니다.
*/
async traceDecision(
agentId: string,
decisionTimestamp: number
): Promise<DecisionTrace> {
const events = await this.eventStore.readStream(
`agent-${agentId}`
);
// 해당 결정 직전의 컨텍스트 수집
const contextEvents = events.filter(
(e) => e.timestamp < decisionTimestamp
);
// 결정 이후의 결과 수집
const outcomeEvents = events.filter(
(e) =>
e.timestamp >= decisionTimestamp &&
e.timestamp < decisionTimestamp + 60000
);
return {
context: contextEvents,
decision: events.find(
(e) => e.timestamp === decisionTimestamp
),
outcomes: outcomeEvents,
};
}
}시간 여행 디버깅은 AI 안전성 감사에 핵심적입니다. "AI가 왜 이런 응답을 했는가?"라는 질문에 답하려면, 응답 시점의 정확한 컨텍스트(이전 메시지, 도구 호출 결과, 시스템 상태)를 재현할 수 있어야 합니다.
이벤트 소싱 시스템의 저장소로 주로 사용되는 두 기술을 비교합니다.
대규모 이벤트 스트리밍 플랫폼으로, 높은 처리량과 수평 확장이 강점입니다.
conversations-events — 대화 이벤트 (파티션: sessionId 기준)
agent-events — 에이전트 실행 이벤트
inference-metrics — 추론 성능 메트릭
feedback-events — 사용자 피드백이벤트 소싱에 특화된 데이터베이스로, 스트림 단위 읽기와 프로젝션 기능이 강점입니다.
| 기준 | Kafka | EventStoreDB |
|---|---|---|
| 주요 강점 | 처리량, 수평 확장 | 이벤트 소싱 네이티브 |
| 스트림 읽기 | 토픽/파티션 단위 | 개별 스트림 단위 |
| 프로젝션 | 외부 처리 필요 | 내장 프로젝션 엔진 |
| 이벤트 순서 | 파티션 내 보장 | 스트림 내 보장 |
| 운영 복잡도 | 높음 (ZooKeeper/KRaft) | 낮음 |
| 생태계 | 매우 풍부 | 상대적으로 작음 |
| 적합 규모 | 대규모 (일 수십억 이벤트) | 중소규모 (일 수백만 이벤트) |
규모와 이미 Kafka를 사용 중인지 여부가 선택의 핵심 기준입니다. 기존에 Kafka 인프라가 있다면 이벤트 소싱 용도로도 활용하는 것이 운영 부담을 줄입니다. 새로 시작하는 소규모 시스템이라면 EventStoreDB가 더 적합할 수 있습니다.
이벤트 소싱은 자연스럽게 완벽한 감사 로그를 생성합니다. AI 시스템에서 이는 규제 준수와 신뢰성의 기반입니다.
// 감사 로그 쿼리 예시
// 1. 특정 사용자의 모든 AI 상호작용 조회
const userInteractions = await eventStore.readStream(
`user-${userId}`
);
// 2. 특정 기간의 생성 중단 이벤트 집계
const cancellations = await eventStore.query({
eventType: "GenerationCancelled",
fromTimestamp: startOfDay,
toTimestamp: endOfDay,
});
// 3. 도구 호출 실패 패턴 분석
const failedToolCalls = await eventStore.query({
eventType: "ToolResultReceived",
filter: { success: false },
fromTimestamp: lastWeek,
});
// 4. 특정 모델 버전의 응답 품질 추적
const modelResponses = await eventStore.query({
eventType: "AssistantStreamCompleted",
metadata: { model: "claude-sonnet-4-20250514" },
});이번 장에서는 이벤트 소싱과 CQRS 패턴이 AI 시스템에서 어떤 가치를 제공하는지 살펴보았습니다.
다음 장에서는 스트리밍 시스템의 안정성을 좌우하는 **백프레셔(Backpressure)**와 흐름 제어 패턴을 다룹니다. 생산자와 소비자의 속도 불일치를 어떻게 관리하고, LLM API의 레이트 리미팅에 어떻게 대응하는지 살펴보겠습니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
생산자-소비자 속도 불일치를 관리하는 백프레셔의 원리, 버퍼링/드롭/속도 제한 전략, LLM API 레이트 리미팅, 토큰 버킷 알고리즘, 큐 깊이 모니터링을 다룹니다.
vLLM의 스트리밍 입력, Continuous Batching, 시맨틱 캐싱, 추론 라우터, 멀티모달 실시간 처리 등 백엔드 추론 파이프라인의 핵심 아키텍처를 다룹니다.
로드밸런서의 WebSocket 업그레이드, CDN과 스트리밍, Kubernetes에서의 스트리밍 서비스 운영, 모니터링 전략, HTTP/3(QUIC)과 WebTransport의 미래를 다룹니다.