본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout

내비게이션

  • Tech
  • Projects
  • Books
  • About
  • Tags

카테고리

  • AI / ML
  • 웹 개발
  • 프로그래밍
  • 개발 도구

연결

  • GitHub
  • Email
  • RSS
© 2026 Kreath Archive. All rights reserved.Built with Next.js + MDX
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 7장: 이벤트 소싱과 CQRS 패턴
2026년 3월 30일·아키텍처·

7장: 이벤트 소싱과 CQRS 패턴

이벤트 소싱과 CQRS 패턴의 원리를 살펴보고, AI 시스템에서의 적용 사례를 다룹니다. 대화 이력 관리, 에이전트 상태 추적, 시간 여행 디버깅, Kafka와 EventStoreDB 활용을 포함합니다.

15분667자9개 섹션
streamingai
공유
streaming-ai7 / 10
12345678910
이전6장: 실시간 추론 파이프라인 설계다음8장: 백프레셔와 흐름 제어

학습 목표

  • 이벤트 소싱의 핵심 원칙과 이벤트 스토어의 역할을 이해합니다
  • CQRS(명령/쿼리 분리) 패턴의 구조와 이점을 파악합니다
  • AI 시스템에서 대화 이력과 에이전트 상태를 이벤트로 관리하는 방법을 학습합니다
  • 시간 여행 디버깅과 감사 로깅의 실용적 가치를 파악합니다
  • Kafka와 EventStoreDB의 특성과 선택 기준을 다룹니다

이벤트 소싱이란

지금까지의 장에서는 데이터가 "흐르는 방식"(프로토콜, 스트리밍)에 집중했습니다. 이번 장에서는 데이터를 "기록하는 방식"으로 시야를 넓힙니다.

**이벤트 소싱(Event Sourcing)**은 시스템의 상태를 직접 저장하는 대신, 상태를 변경시킨 모든 이벤트를 순서대로 기록하는 패턴입니다.

전통적 CRUD vs 이벤트 소싱
text
[전통적 CRUD]
사용자 테이블: {id: 1, name: "홍길동", email: "hong@mail.com"}
→ 이메일 변경 → {id: 1, name: "홍길동", email: "new@mail.com"}
→ 이전 이메일은 사라짐
 
[이벤트 소싱]
이벤트 1: UserCreated {name: "홍길동", email: "hong@mail.com"}
이벤트 2: EmailChanged {oldEmail: "hong@mail.com", newEmail: "new@mail.com"}
→ 모든 변경 이력이 보존됨

핵심 원칙은 다음과 같습니다.

  1. 이벤트는 불변 — 한번 기록된 이벤트는 수정되거나 삭제되지 않습니다
  2. 이벤트는 과거형 — MessageSent, TokenGenerated 처럼 이미 일어난 사실을 기록합니다
  3. 현재 상태는 이벤트의 누적 — 이벤트를 처음부터 재생하면 현재 상태를 복원할 수 있습니다

AI 시스템에서의 이벤트 소싱

AI 시스템은 이벤트 소싱의 이점이 극대화되는 영역입니다. 그 이유를 구체적으로 살펴보겠습니다.

대화 이력 관리

AI 챗봇의 대화를 이벤트로 기록하면, 단순한 메시지 로그를 넘어선 풍부한 맥락을 보존할 수 있습니다.

conversation-events.ts
typescript
// 대화 이벤트 타입 정의
type ConversationEvent =
  | {
      type: "ConversationStarted";
      timestamp: number;
      userId: string;
      sessionId: string;
      model: string;
    }
  | {
      type: "UserMessageSent";
      timestamp: number;
      content: string;
      attachments?: string[];
    }
  | {
      type: "AssistantStreamStarted";
      timestamp: number;
      model: string;
      promptTokens: number;
    }
  | {
      type: "AssistantTokenGenerated";
      timestamp: number;
      token: string;
      index: number;
    }
  | {
      type: "AssistantStreamCompleted";
      timestamp: number;
      totalTokens: number;
      finishReason: string;
      latencyMs: number;
    }
  | {
      type: "GenerationCancelled";
      timestamp: number;
      tokensGenerated: number;
      reason: "user" | "timeout" | "error";
    }
  | {
      type: "FeedbackGiven";
      timestamp: number;
      messageIndex: number;
      rating: "positive" | "negative";
      comment?: string;
    }
  | {
      type: "ToolCallRequested";
      timestamp: number;
      toolName: string;
      arguments: Record<string, unknown>;
    }
  | {
      type: "ToolCallCompleted";
      timestamp: number;
      toolName: string;
      result: unknown;
      durationMs: number;
    };

