에이전트 등록과 발견, 설정 관리, 생명주기 관리, 정책 엔진까지 프로덕션 멀티에이전트 시스템의 중추인 컨트롤 플레인 설계를 다룹니다.
Kubernetes의 컨트롤 플레인이 컨테이너의 생명주기를 관리하듯, 에이전트 컨트롤 플레인(Agent Control Plane)은 에이전트의 등록, 발견, 설정, 모니터링, 거버넌스를 중앙에서 관리하는 계층입니다.
에이전트가 5개 이하인 프로토타입 단계에서는 컨트롤 플레인 없이도 운영이 가능합니다. 하지만 에이전트 수가 20개를 넘어가면, 어떤 에이전트가 어디서 실행되고, 어떤 권한을 가지며, 현재 상태가 어떤지를 추적하는 것이 수동으로는 불가능해집니다.
컨트롤 플레인의 핵심 구성 요소는 다음과 같습니다.
┌───────────────────────────────────────────────┐
│ 에이전트 컨트롤 플레인 │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │에이전트 │ │ 설정 │ │ 정책 │ │
│ │레지스트리 │ │ 관리 │ │ 엔진 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │생명주기 │ │ 헬스 │ │ 감사 │ │
│ │관리 │ │ 체크 │ │ 로그 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
├───────────────────────────────────────────────┤
│ 데이터 플레인 (에이전트 실행) │
│ [Agent A] [Agent B] [Agent C] [Agent D] │
└───────────────────────────────────────────────┘
에이전트 레지스트리는 시스템에 등록된 모든 에이전트의 메타데이터를 관리합니다. 마이크로서비스의 서비스 레지스트리(Consul, Eureka)와 유사한 역할을 합니다.
from datetime import datetime
from pydantic import BaseModel
from enum import Enum
class AgentStatus(str, Enum):
REGISTERED = "registered"
ACTIVE = "active"
DEGRADED = "degraded"
INACTIVE = "inactive"
DECOMMISSIONED = "decommissioned"
class AgentRegistration(BaseModel):
"""에이전트 등록 정보"""
agent_id: str
name: str
version: str
description: str
owner: str # 담당 팀/개인
capabilities: list[str]
tools: list[str]
model: str # 사용 중인 LLM 모델
endpoint: str | None # A2A 엔드포인트 (원격인 경우)
max_concurrency: int
timeout_seconds: int
tags: dict[str, str]
status: AgentStatus = AgentStatus.REGISTERED
registered_at: datetime = datetime.now()
last_heartbeat: datetime | None = None
class AgentRegistry:
def __init__(self, storage):
self.storage = storage
async def register(
self, registration: AgentRegistration
) -> str:
"""에이전트 등록"""
# 중복 검사
existing = await self.storage.find_by_name(
registration.name, registration.version
)
if existing:
raise AgentAlreadyRegisteredError(
f"{registration.name}@{registration.version}"
)
await self.storage.save(registration)
await self.emit_event("agent.registered", registration)
return registration.agent_id
async def discover(
self,
capabilities: list[str] | None = None,
tags: dict[str, str] | None = None,
status: AgentStatus = AgentStatus.ACTIVE
) -> list[AgentRegistration]:
"""조건에 맞는 에이전트 검색"""
agents = await self.storage.find_all(status=status)
if capabilities:
agents = [
a for a in agents
if set(capabilities) <= set(a.capabilities)
]
if tags:
agents = [
a for a in agents
if all(
a.tags.get(k) == v for k, v in tags.items()
)
]
return agents
async def heartbeat(self, agent_id: str):
"""에이전트 헬스 비트 수신"""
agent = await self.storage.find_by_id(agent_id)
if agent:
agent.last_heartbeat = datetime.now()
agent.status = AgentStatus.ACTIVE
await self.storage.save(agent)
async def check_health(self):
"""비활성 에이전트 감지"""
threshold = datetime.now() - timedelta(minutes=5)
agents = await self.storage.find_all(
status=AgentStatus.ACTIVE
)
for agent in agents:
if (
agent.last_heartbeat
and agent.last_heartbeat < threshold
):
agent.status = AgentStatus.DEGRADED
await self.storage.save(agent)
await self.emit_event("agent.degraded", agent)다른 에이전트나 오케스트레이터가 필요한 에이전트를 찾는 메커니즘입니다.
class AgentDiscoveryService:
def __init__(self, registry: AgentRegistry):
self.registry = registry
self._cache: dict[str, list[AgentRegistration]] = {}
self._cache_ttl = 60 # 초
async def find_best_agent(
self,
task_description: str,
required_capabilities: list[str],
prefer_low_latency: bool = False
) -> AgentRegistration | None:
"""작업에 가장 적합한 에이전트 선택"""
candidates = await self.registry.discover(
capabilities=required_capabilities,
status=AgentStatus.ACTIVE
)
if not candidates:
return None
# 점수 기반 선택
scored = []
for agent in candidates:
score = self._calculate_fitness(
agent, required_capabilities, prefer_low_latency
)
scored.append((agent, score))
scored.sort(key=lambda x: x[1], reverse=True)
return scored[0][0]
def _calculate_fitness(
self,
agent: AgentRegistration,
required: list[str],
prefer_low_latency: bool
) -> float:
# 능력 일치도 (0-1)
capability_match = len(
set(required) & set(agent.capabilities)
) / len(required)
# 동시성 여유분 (현재 부하 대비)
concurrency_score = 0.5 # 실제로는 메트릭에서 조회
# 지연시간 가중치
latency_weight = 0.3 if prefer_low_latency else 0.1
return (
capability_match * 0.5
+ concurrency_score * (0.5 - latency_weight)
+ (1.0 / agent.timeout_seconds) * latency_weight
)에이전트의 동작을 제어하는 설정을 중앙에서 관리하면, 개별 에이전트를 재배포하지 않고도 동작을 조정할 수 있습니다.
from dataclasses import dataclass, field
@dataclass
class AgentConfig:
"""에이전트 런타임 설정"""
agent_id: str
# LLM 설정
model: str = "claude-sonnet-4-6"
temperature: float = 0.7
max_tokens: int = 4096
# 실행 제어
max_retries: int = 3
timeout_seconds: int = 30
rate_limit_rpm: int = 60
# 기능 플래그
features: dict[str, bool] = field(default_factory=dict)
# 가드레일
blocked_tools: list[str] = field(default_factory=list)
allowed_domains: list[str] = field(default_factory=list)
class ConfigurationManager:
def __init__(self, store):
self.store = store
self._watchers: dict[str, list[callable]] = {}
async def get_config(self, agent_id: str) -> AgentConfig:
"""에이전트 설정 조회 (기본값 + 오버라이드)"""
defaults = await self.store.get("defaults")
agent_specific = await self.store.get(agent_id)
# 에이전트별 설정이 기본값을 오버라이드
return AgentConfig(
agent_id=agent_id,
**{**defaults, **(agent_specific or {})}
)
async def update_config(
self, agent_id: str, updates: dict
):
"""설정 업데이트 및 변경 알림"""
current = await self.get_config(agent_id)
for key, value in updates.items():
setattr(current, key, value)
await self.store.save(agent_id, current)
# 변경 감지 콜백 호출
for callback in self._watchers.get(agent_id, []):
await callback(current)
def watch(self, agent_id: str, callback: callable):
"""설정 변경 감시"""
self._watchers.setdefault(agent_id, []).append(callback)시스템 프롬프트를 설정으로 관리하면 에이전트 코드를 수정하지 않고도 행동을 조정할 수 있습니다.
class PromptManager:
def __init__(self, store):
self.store = store
async def get_prompt(
self, agent_id: str, version: str = "latest"
) -> str:
"""버전 관리되는 프롬프트 조회"""
prompt = await self.store.get(
f"prompts/{agent_id}/{version}"
)
return prompt
async def update_prompt(
self, agent_id: str, new_prompt: str,
author: str, reason: str
):
"""프롬프트 업데이트 (이력 보존)"""
current_version = await self.store.get_latest_version(
agent_id
)
new_version = current_version + 1
await self.store.save(
f"prompts/{agent_id}/{new_version}",
{
"content": new_prompt,
"version": new_version,
"author": author,
"reason": reason,
"timestamp": datetime.now().isoformat(),
"previous_version": current_version
}
)
async def rollback_prompt(
self, agent_id: str, target_version: int
):
"""이전 버전으로 롤백"""
old_prompt = await self.get_prompt(
agent_id, str(target_version)
)
await self.update_prompt(
agent_id, old_prompt,
author="system",
reason=f"Rollback to v{target_version}"
)프롬프트 변경은 에이전트 동작에 큰 영향을 미칩니다. 변경 전후의 평가 점수를 비교하는 자동화된 파이프라인을 구축하세요. A/B 테스트로 일부 트래픽에만 새 프롬프트를 적용하는 것도 안전한 방법입니다.
에이전트의 행동을 제어하는 규칙을 중앙에서 관리합니다.
from pydantic import BaseModel
from enum import Enum
class PolicyAction(str, Enum):
ALLOW = "allow"
DENY = "deny"
REQUIRE_APPROVAL = "require_approval"
LOG_ONLY = "log_only"
class Policy(BaseModel):
id: str
name: str
description: str
conditions: dict # 조건
action: PolicyAction
priority: int # 높을수록 우선
enabled: bool = True
class PolicyEngine:
def __init__(self):
self.policies: list[Policy] = []
def add_policy(self, policy: Policy):
self.policies.append(policy)
self.policies.sort(key=lambda p: p.priority, reverse=True)
async def evaluate(
self,
agent_id: str,
action: str,
context: dict
) -> PolicyAction:
"""정책 평가: 에이전트 행동의 허용 여부 결정"""
for policy in self.policies:
if not policy.enabled:
continue
if self._matches(policy.conditions, {
"agent_id": agent_id,
"action": action,
**context
}):
return policy.action
return PolicyAction.ALLOW # 기본 허용
def _matches(self, conditions: dict, context: dict) -> bool:
for key, expected in conditions.items():
actual = context.get(key)
if isinstance(expected, list):
if actual not in expected:
return False
elif actual != expected:
return False
return True
# 정책 예시
engine = PolicyEngine()
# 프로덕션 DB 접근 제한
engine.add_policy(Policy(
id="prod-db-restrict",
name="프로덕션 DB 쓰기 제한",
description="프로덕션 데이터베이스에 대한 쓰기 작업은 승인 필요",
conditions={
"action": "database_write",
"environment": "production"
},
action=PolicyAction.REQUIRE_APPROVAL,
priority=100
))
# 비용 제한
engine.add_policy(Policy(
id="cost-limit",
name="단일 작업 비용 상한",
description="에이전트 단일 작업의 예상 비용이 $1를 초과하면 차단",
conditions={
"estimated_cost_exceeds": 1.0
},
action=PolicyAction.DENY,
priority=90
))
# 특정 에이전트의 외부 통신 차단
engine.add_policy(Policy(
id="no-external-comms",
name="내부 에이전트 외부 통신 차단",
description="internal 태그 에이전트의 외부 API 호출 차단",
conditions={
"agent_tags": {"scope": "internal"},
"action": "external_api_call"
},
action=PolicyAction.DENY,
priority=80
))에이전트는 등록부터 폐기까지 명확한 생명주기를 거칩니다.
등록(Registered) → 활성화(Active) → 비활성(Inactive) → 폐기(Decommissioned)
↕
성능저하(Degraded)
class AgentLifecycleManager:
def __init__(
self, registry: AgentRegistry,
config_manager: ConfigurationManager,
policy_engine: PolicyEngine
):
self.registry = registry
self.config = config_manager
self.policy = policy_engine
async def activate(self, agent_id: str):
"""에이전트 활성화"""
agent = await self.registry.get(agent_id)
# 활성화 전 검증
config = await self.config.get_config(agent_id)
if not self._validate_config(config):
raise InvalidConfigError(
f"Agent {agent_id} config validation failed"
)
# 정책 확인
action = await self.policy.evaluate(
agent_id, "activate", {"config": config.dict()}
)
if action == PolicyAction.DENY:
raise PolicyDeniedError(
f"Agent {agent_id} activation denied by policy"
)
agent.status = AgentStatus.ACTIVE
await self.registry.save(agent)
await self._emit("agent.activated", agent)
async def deactivate(self, agent_id: str, reason: str):
"""에이전트 비활성화"""
agent = await self.registry.get(agent_id)
agent.status = AgentStatus.INACTIVE
await self.registry.save(agent)
# 진행 중인 작업 처리
pending_tasks = await self._get_pending_tasks(agent_id)
for task in pending_tasks:
# 대체 에이전트로 재할당
alternative = await self.registry.discover(
capabilities=agent.capabilities,
status=AgentStatus.ACTIVE
)
if alternative:
await self._reassign_task(task, alternative[0])
else:
await self._fail_task(
task, f"Agent {agent_id} deactivated: {reason}"
)
await self._emit("agent.deactivated", {
"agent": agent, "reason": reason
})
async def rolling_update(
self, agent_id: str,
new_version: AgentRegistration
):
"""무중단 에이전트 업데이트"""
old_agent = await self.registry.get(agent_id)
# 1. 새 버전 등록 (카나리)
new_agent_id = f"{agent_id}-v{new_version.version}"
new_version.agent_id = new_agent_id
await self.registry.register(new_version)
await self.activate(new_agent_id)
# 2. 트래픽 점진적 전환 (10% → 50% → 100%)
for weight in [10, 50, 100]:
await self._update_routing_weight(
old_agent.agent_id, 100 - weight,
new_agent_id, weight
)
await asyncio.sleep(60) # 1분 관찰
# 에러율 확인
error_rate = await self._check_error_rate(new_agent_id)
if error_rate > 0.05: # 5% 초과 시 롤백
await self._update_routing_weight(
old_agent.agent_id, 100, new_agent_id, 0
)
await self.deactivate(
new_agent_id, "High error rate during rollout"
)
return {"status": "rolled_back", "error_rate": error_rate}
# 3. 이전 버전 폐기
await self.deactivate(
old_agent.agent_id, "Replaced by new version"
)
return {"status": "completed", "new_version": new_agent_id}에이전트 업데이트 시 "진행 중인 작업"의 처리가 가장 까다롭습니다. 긴 실행 시간의 작업이 있다면 드레인(Drain) 기간을 두어 기존 작업이 완료된 후 에이전트를 교체하세요.
외부에서 컨트롤 플레인을 조작하기 위한 API입니다.
from fastapi import FastAPI, HTTPException
app = FastAPI(title="Agent Control Plane")
@app.get("/agents")
async def list_agents(
status: AgentStatus | None = None,
capability: str | None = None
):
"""등록된 에이전트 목록 조회"""
return await registry.discover(
capabilities=[capability] if capability else None,
status=status
)
@app.post("/agents")
async def register_agent(registration: AgentRegistration):
"""에이전트 등록"""
agent_id = await registry.register(registration)
return {"agent_id": agent_id, "status": "registered"}
@app.post("/agents/{agent_id}/activate")
async def activate_agent(agent_id: str):
"""에이전트 활성화"""
await lifecycle.activate(agent_id)
return {"status": "active"}
@app.put("/agents/{agent_id}/config")
async def update_config(agent_id: str, updates: dict):
"""에이전트 설정 업데이트"""
await config_manager.update_config(agent_id, updates)
return {"status": "updated"}
@app.post("/agents/{agent_id}/rolling-update")
async def rolling_update(
agent_id: str, new_version: AgentRegistration
):
"""무중단 업데이트"""
result = await lifecycle.rolling_update(agent_id, new_version)
return result
@app.get("/policies")
async def list_policies():
"""활성 정책 목록"""
return [p.dict() for p in policy_engine.policies if p.enabled]다음 장에서는 멀티에이전트 시스템의 상태 관리와 장애 복구를 다룹니다. 체크포인팅, 이벤트 소싱, 재시도 전략, 그리고 분산 환경에서의 일관성 보장을 살펴봅니다.
이 글이 도움이 되셨나요?
투표, 토론, 비판적 검토, 다수결 합의 등 여러 에이전트가 하나의 결론에 도달하기 위한 협업 패턴과 합의 프로토콜을 다룹니다.
멀티에이전트 시스템의 체크포인팅, 이벤트 소싱, 재시도 전략, 그리고 분산 환경에서의 일관성 보장과 장애 복구 패턴을 다룹니다.
에이전트 간 작업 위임의 핵심 메커니즘인 핸드오프 패턴, 라우팅 전략, 동적 위임의 구현과 최적화를 실전 코드와 함께 다룹니다.