본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout
© 2026 Kreath
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 6장: 에이전트 컨트롤 플레인 설계
2026년 3월 31일·AI / ML·

6장: 에이전트 컨트롤 플레인 설계

에이전트 등록과 발견, 설정 관리, 생명주기 관리, 정책 엔진까지 프로덕션 멀티에이전트 시스템의 중추인 컨트롤 플레인 설계를 다룹니다.

15분1,405자6개 섹션
ai-에이전트컨트롤-플레인에이전트-관리프로덕션
공유
agent-orchestration6 / 11
1234567891011
이전5장: 에이전트 협업과 합의 메커니즘다음7장: 상태 관리와 장애 복구

컨트롤 플레인이란

Kubernetes의 컨트롤 플레인이 컨테이너의 생명주기를 관리하듯, 에이전트 컨트롤 플레인(Agent Control Plane)은 에이전트의 등록, 발견, 설정, 모니터링, 거버넌스를 중앙에서 관리하는 계층입니다.

에이전트가 5개 이하인 프로토타입 단계에서는 컨트롤 플레인 없이도 운영이 가능합니다. 하지만 에이전트 수가 20개를 넘어가면, 어떤 에이전트가 어디서 실행되고, 어떤 권한을 가지며, 현재 상태가 어떤지를 추적하는 것이 수동으로는 불가능해집니다.

컨트롤 플레인의 핵심 구성 요소는 다음과 같습니다.

┌───────────────────────────────────────────────┐
│              에이전트 컨트롤 플레인             │
│                                               │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐     │
│  │에이전트   │ │  설정    │ │  정책    │     │
│  │레지스트리 │ │  관리    │ │  엔진    │     │
│  └──────────┘ └──────────┘ └──────────┘     │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐     │
│  │생명주기  │ │  헬스    │ │  감사    │     │
│  │관리      │ │  체크    │ │  로그    │     │
│  └──────────┘ └──────────┘ └──────────┘     │
├───────────────────────────────────────────────┤
│         데이터 플레인 (에이전트 실행)           │
│  [Agent A] [Agent B] [Agent C] [Agent D]     │
└───────────────────────────────────────────────┘

에이전트 레지스트리

레지스트리의 역할

에이전트 레지스트리는 시스템에 등록된 모든 에이전트의 메타데이터를 관리합니다. 마이크로서비스의 서비스 레지스트리(Consul, Eureka)와 유사한 역할을 합니다.

python
from datetime import datetime
from pydantic import BaseModel
from enum import Enum
 
class AgentStatus(str, Enum):
    REGISTERED = "registered"
    ACTIVE = "active"
    DEGRADED = "degraded"
    INACTIVE = "inactive"
    DECOMMISSIONED = "decommissioned"
 
class AgentRegistration(BaseModel):
    """에이전트 등록 정보"""
    agent_id: str
    name: str
    version: str
    description: str
    owner: str  # 담당 팀/개인
    capabilities: list[str]
    tools: list[str]
    model: str  # 사용 중인 LLM 모델
    endpoint: str | None  # A2A 엔드포인트 (원격인 경우)
    max_concurrency: int
    timeout_seconds: int
    tags: dict[str, str]
    status: AgentStatus = AgentStatus.REGISTERED
    registered_at: datetime = datetime.now()
    last_heartbeat: datetime | None = None
 
