Mem0와 Zep을 활용한 듀얼 레이어 메모리 시스템 구축, 메모리 압축 파이프라인, 성능 벤치마킹, 프로덕션 운영 체크리스트까지 실전 가이드를 제공합니다.
이번 장에서는 고객 지원 에이전트에 메모리 시스템을 통합하는 실전 프로젝트를 진행합니다. 이 에이전트는 다음을 수행해야 합니다.
7장의 의사결정 트리에 따라 프레임워크를 선택합니다.
요구사항 분석:
- 사용자 선호도 저장 필요 -> Mem0
- 제품/사용자 간 관계 추론 필요 -> Zep (지식 그래프)
- 과거 해결 사례 참조 필요 -> 에피소딕 메모리
- 비용 효율적 운영 필요 -> 듀얼 레이어
결정: Mem0 (개인화) + Zep (관계/에피소딕) 조합
아키텍처: 듀얼 레이어 (Hot Path + Cold Path)# Python 환경 설정
python -m venv venv
source venv/bin/activate
# 핵심 패키지
pip install mem0ai zep-cloud openai
# 유틸리티
pip install tiktoken pydantic redisfrom pydantic import BaseModel
class MemoryConfig(BaseModel):
# Mem0 설정
mem0_api_key: str
mem0_org_id: str
# Zep 설정
zep_api_key: str
# LLM 설정
llm_provider: str = "anthropic"
llm_model: str = "claude-sonnet-4-20250514"
# 토큰 예산
total_token_budget: int = 128_000
hot_path_ratio: float = 0.50
cold_path_ratio: float = 0.25
output_reserve_ratio: float = 0.25
# 캐시
redis_url: str = "redis://localhost:6379"
cache_ttl_seconds: int = 3600
# 압축
consolidation_interval_hours: int = 24
min_memories_for_consolidation: int = 10from dataclasses import dataclass, field
from mem0 import Memory
@dataclass
class SessionContext:
user_id: str
session_id: str
system_prompt: str
user_profile: dict = field(default_factory=dict)
conversation_summary: str = ""
recent_messages: list = field(default_factory=list)
active_goals: list = field(default_factory=list)
class HotPathManager:
def __init__(self, config: MemoryConfig):
self.config = config
self.mem0 = Memory(api_key=config.mem0_api_key)
self.token_budget = int(
config.total_token_budget * config.hot_path_ratio
)
async def build_context(
self, user_id: str, session_id: str
) -> SessionContext:
# 1. 시스템 프롬프트 (고정)
system_prompt = self._load_system_prompt()
# 2. Mem0에서 사용자 핵심 프로필 로드
user_memories = self.mem0.get_all(user_id=user_id)
user_profile = self._extract_core_profile(user_memories)
# 3. 세션 캐시에서 대화 요약과 최근 메시지 로드
session_data = await self._load_session_cache(session_id)
return SessionContext(
user_id=user_id,
session_id=session_id,
system_prompt=system_prompt,
user_profile=user_profile,
conversation_summary=session_data.get("summary", ""),
recent_messages=session_data.get("recent", []),
active_goals=session_data.get("goals", []),
)
def _extract_core_profile(self, memories: list) -> dict:
"""Mem0 메모리에서 핵심 프로필 정보만 추출"""
profile = {
"preferences": [],
"expertise": [],
"history_summary": "",
}
for mem in memories[:10]: # 최대 10개 메모리만 프로필에 포함
content = mem.get("memory", "")
category = mem.get("metadata", {}).get("category", "general")
if category == "preference":
profile["preferences"].append(content)
elif category == "expertise":
profile["expertise"].append(content)
return profile
def _load_system_prompt(self) -> str:
return """당신은 고객 지원 에이전트입니다.
사용자의 과거 이력과 선호도를 참고하여 맞춤형 지원을 제공합니다.
기술적인 문제는 단계별로 안내하고, 과거에 유사한 문제를 해결한 경험이 있다면 참조합니다."""import asyncio
from zep_cloud.client import AsyncZep
class ColdPathManager:
def __init__(self, config: MemoryConfig):
self.config = config
self.mem0 = Memory(api_key=config.mem0_api_key)
self.zep = AsyncZep(api_key=config.zep_api_key)
self.token_budget = int(
config.total_token_budget * config.cold_path_ratio
)
async def search(
self, query: str, user_id: str, sources: list[str]
) -> list[dict]:
tasks = []
if "vector" in sources:
tasks.append(self._search_vector(query, user_id))
if "graph" in sources:
tasks.append(self._search_graph(query, user_id))
if "episodic" in sources:
tasks.append(self._search_episodic(query, user_id))
# 병렬 검색 (타임아웃 3초)
results = await asyncio.gather(
*tasks, return_exceptions=True
)
# 실패한 소스 제외 후 결과 통합
valid_results = []
for r in results:
if isinstance(r, Exception):
continue # 로깅 후 건너뛰기
valid_results.extend(r)
return self._rank_and_deduplicate(valid_results)
async def _search_vector(
self, query: str, user_id: str
) -> list[dict]:
"""Mem0 벡터 검색 -- 사실과 선호도"""
results = self.mem0.search(query, user_id=user_id, limit=5)
return [
{
"content": r["memory"],
"source": "vector",
"score": r.get("score", 0.0),
}
for r in results
]
async def _search_graph(
self, query: str, user_id: str
) -> list[dict]:
"""Zep 지식 그래프 검색 -- 엔티티 관계"""
results = await self.zep.graph.search(
user_id=user_id,
query=query,
limit=5,
)
return [
{
"content": r.fact,
"source": "graph",
"score": r.score,
}
for r in results.edges
]
async def _search_episodic(
self, query: str, user_id: str
) -> list[dict]:
"""Zep 에피소딕 검색 -- 과거 상호작용"""
results = await self.zep.memory.search(
session_id=f"user_{user_id}",
text=query,
limit=3,
)
return [
{
"content": r.message.get("content", ""),
"source": "episodic",
"score": r.score,
}
for r in results
]
def _rank_and_deduplicate(
self, results: list[dict]
) -> list[dict]:
"""가중 점수로 통합 순위 결정, 중복 제거"""
source_weights = {
"vector": 0.5,
"graph": 0.3,
"episodic": 0.2,
}
for r in results:
r["final_score"] = (
r["score"] * source_weights.get(r["source"], 0.3)
)
# 점수 기준 정렬 후 상위 결과 반환
results.sort(key=lambda r: r["final_score"], reverse=True)
# 토큰 예산 내에서 결과 자르기
selected = []
used_tokens = 0
for r in results:
tokens = count_tokens(r["content"])
if used_tokens + tokens > self.token_budget:
break
selected.append(r)
used_tokens += tokens
return selectedCold Path 검색에서 하나의 소스가 실패하더라도 전체 응답이 차단되지 않아야 합니다. asyncio.gather에서 예외를 반환값으로 처리하고, 사용 가능한 결과만으로 응답을 생성하는 Graceful Degradation이 필수입니다.
import re
class MemoryRouter:
# 경량 규칙 기반 라우터
RECALL_PATTERNS = [
r"이전에|지난번|전에|기억|했었|과거",
]
RELATION_PATTERNS = [
r"누구|어느\s*팀|관련된|소속|담당|연결",
]
GREETING_PATTERNS = [
r"^(안녕|감사|좋습니다|네|알겠)",
]
def route(
self, message: str, conversation_state: dict
) -> dict:
# 인사/확인 -> 메모리 불필요
if self._matches(message, self.GREETING_PATTERNS):
return {"sources": [], "query": message}
sources = []
# 회고 질문 -> 에피소딕 + 벡터
if self._matches(message, self.RECALL_PATTERNS):
sources.extend(["episodic", "vector"])
# 관계 질문 -> 그래프
if self._matches(message, self.RELATION_PATTERNS):
sources.append("graph")
# 새 주제 시작 -> 벡터 (배경 지식)
if conversation_state.get("topic_changed", False):
if "vector" not in sources:
sources.append("vector")
# 기본: 벡터 검색
if not sources:
sources = ["vector"]
return {"sources": sources, "query": message}
def _matches(self, text: str, patterns: list[str]) -> bool:
return any(re.search(p, text) for p in patterns)6장에서 다룬 계층적 통합을 실제로 구현합니다.
from datetime import datetime, timedelta
class CompressionPipeline:
def __init__(self, config: MemoryConfig, llm_client):
self.config = config
self.llm = llm_client
self.mem0 = Memory(api_key=config.mem0_api_key)
async def run(self, user_id: str) -> dict:
stats = {
"memories_processed": 0,
"memories_consolidated": 0,
"tokens_saved": 0,
}
# 모든 메모리 조회
all_memories = self.mem0.get_all(user_id=user_id)
if len(all_memories) < self.config.min_memories_for_consolidation:
return stats
# 주제별 그룹화
topic_groups = self._group_by_topic(all_memories)
for topic, memories in topic_groups.items():
if len(memories) < 3:
continue
# 그룹 내 메모리 통합
consolidated = await self._consolidate_group(
topic, memories
)
if consolidated:
# 기존 메모리 삭제
for mem in memories:
self.mem0.delete(mem["id"])
# 통합된 메모리 저장
self.mem0.add(
consolidated["content"],
user_id=user_id,
metadata={
"category": "consolidated",
"original_count": len(memories),
"topic": topic,
},
)
original_tokens = sum(
count_tokens(m["memory"]) for m in memories
)
new_tokens = count_tokens(consolidated["content"])
stats["memories_processed"] += len(memories)
stats["memories_consolidated"] += 1
stats["tokens_saved"] += original_tokens - new_tokens
return stats
async def _consolidate_group(
self, topic: str, memories: list
) -> dict | None:
memory_texts = [m["memory"] for m in memories]
response = await self.llm.complete(
messages=[{
"role": "system",
"content": f"""다음 메모리들을 하나의 통합된 요약으로 결합하세요.
주제: {topic}
규칙:
1. 모든 핵심 사실을 보존합니다
2. 모순되는 정보가 있으면 최신 정보를 우선합니다
3. 중복을 제거합니다
4. 결과는 원본 전체 길이의 30-50%로 압축합니다""",
}, {
"role": "user",
"content": "\n---\n".join(memory_texts),
}],
)
return {"content": response.content}
def _group_by_topic(self, memories: list) -> dict:
groups = {}
for mem in memories:
topic = mem.get("metadata", {}).get("category", "general")
if topic not in groups:
groups[topic] = []
groups[topic].append(mem)
return groups메모리 시스템의 성능을 측정하기 위한 벤치마크 프레임워크입니다.
import time
from dataclasses import dataclass
@dataclass
class BenchmarkResult:
test_name: str
latency_ms: float
token_count: int
memory_hit_rate: float
relevance_score: float
class MemoryBenchmark:
def __init__(self, memory_system):
self.system = memory_system
self.results: list[BenchmarkResult] = []
async def run_all(self, test_cases: list[dict]) -> list[BenchmarkResult]:
for case in test_cases:
result = await self._run_single(case)
self.results.append(result)
return self.results
async def _run_single(self, case: dict) -> BenchmarkResult:
start = time.monotonic()
# 메모리 검색 실행
context = await self.system.process_message(
message=case["query"],
user_id=case["user_id"],
)
latency = (time.monotonic() - start) * 1000 # ms
# 관련성 평가
relevance = await self._evaluate_relevance(
query=case["query"],
retrieved=context.cold_path_results,
expected=case["expected_memories"],
)
return BenchmarkResult(
test_name=case["name"],
latency_ms=latency,
token_count=context.total_tokens,
memory_hit_rate=relevance["hit_rate"],
relevance_score=relevance["score"],
)
def print_report(self) -> None:
print("\n=== 메모리 시스템 벤치마크 결과 ===\n")
for r in self.results:
print(f" {r.test_name}")
print(f" 지연시간: {r.latency_ms:.1f}ms")
print(f" 토큰 수: {r.token_count:,}")
print(f" 적중률: {r.memory_hit_rate:.1%}")
print(f" 관련성: {r.relevance_score:.2f}")
print()성능 목표:
- Cold Path 검색 지연시간: 500ms 이하 (p95)
- 메모리 적중률: 80% 이상
- 관련성 점수: 0.75 이상 (0-1 스케일)
- Hot Path 토큰 사용: 전체 예산의 50% 이내
비용 목표:
- 메모리 검색 비용: 응답당 $0.01 이하
- 압축 파이프라인: 월 $10 이하 (사용자 1000명 기준)
- 전체 메모리 인프라: 총 API 비용의 15% 이내메모리 시스템을 프로덕션에 배포하기 전 확인해야 할 항목들입니다.
프로덕션 배포는 점진적으로 진행하는 것을 권장합니다. 먼저 Hot Path만 활성화하고, Cold Path를 단계적으로 추가하며, 각 단계에서 성능과 비용을 검증합니다.
| 단계 | 기간 | 핵심 작업 | 성공 기준 |
|---|---|---|---|
| 1단계 | 2주 | 세션 관리, 대화 요약, 사용자 프로필 | 대화 맥락 유지율 90% |
| 2단계 | 2주 | Mem0 연동, 벡터 검색 | 메모리 적중률 70% |
| 3단계 | 3주 | Zep 연동, 지식 그래프 | 관계 질문 정확도 75% |
| 4단계 | 2주 | 압축 파이프라인, 생명주기 관리 | 30% 토큰 절감 |
| 5단계 | 2주 | 캐싱, 벤치마킹, 모니터링 | p95 지연시간 500ms 이하 |
10장에 걸쳐 AI 에이전트 메모리 시스템의 전 영역을 다루었습니다.
| 장 | 핵심 주제 | 실무 적용 |
|---|---|---|
| 1장 | 메모리의 필요성, 3가지 유형 | 프로젝트 요구사항 분석 |
| 2장 | 단기 메모리, 컨텍스트 관리 | 슬라이딩 윈도우, 요약 구현 |
| 3장 | 장기 메모리, 벡터 검색 | 벡터 DB 선택과 인덱싱 |
| 4장 | 에피소딕 메모리 | 경험 기반 의사결정 |
| 5장 | 지식 그래프, Zep | 엔티티/관계 구조화 |
| 6장 | 메모리 압축 | 3-40배 압축 파이프라인 |
| 7장 | 프레임워크 비교 | 의사결정 트리 |
| 8장 | 듀얼 레이어 아키텍처 | Hot/Cold Path 설계 |
| 9장 | 프로젝트 메모리 | CLAUDE.md, 팀 메모리 |
| 10장 | 실전 프로젝트 | 통합 시스템 구축 |
AI 에이전트에서 메모리는 단순한 부가 기능이 아닙니다. 에이전트가 사용자를 이해하고, 과거에서 배우고, 일관된 경험을 제공하기 위한 핵심 인프라입니다. 이 시리즈가 프로덕션 수준의 메모리 시스템을 설계하고 구축하는 데 실질적인 도움이 되기를 바랍니다.
이 글이 도움이 되셨나요?
CLAUDE.md 기반 프로젝트 메모리, 코드베이스 컨텍스트 지속, 세션 간 학습, 팀 메모리 설계 패턴 등 코딩 에이전트에 특화된 메모리 시스템을 다룹니다.
Hot Path와 Cold Path를 결합한 듀얼 레이어 메모리 아키텍처의 설계, 하이브리드 검색, 메모리 라우팅, 비용-지연시간 최적화 전략을 다룹니다.
Mem0, Zep, Letta, LangChain/LangGraph의 메모리 시스템을 상세 비교하고, 프로젝트 요구사항에 맞는 프레임워크 선택 의사결정 트리를 제시합니다.