이벤트에서 상태 복원

이벤트 스트림을 재생하여 대화의 현재 상태를 복원하는 함수입니다.

conversation-projection.ts
typescript
interface ConversationState {
  sessionId: string;
  messages: Array<{
    role: "user" | "assistant";
    content: string;
    metadata: Record<string, unknown>;
  }>;
  isStreaming: boolean;
  currentStreamBuffer: string;
  totalTokensUsed: number;
  toolCalls: Array<{
    name: string;
    status: "pending" | "completed";
  }>;
}
 
function projectConversation(
  events: ConversationEvent[]
): ConversationState {
  const state: ConversationState = {
    sessionId: "",
    messages: [],
    isStreaming: false,
    currentStreamBuffer: "",
    totalTokensUsed: 0,
    toolCalls: [],
  };
 
  for (const event of events) {
    switch (event.type) {
      case "ConversationStarted":
        state.sessionId = event.sessionId;
        break;
 
      case "UserMessageSent":
        state.messages.push({
          role: "user",
          content: event.content,
          metadata: { attachments: event.attachments },
        });
        break;
 
      case "AssistantStreamStarted":
        state.isStreaming = true;
        state.currentStreamBuffer = "";
        break;
 
      case "AssistantTokenGenerated":
        state.currentStreamBuffer += event.token;
        break;
 
      case "AssistantStreamCompleted":
        state.isStreaming = false;
        state.messages.push({
          role: "assistant",
          content: state.currentStreamBuffer,
          metadata: {
            tokens: event.totalTokens,
            latencyMs: event.latencyMs,
          },
        });
        state.currentStreamBuffer = "";
        state.totalTokensUsed += event.totalTokens;
        break;
 
      case "GenerationCancelled":
        state.isStreaming = false;
        if (state.currentStreamBuffer) {
          state.messages.push({
            role: "assistant",
            content: state.currentStreamBuffer + " [중단됨]",
            metadata: { cancelled: true },
          });
        }
        state.currentStreamBuffer = "";
        break;
 
      case "ToolCallRequested":
        state.toolCalls.push({
          name: event.toolName,
          status: "pending",
        });
        break;
 
      case "ToolCallCompleted": {
        const call = state.toolCalls.find(
          (c) =>
            c.name === event.toolName && c.status === "pending"
        );
        if (call) call.status = "completed";
        break;
      }
    }
  }
 
  return state;
}

CQRS 패턴

**CQRS(Command Query Responsibility Segregation, 명령 쿼리 책임 분리)**는 데이터의 쓰기(명령)와 읽기(쿼리)를 서로 다른 모델로 분리하는 패턴입니다. 이벤트 소싱과 자주 함께 사용됩니다.

AI 시스템에서의 CQRS

AI 시스템에서 CQRS가 특히 유용한 이유는 읽기와 쓰기의 패턴이 극단적으로 다르기 때문입니다.

명령 (쓰기)쿼리 (읽기)
토큰 하나 생성 (초당 수십 회)전체 대화 이력 조회
도구 호출 결과 기록사용량 통계 집계
피드백 저장유사 대화 검색
에이전트 상태 변경에이전트 행동 분석

쓰기는 고빈도 단순 연산이고, 읽기는 저빈도 복잡 연산입니다. CQRS는 이 두 가지를 독립적으로 최적화할 수 있게 합니다.

에이전트 상태 추적

AI 에이전트가 여러 단계를 거쳐 작업을 수행할 때, 이벤트 소싱으로 각 단계를 기록하면 디버깅과 재현이 가능해집니다.

agent-events.ts
typescript
type AgentEvent =
  | {
      type: "TaskReceived";
      timestamp: number;
      taskDescription: string;
      constraints: string[];
    }
  | {
      type: "PlanCreated";
      timestamp: number;
      steps: string[];
      estimatedDuration: number;
    }
  | {
      type: "StepStarted";
      timestamp: number;
      stepIndex: number;
      description: string;
    }
  | {
      type: "ToolInvoked";
      timestamp: number;
      tool: string;
      input: Record<string, unknown>;
    }
  | {
      type: "ToolResultReceived";
      timestamp: number;
      tool: string;
      output: unknown;
      success: boolean;
    }
  | {
      type: "ReplanningTriggered";
      timestamp: number;
      reason: string;
      newSteps: string[];
    }
  | {
      type: "TaskCompleted";
      timestamp: number;
      result: unknown;
      totalSteps: number;
      totalDurationMs: number;
    };
 