class AgentRegistry:
    def __init__(self, storage):
        self.storage = storage
 
    async def register(
        self, registration: AgentRegistration
    ) -> str:
        """에이전트 등록"""
        # 중복 검사
        existing = await self.storage.find_by_name(
            registration.name, registration.version
        )
        if existing:
            raise AgentAlreadyRegisteredError(
                f"{registration.name}@{registration.version}"
            )
 
        await self.storage.save(registration)
        await self.emit_event("agent.registered", registration)
        return registration.agent_id
 
    async def discover(
        self,
        capabilities: list[str] | None = None,
        tags: dict[str, str] | None = None,
        status: AgentStatus = AgentStatus.ACTIVE
    ) -> list[AgentRegistration]:
        """조건에 맞는 에이전트 검색"""
        agents = await self.storage.find_all(status=status)
 
        if capabilities:
            agents = [
                a for a in agents
                if set(capabilities) <= set(a.capabilities)
            ]
        if tags:
            agents = [
                a for a in agents
                if all(
                    a.tags.get(k) == v for k, v in tags.items()
                )
            ]
        return agents
 
    async def heartbeat(self, agent_id: str):
        """에이전트 헬스 비트 수신"""
        agent = await self.storage.find_by_id(agent_id)
        if agent:
            agent.last_heartbeat = datetime.now()
            agent.status = AgentStatus.ACTIVE
            await self.storage.save(agent)
 
    async def check_health(self):
        """비활성 에이전트 감지"""
        threshold = datetime.now() - timedelta(minutes=5)
        agents = await self.storage.find_all(
            status=AgentStatus.ACTIVE
        )
        for agent in agents:
            if (
                agent.last_heartbeat
                and agent.last_heartbeat < threshold
            ):
                agent.status = AgentStatus.DEGRADED
                await self.storage.save(agent)
                await self.emit_event("agent.degraded", agent)

에이전트 발견(Discovery)

다른 에이전트나 오케스트레이터가 필요한 에이전트를 찾는 메커니즘입니다.

python
class AgentDiscoveryService:
    def __init__(self, registry: AgentRegistry):
        self.registry = registry
        self._cache: dict[str, list[AgentRegistration]] = {}
        self._cache_ttl = 60  # 초
 
    async def find_best_agent(
        self,
        task_description: str,
        required_capabilities: list[str],
        prefer_low_latency: bool = False
    ) -> AgentRegistration | None:
        """작업에 가장 적합한 에이전트 선택"""
        candidates = await self.registry.discover(
            capabilities=required_capabilities,
            status=AgentStatus.ACTIVE
        )
 
        if not candidates:
            return None
 
        # 점수 기반 선택
        scored = []
        for agent in candidates:
            score = self._calculate_fitness(
                agent, required_capabilities, prefer_low_latency
            )
            scored.append((agent, score))
 
        scored.sort(key=lambda x: x[1], reverse=True)
        return scored[0][0]
 
    def _calculate_fitness(
        self,
        agent: AgentRegistration,
        required: list[str],
        prefer_low_latency: bool
    ) -> float:
        # 능력 일치도 (0-1)
        capability_match = len(
            set(required) & set(agent.capabilities)
        ) / len(required)
 
        # 동시성 여유분 (현재 부하 대비)
        concurrency_score = 0.5  # 실제로는 메트릭에서 조회
 
        # 지연시간 가중치
        latency_weight = 0.3 if prefer_low_latency else 0.1
 
        return (
            capability_match * 0.5
            + concurrency_score * (0.5 - latency_weight)
            + (1.0 / agent.timeout_seconds) * latency_weight
        )

설정 관리(Configuration Management)

중앙 집중식 설정

에이전트의 동작을 제어하는 설정을 중앙에서 관리하면, 개별 에이전트를 재배포하지 않고도 동작을 조정할 수 있습니다.

python
from dataclasses import dataclass, field
 
@dataclass
class AgentConfig:
    """에이전트 런타임 설정"""
    agent_id: str
 
    # LLM 설정
    model: str = "claude-sonnet-4-6"
    temperature: float = 0.7
    max_tokens: int = 4096
 
    # 실행 제어
    max_retries: int = 3
    timeout_seconds: int = 30
    rate_limit_rpm: int = 60
 
    # 기능 플래그
    features: dict[str, bool] = field(default_factory=dict)
 
    # 가드레일
    blocked_tools: list[str] = field(default_factory=list)
    allowed_domains: list[str] = field(default_factory=list)
 
class ConfigurationManager:
    def __init__(self, store):
        self.store = store
        self._watchers: dict[str, list[callable]] = {}
 
    async def get_config(self, agent_id: str) -> AgentConfig:
        """에이전트 설정 조회 (기본값 + 오버라이드)"""
        defaults = await self.store.get("defaults")
        agent_specific = await self.store.get(agent_id)
        # 에이전트별 설정이 기본값을 오버라이드
        return AgentConfig(
            agent_id=agent_id,
            **{**defaults, **(agent_specific or {})}
        )
 
    async def update_config(
        self, agent_id: str, updates: dict
    ):
        """설정 업데이트 및 변경 알림"""
        current = await self.get_config(agent_id)
        for key, value in updates.items():
            setattr(current, key, value)
        await self.store.save(agent_id, current)
 
        # 변경 감지 콜백 호출
        for callback in self._watchers.get(agent_id, []):
            await callback(current)
 
    def watch(self, agent_id: str, callback: callable):
        """설정 변경 감시"""
        self._watchers.setdefault(agent_id, []).append(callback)

동적 프롬프트 관리

시스템 프롬프트를 설정으로 관리하면 에이전트 코드를 수정하지 않고도 행동을 조정할 수 있습니다.

python
class PromptManager:
    def __init__(self, store):
        self.store = store
 
    async def get_prompt(
        self, agent_id: str, version: str = "latest"
    ) -> str:
        """버전 관리되는 프롬프트 조회"""
        prompt = await self.store.get(
            f"prompts/{agent_id}/{version}"
        )
        return prompt
 
    async def update_prompt(
        self, agent_id: str, new_prompt: str,
        author: str, reason: str
    ):
        """프롬프트 업데이트 (이력 보존)"""
        current_version = await self.store.get_latest_version(
            agent_id
        )
        new_version = current_version + 1
 
        await self.store.save(
            f"prompts/{agent_id}/{new_version}",
            {
                "content": new_prompt,
                "version": new_version,
                "author": author,
                "reason": reason,
                "timestamp": datetime.now().isoformat(),
                "previous_version": current_version
            }
        )
 
    async def rollback_prompt(
        self, agent_id: str, target_version: int
    ):
        """이전 버전으로 롤백"""
        old_prompt = await self.get_prompt(
            agent_id, str(target_version)
        )
        await self.update_prompt(
            agent_id, old_prompt,
            author="system",
            reason=f"Rollback to v{target_version}"
        )
Tip

프롬프트 변경은 에이전트 동작에 큰 영향을 미칩니다. 변경 전후의 평가 점수를 비교하는 자동화된 파이프라인을 구축하세요. A/B 테스트로 일부 트래픽에만 새 프롬프트를 적용하는 것도 안전한 방법입니다.

정책 엔진(Policy Engine)

에이전트의 행동을 제어하는 규칙을 중앙에서 관리합니다.

정책 정의

python
from pydantic import BaseModel
from enum import Enum
 
class PolicyAction(str, Enum):
    ALLOW = "allow"
    DENY = "deny"
    REQUIRE_APPROVAL = "require_approval"
    LOG_ONLY = "log_only"
 
class Policy(BaseModel):
    id: str
    name: str
    description: str
    conditions: dict  # 조건
    action: PolicyAction
    priority: int  # 높을수록 우선
    enabled: bool = True
 
class PolicyEngine:
    def __init__(self):
        self.policies: list[Policy] = []
 
    def add_policy(self, policy: Policy):
        self.policies.append(policy)
        self.policies.sort(key=lambda p: p.priority, reverse=True)
 
    async def evaluate(
        self,
        agent_id: str,
        action: str,
        context: dict
    ) -> PolicyAction:
        """정책 평가: 에이전트 행동의 허용 여부 결정"""
        for policy in self.policies:
            if not policy.enabled:
                continue
            if self._matches(policy.conditions, {
                "agent_id": agent_id,
                "action": action,
                **context
            }):
                return policy.action
        return PolicyAction.ALLOW  # 기본 허용
 
    def _matches(self, conditions: dict, context: dict) -> bool:
        for key, expected in conditions.items():
            actual = context.get(key)
            if isinstance(expected, list):
                if actual not in expected:
                    return False
            elif actual != expected:
                return False
        return True
 
# 정책 예시
engine = PolicyEngine()
 