// 에이전트 실행 추적기
class AgentTracer {
  private events: AgentEvent[] = [];
  private eventStore: EventStore;
 
  constructor(
    private agentId: string,
    eventStore: EventStore
  ) {
    this.eventStore = eventStore;
  }
 
  async record(event: AgentEvent) {
    this.events.push(event);
    await this.eventStore.append(
      `agent-${this.agentId}`,
      event
    );
  }
 
  // 특정 시점의 에이전트 상태 조회
  getStateAt(timestamp: number): AgentState {
    const relevantEvents = this.events.filter(
      (e) => e.timestamp <= timestamp
    );
    return projectAgentState(relevantEvents);
  }
}

시간 여행 디버깅

이벤트 소싱의 가장 강력한 이점 중 하나는 **시간 여행 디버깅(Time-Travel Debugging)**입니다. 모든 상태 변경이 이벤트로 기록되어 있으므로, 임의의 시점으로 되돌아가 당시의 시스템 상태를 정확히 재현할 수 있습니다.

time-travel-debugger.ts
typescript
class TimeTravelDebugger {
  constructor(private eventStore: EventStore) {}
 
  /**
   * 특정 시점의 대화 상태를 재현합니다.
   * AI가 왜 특정 응답을 했는지 분석할 때 사용합니다.
   */
  async replayTo(
    streamId: string,
    targetTimestamp: number
  ): Promise<ConversationState> {
    const allEvents = await this.eventStore.readStream(streamId);
 
    // 목표 시점까지의 이벤트만 선택
    const eventsUntil = allEvents.filter(
      (e) => e.timestamp <= targetTimestamp
    );
 
    // 이벤트를 순서대로 재생하여 상태 복원
    return projectConversation(eventsUntil);
  }
 
  /**
   * 두 시점 사이에 발생한 이벤트를 조회합니다.
   * "이 시간대에 무슨 일이 있었는지" 분석할 때 사용합니다.
   */
  async getEventsBetween(
    streamId: string,
    from: number,
    to: number
  ): Promise<ConversationEvent[]> {
    const allEvents = await this.eventStore.readStream(streamId);
    return allEvents.filter(
      (e) => e.timestamp >= from && e.timestamp <= to
    );
  }
 
  /**
   * 에이전트의 의사결정 과정을 시각화합니다.
   */
  async traceDecision(
    agentId: string,
    decisionTimestamp: number
  ): Promise<DecisionTrace> {
    const events = await this.eventStore.readStream(
      `agent-${agentId}`
    );
 
    // 해당 결정 직전의 컨텍스트 수집
    const contextEvents = events.filter(
      (e) => e.timestamp < decisionTimestamp
    );
 
    // 결정 이후의 결과 수집
    const outcomeEvents = events.filter(
      (e) =>
        e.timestamp >= decisionTimestamp &&
        e.timestamp < decisionTimestamp + 60000
    );
 
    return {
      context: contextEvents,
      decision: events.find(
        (e) => e.timestamp === decisionTimestamp
      ),
      outcomes: outcomeEvents,
    };
  }
}
Tip

시간 여행 디버깅은 AI 안전성 감사에 핵심적입니다. "AI가 왜 이런 응답을 했는가?"라는 질문에 답하려면, 응답 시점의 정확한 컨텍스트(이전 메시지, 도구 호출 결과, 시스템 상태)를 재현할 수 있어야 합니다.

Kafka vs EventStoreDB

이벤트 소싱 시스템의 저장소로 주로 사용되는 두 기술을 비교합니다.

Apache Kafka

대규모 이벤트 스트리밍 플랫폼으로, 높은 처리량과 수평 확장이 강점입니다.

Kafka 토픽 구조 (AI 시스템)
text
conversations-events     — 대화 이벤트 (파티션: sessionId 기준)
agent-events             — 에이전트 실행 이벤트
inference-metrics        — 추론 성능 메트릭
feedback-events          — 사용자 피드백

EventStoreDB

이벤트 소싱에 특화된 데이터베이스로, 스트림 단위 읽기와 프로젝션 기능이 강점입니다.

기준KafkaEventStoreDB
주요 강점처리량, 수평 확장이벤트 소싱 네이티브
스트림 읽기토픽/파티션 단위개별 스트림 단위
프로젝션외부 처리 필요내장 프로젝션 엔진
이벤트 순서파티션 내 보장스트림 내 보장
운영 복잡도높음 (ZooKeeper/KRaft)낮음
생태계매우 풍부상대적으로 작음
적합 규모대규모 (일 수십억 이벤트)중소규모 (일 수백만 이벤트)
Info