# 프로덕션 DB 접근 제한
engine.add_policy(Policy(
    id="prod-db-restrict",
    name="프로덕션 DB 쓰기 제한",
    description="프로덕션 데이터베이스에 대한 쓰기 작업은 승인 필요",
    conditions={
        "action": "database_write",
        "environment": "production"
    },
    action=PolicyAction.REQUIRE_APPROVAL,
    priority=100
))
 
# 비용 제한
engine.add_policy(Policy(
    id="cost-limit",
    name="단일 작업 비용 상한",
    description="에이전트 단일 작업의 예상 비용이 $1를 초과하면 차단",
    conditions={
        "estimated_cost_exceeds": 1.0
    },
    action=PolicyAction.DENY,
    priority=90
))
 
# 특정 에이전트의 외부 통신 차단
engine.add_policy(Policy(
    id="no-external-comms",
    name="내부 에이전트 외부 통신 차단",
    description="internal 태그 에이전트의 외부 API 호출 차단",
    conditions={
        "agent_tags": {"scope": "internal"},
        "action": "external_api_call"
    },
    action=PolicyAction.DENY,
    priority=80
))

생명주기 관리

에이전트는 등록부터 폐기까지 명확한 생명주기를 거칩니다.

등록(Registered) → 활성화(Active) → 비활성(Inactive) → 폐기(Decommissioned)
                       ↕
                   성능저하(Degraded)

생명주기 관리자

python
class AgentLifecycleManager:
    def __init__(
        self, registry: AgentRegistry,
        config_manager: ConfigurationManager,
        policy_engine: PolicyEngine
    ):
        self.registry = registry
        self.config = config_manager
        self.policy = policy_engine
 
    async def activate(self, agent_id: str):
        """에이전트 활성화"""
        agent = await self.registry.get(agent_id)
 
        # 활성화 전 검증
        config = await self.config.get_config(agent_id)
        if not self._validate_config(config):
            raise InvalidConfigError(
                f"Agent {agent_id} config validation failed"
            )
 
        # 정책 확인
        action = await self.policy.evaluate(
            agent_id, "activate", {"config": config.dict()}
        )
        if action == PolicyAction.DENY:
            raise PolicyDeniedError(
                f"Agent {agent_id} activation denied by policy"
            )
 
        agent.status = AgentStatus.ACTIVE
        await self.registry.save(agent)
        await self._emit("agent.activated", agent)
 
    async def deactivate(self, agent_id: str, reason: str):
        """에이전트 비활성화"""
        agent = await self.registry.get(agent_id)
        agent.status = AgentStatus.INACTIVE
        await self.registry.save(agent)
 
        # 진행 중인 작업 처리
        pending_tasks = await self._get_pending_tasks(agent_id)
        for task in pending_tasks:
            # 대체 에이전트로 재할당
            alternative = await self.registry.discover(
                capabilities=agent.capabilities,
                status=AgentStatus.ACTIVE
            )
            if alternative:
                await self._reassign_task(task, alternative[0])
            else:
                await self._fail_task(
                    task, f"Agent {agent_id} deactivated: {reason}"
                )
 
        await self._emit("agent.deactivated", {
            "agent": agent, "reason": reason
        })
 
    async def rolling_update(
        self, agent_id: str,
        new_version: AgentRegistration
    ):
        """무중단 에이전트 업데이트"""
        old_agent = await self.registry.get(agent_id)
 
        # 1. 새 버전 등록 (카나리)
        new_agent_id = f"{agent_id}-v{new_version.version}"
        new_version.agent_id = new_agent_id
        await self.registry.register(new_version)
        await self.activate(new_agent_id)
 
        # 2. 트래픽 점진적 전환 (10% → 50% → 100%)
        for weight in [10, 50, 100]:
            await self._update_routing_weight(
                old_agent.agent_id, 100 - weight,
                new_agent_id, weight
            )
            await asyncio.sleep(60)  # 1분 관찰
 
            # 에러율 확인
            error_rate = await self._check_error_rate(new_agent_id)
            if error_rate > 0.05:  # 5% 초과 시 롤백
                await self._update_routing_weight(
                    old_agent.agent_id, 100, new_agent_id, 0
                )
                await self.deactivate(
                    new_agent_id, "High error rate during rollout"
                )
                return {"status": "rolled_back", "error_rate": error_rate}
 
        # 3. 이전 버전 폐기
        await self.deactivate(
            old_agent.agent_id, "Replaced by new version"
        )
        return {"status": "completed", "new_version": new_agent_id}
Warning

에이전트 업데이트 시 "진행 중인 작업"의 처리가 가장 까다롭습니다. 긴 실행 시간의 작업이 있다면 드레인(Drain) 기간을 두어 기존 작업이 완료된 후 에이전트를 교체하세요.

컨트롤 플레인 API 설계

외부에서 컨트롤 플레인을 조작하기 위한 API입니다.

python
from fastapi import FastAPI, HTTPException
 
app = FastAPI(title="Agent Control Plane")
 
@app.get("/agents")
async def list_agents(
    status: AgentStatus | None = None,
    capability: str | None = None
):
    """등록된 에이전트 목록 조회"""
    return await registry.discover(
        capabilities=[capability] if capability else None,
        status=status
    )
 
@app.post("/agents")
async def register_agent(registration: AgentRegistration):
    """에이전트 등록"""
    agent_id = await registry.register(registration)
    return {"agent_id": agent_id, "status": "registered"}
 
@app.post("/agents/{agent_id}/activate")
async def activate_agent(agent_id: str):
    """에이전트 활성화"""
    await lifecycle.activate(agent_id)
    return {"status": "active"}
 
@app.put("/agents/{agent_id}/config")
async def update_config(agent_id: str, updates: dict):
    """에이전트 설정 업데이트"""
    await config_manager.update_config(agent_id, updates)
    return {"status": "updated"}
 
@app.post("/agents/{agent_id}/rolling-update")
async def rolling_update(
    agent_id: str, new_version: AgentRegistration
):
    """무중단 업데이트"""
    result = await lifecycle.rolling_update(agent_id, new_version)
    return result
 
@app.get("/policies")
async def list_policies():
    """활성 정책 목록"""
    return [p.dict() for p in policy_engine.policies if p.enabled]

다음 장에서는 멀티에이전트 시스템의 상태 관리와 장애 복구를 다룹니다. 체크포인팅, 이벤트 소싱, 재시도 전략, 그리고 분산 환경에서의 일관성 보장을 살펴봅니다.

이 글이 도움이 되셨나요?

관련 글

AI / ML

5장: 에이전트 협업과 합의 메커니즘

투표, 토론, 비판적 검토, 다수결 합의 등 여러 에이전트가 하나의 결론에 도달하기 위한 협업 패턴과 합의 프로토콜을 다룹니다.

2026년 3월 30일·16분
AI / ML

7장: 상태 관리와 장애 복구

멀티에이전트 시스템의 체크포인팅, 이벤트 소싱, 재시도 전략, 그리고 분산 환경에서의 일관성 보장과 장애 복구 패턴을 다룹니다.

2026년 4월 2일·14분
AI / ML

4장: 핸드오프와 작업 위임 패턴

에이전트 간 작업 위임의 핵심 메커니즘인 핸드오프 패턴, 라우팅 전략, 동적 위임의 구현과 최적화를 실전 코드와 함께 다룹니다.

2026년 3월 28일·17분
이전 글5장: 에이전트 협업과 합의 메커니즘
다음 글7장: 상태 관리와 장애 복구

댓글

목차

약 15분 남음
  • 컨트롤 플레인이란
  • 에이전트 레지스트리
    • 레지스트리의 역할
    • 에이전트 발견(Discovery)
  • 설정 관리(Configuration Management)
    • 중앙 집중식 설정
    • 동적 프롬프트 관리
  • 정책 엔진(Policy Engine)
    • 정책 정의
  • 생명주기 관리
    • 생명주기 관리자
  • 컨트롤 플레인 API 설계