규모와 이미 Kafka를 사용 중인지 여부가 선택의 핵심 기준입니다. 기존에 Kafka 인프라가 있다면 이벤트 소싱 용도로도 활용하는 것이 운영 부담을 줄입니다. 새로 시작하는 소규모 시스템이라면 EventStoreDB가 더 적합할 수 있습니다.

감사 로깅 (Audit Logging)

이벤트 소싱은 자연스럽게 완벽한 감사 로그를 생성합니다. AI 시스템에서 이는 규제 준수와 신뢰성의 기반입니다.

audit-log-queries.ts
typescript
// 감사 로그 쿼리 예시
 
// 1. 특정 사용자의 모든 AI 상호작용 조회
const userInteractions = await eventStore.readStream(
  `user-${userId}`
);
 
// 2. 특정 기간의 생성 중단 이벤트 집계
const cancellations = await eventStore.query({
  eventType: "GenerationCancelled",
  fromTimestamp: startOfDay,
  toTimestamp: endOfDay,
});
 
// 3. 도구 호출 실패 패턴 분석
const failedToolCalls = await eventStore.query({
  eventType: "ToolResultReceived",
  filter: { success: false },
  fromTimestamp: lastWeek,
});
 
// 4. 특정 모델 버전의 응답 품질 추적
const modelResponses = await eventStore.query({
  eventType: "AssistantStreamCompleted",
  metadata: { model: "claude-sonnet-4-20250514" },
});

정리

이번 장에서는 이벤트 소싱과 CQRS 패턴이 AI 시스템에서 어떤 가치를 제공하는지 살펴보았습니다.

  • 이벤트 소싱은 모든 상태 변경을 불변 이벤트로 기록하여 완벽한 이력을 보존합니다
  • AI 대화의 토큰 생성, 도구 호출, 피드백 등을 이벤트로 모델링하면 풍부한 분석이 가능합니다
  • CQRS는 고빈도 쓰기(토큰 기록)와 저빈도 복잡 읽기(통계 조회)를 독립적으로 최적화합니다
  • 시간 여행 디버깅으로 AI의 의사결정 과정을 정확히 재현하고 분석할 수 있습니다
  • Kafka는 대규모 처리에, EventStoreDB는 이벤트 소싱 네이티브 기능에 강점이 있습니다

다음 장에서는 스트리밍 시스템의 안정성을 좌우하는 **백프레셔(Backpressure)**와 흐름 제어 패턴을 다룹니다. 생산자와 소비자의 속도 불일치를 어떻게 관리하고, LLM API의 레이트 리미팅에 어떻게 대응하는지 살펴보겠습니다.

이 글이 도움이 되셨나요?

관련 주제 더 보기

#streaming#ai

관련 글

아키텍처

8장: 백프레셔와 흐름 제어

생산자-소비자 속도 불일치를 관리하는 백프레셔의 원리, 버퍼링/드롭/속도 제한 전략, LLM API 레이트 리미팅, 토큰 버킷 알고리즘, 큐 깊이 모니터링을 다룹니다.

2026년 4월 1일·18분
아키텍처

6장: 실시간 추론 파이프라인 설계

vLLM의 스트리밍 입력, Continuous Batching, 시맨틱 캐싱, 추론 라우터, 멀티모달 실시간 처리 등 백엔드 추론 파이프라인의 핵심 아키텍처를 다룹니다.

2026년 3월 28일·17분
아키텍처

9장: 프로덕션 스트리밍 인프라

로드밸런서의 WebSocket 업그레이드, CDN과 스트리밍, Kubernetes에서의 스트리밍 서비스 운영, 모니터링 전략, HTTP/3(QUIC)과 WebTransport의 미래를 다룹니다.

2026년 4월 3일·17분
이전 글6장: 실시간 추론 파이프라인 설계
다음 글8장: 백프레셔와 흐름 제어

댓글

목차

약 15분 남음
  • 학습 목표
  • 이벤트 소싱이란
  • AI 시스템에서의 이벤트 소싱
    • 대화 이력 관리
    • 이벤트에서 상태 복원
  • CQRS 패턴
    • AI 시스템에서의 CQRS
  • 에이전트 상태 추적
  • 시간 여행 디버깅
  • Kafka vs EventStoreDB
    • Apache Kafka
    • EventStoreDB
  • 감사 로깅 (Audit Logging)
  • 정리