본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout
© 2026 Kreath
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 8장: 에이전트 플릿 관리와 스케일링
2026년 4월 5일·AI / ML·

8장: 에이전트 플릿 관리와 스케일링

수십에서 수백 개의 에이전트를 운영하는 플릿 관리 전략, 자동 스케일링, 로드 밸런싱, 비용 최적화를 다룹니다.

13분1,238자5개 섹션
ai-에이전트플릿-관리스케일링비용-최적화
공유
agent-orchestration8 / 11
1234567891011
이전7장: 상태 관리와 장애 복구다음9장: 관측 가능성과 디버깅

플릿 관리의 필요성

2026년 초, 선도적인 조직들은 50~200개 이상의 AI 에이전트를 동시에 운영하고 있습니다. 5개의 에이전트를 관리하는 방법은 50개에서 붕괴합니다. 에이전트 수가 늘어나면 개별 에이전트의 상태 파악, 버전 관리, 비용 추적, 장애 대응이 기하급수적으로 복잡해집니다.

에이전트 플릿(Agent Fleet)은 조직이 운영하는 전체 에이전트 집합을 의미하며, 플릿 관리(Fleet Management)는 이 집합을 체계적으로 배포, 모니터링, 최적화하는 활동입니다.

플릿 관리의 다섯 가지 핵심 축은 다음과 같습니다.

  1. 중앙 가시성(Centralized Visibility): 모든 에이전트의 상태를 한 곳에서 파악
  2. 구조화된 작업 관리(Structured Task Management): 에이전트 간 작업 배분과 우선순위
  3. 품질 보증(Quality Assurance): 에이전트 출력의 일관된 품질 유지
  4. 통신과 조율(Communication & Coordination): 에이전트 간 효율적 메시지 교환
  5. 지속적 개선(Continuous Improvement): 메트릭 기반 성능 최적화

플릿 아키텍처

계층형 플릿 구조

대규모 플릿은 단일 계층으로 관리하기 어렵습니다. 기능 영역별로 클러스터를 나누고, 클러스터 간의 조율은 상위 계층에서 담당합니다.

python
from dataclasses import dataclass, field
 
@dataclass
class AgentCluster:
    """기능 영역별 에이전트 클러스터"""
    name: str
    domain: str  # "customer-service", "data-pipeline", "devops"
    agents: list[str]  # 에이전트 ID 목록
    max_capacity: int
    current_load: float = 0.0
 
@dataclass
class FleetTopology:
    """전체 플릿 토폴로지"""
    clusters: dict[str, AgentCluster] = field(default_factory=dict)
 
    def add_cluster(self, cluster: AgentCluster):
        self.clusters[cluster.name] = cluster
 
    def get_cluster_for_task(self, domain: str) -> AgentCluster | None:
        """도메인에 맞는 클러스터 조회"""
        for cluster in self.clusters.values():
            if cluster.domain == domain:
                return cluster
        return None
 
    def get_available_capacity(self) -> dict[str, float]:
        """클러스터별 가용 용량"""
        return {
            name: cluster.max_capacity - cluster.current_load
            for name, cluster in self.clusters.items()
        }
 
# 플릿 구성 예시
fleet = FleetTopology()
fleet.add_cluster(AgentCluster(
    name="cs-team",
    domain="customer-service",
    agents=["triage-v2", "billing-v3", "shipping-v1", "technical-v2"],
    max_capacity=100  # 동시 처리 가능 대화 수
))
fleet.add_cluster(AgentCluster(
    name="data-team",
    domain="data-pipeline",
    agents=["etl-agent", "analyst-v2", "reporter-v1"],
    max_capacity=50
))
fleet.add_cluster(AgentCluster(
    name="ops-team",
    domain="devops",
    agents=["deploy-agent", "monitor-agent", "incident-responder"],
    max_capacity=30
))

로드 밸런싱

동일한 능력을 가진 에이전트가 여러 인스턴스로 실행될 때, 작업을 균등하게 분배해야 합니다.

python
from abc import ABC, abstractmethod
import random
 
class LoadBalancer(ABC):
    @abstractmethod
    async def select(
        self, candidates: list[str], task: dict
    ) -> str:
        """작업을 처리할 에이전트 선택"""
        pass
 
class RoundRobinBalancer(LoadBalancer):
    def __init__(self):
        self._index = 0
 
    async def select(
        self, candidates: list[str], task: dict
    ) -> str:
        agent = candidates[self._index % len(candidates)]
        self._index += 1
        return agent
 
class LeastLoadBalancer(LoadBalancer):
    def __init__(self, metrics_provider):
        self.metrics = metrics_provider
 
    async def select(
        self, candidates: list[str], task: dict
    ) -> str:
        loads = {}
        for agent_id in candidates:
            loads[agent_id] = await self.metrics.get_current_load(
                agent_id
            )
        return min(loads, key=loads.get)
 
class WeightedBalancer(LoadBalancer):
    """성능 메트릭 기반 가중 로드 밸런싱"""
 
    def __init__(self, metrics_provider):
        self.metrics = metrics_provider
 
    async def select(
        self, candidates: list[str], task: dict
    ) -> str:
        weights = []
        for agent_id in candidates:
            stats = await self.metrics.get_agent_stats(agent_id)
            # 성공률이 높고 지연시간이 낮은 에이전트에 높은 가중치
            weight = (
                stats.success_rate * 0.6
                + (1 / max(stats.avg_latency_ms, 1)) * 1000 * 0.3
                + (1 - stats.current_load / stats.max_capacity) * 0.1
            )
            weights.append(weight)
 
        # 가중 랜덤 선택
        total = sum(weights)
        r = random.uniform(0, total)
        cumulative = 0
        for i, w in enumerate(weights):
            cumulative += w
            if r <= cumulative:
                return candidates[i]
        return candidates[-1]

자동 스케일링

에이전트 인스턴스 스케일링

트래픽 변동에 따라 에이전트 인스턴스 수를 동적으로 조정합니다.

python
from dataclasses import dataclass
 
@dataclass
class ScalingPolicy:
    min_instances: int = 1
    max_instances: int = 10
    target_utilization: float = 0.7  # 70%
    scale_up_threshold: float = 0.85
    scale_down_threshold: float = 0.3
    cooldown_seconds: int = 300  # 스케일링 간 최소 대기 시간
 
class AutoScaler:
    def __init__(
        self,
        policy: ScalingPolicy,
        metrics_provider,
        agent_spawner
    ):
        self.policy = policy
        self.metrics = metrics_provider
        self.spawner = agent_spawner
        self.last_scale_time = 0
 
    async def evaluate(self, agent_type: str) -> dict:
        """스케일링 필요 여부 평가"""
        current_instances = await self.spawner.count_instances(
            agent_type
        )
        avg_utilization = await self.metrics.get_avg_utilization(
            agent_type
        )
        queue_depth = await self.metrics.get_queue_depth(
            agent_type
        )
 
        action = "none"
        target_instances = current_instances
 
        if avg_utilization > self.policy.scale_up_threshold:
            # 스케일 업
            target_instances = min(
                current_instances + self._scale_up_amount(
                    avg_utilization, queue_depth
                ),
                self.policy.max_instances
            )
            action = "scale_up"
        elif avg_utilization < self.policy.scale_down_threshold:
            # 스케일 다운
            target_instances = max(
                current_instances - 1,
                self.policy.min_instances
            )
            action = "scale_down"
 
        return {
            "action": action,
            "current": current_instances,
            "target": target_instances,
            "utilization": avg_utilization,
            "queue_depth": queue_depth
        }
 
    def _scale_up_amount(
        self, utilization: float, queue_depth: int
    ) -> int:
        """스케일업 인스턴스 수 계산"""
        if queue_depth > 100:
            return 3  # 급격한 증가
        elif queue_depth > 50:
            return 2
        return 1
 
    async def apply(self, agent_type: str):
        """스케일링 적용"""
        now = time.time()
        if now - self.last_scale_time < self.policy.cooldown_seconds:
            return  # 쿨다운 기간
 
        decision = await self.evaluate(agent_type)
 
        if decision["action"] == "scale_up":
            diff = decision["target"] - decision["current"]
            for _ in range(diff):
                await self.spawner.spawn(agent_type)
            self.last_scale_time = now
        elif decision["action"] == "scale_down":
            diff = decision["current"] - decision["target"]
            # 유휴 인스턴스부터 종료
            idle = await self.spawner.get_idle_instances(
                agent_type
            )
            for instance in idle[:diff]:
                await self.spawner.drain_and_stop(instance)
            self.last_scale_time = now

모델 티어링(Model Tiering)

모든 작업에 고성능 모델을 사용할 필요는 없습니다. 작업의 복잡도에 따라 모델을 동적으로 선택하면 비용을 크게 절감할 수 있습니다.

python
class ModelTieringStrategy:
    """작업 복잡도에 따른 모델 자동 선택"""
 
    TIERS = {
        "simple": {
            "model": "claude-haiku-4-5",
            "cost_per_1k": 0.001,
            "max_complexity": 3
        },
        "standard": {
            "model": "claude-sonnet-4-6",
            "cost_per_1k": 0.003,
            "max_complexity": 7
        },
        "complex": {
            "model": "claude-opus-4-6",
            "cost_per_1k": 0.015,
            "max_complexity": 10
        }
    }
 
    async def select_model(
        self, task: str, context: dict
    ) -> str:
        complexity = await self._estimate_complexity(
            task, context
        )
 
        for tier_name, tier in self.TIERS.items():
            if complexity <= tier["max_complexity"]:
                return tier["model"]
 
        return self.TIERS["complex"]["model"]
 
    async def _estimate_complexity(
        self, task: str, context: dict
    ) -> int:
        """작업 복잡도 추정 (1-10)"""
        indicators = {
            "multi_step": "단계" in task or "순서" in task,
            "reasoning": "분석" in task or "비교" in task,
            "code_generation": "코드" in task or "구현" in task,
            "long_context": len(str(context)) > 5000,
            "tool_heavy": context.get("required_tools", 0) > 3,
        }
        score = sum(2 for v in indicators.values() if v)
        return min(score + 1, 10)  # 최소 1, 최대 10
Tip

모델 티어링만으로도 LLM 비용을 40~60% 절감할 수 있습니다. 핵심은 복잡도 추정의 정확도입니다. 초기에는 보수적으로(상위 모델 선호) 시작하고, 메트릭을 수집하면서 점진적으로 조정하세요.

비용 관리

비용 추적 시스템

python
@dataclass
class CostRecord:
    workflow_id: str
    agent_id: str
    model: str
    input_tokens: int
    output_tokens: int
    tool_calls: int
    timestamp: datetime
    cost_usd: float
 
class CostTracker:
    def __init__(self, storage):
        self.storage = storage
        self.pricing = {
            "claude-opus-4-6": {"input": 0.015, "output": 0.075},
            "claude-sonnet-4-6": {"input": 0.003, "output": 0.015},
            "claude-haiku-4-5": {"input": 0.001, "output": 0.005},
        }
 
    async def record(
        self,
        workflow_id: str,
        agent_id: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
        tool_calls: int = 0
    ):
        price = self.pricing.get(model, {"input": 0, "output": 0})
        cost = (
            (input_tokens / 1000) * price["input"]
            + (output_tokens / 1000) * price["output"]
        )
 
        record = CostRecord(
            workflow_id=workflow_id,
            agent_id=agent_id,
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            tool_calls=tool_calls,
            timestamp=datetime.now(),
            cost_usd=cost
        )
        await self.storage.save(record)
 
    async def get_daily_summary(
        self, date: str | None = None
    ) -> dict:
        """일별 비용 요약"""
        records = await self.storage.query(date=date or today())
        by_agent = {}
        for r in records:
            if r.agent_id not in by_agent:
                by_agent[r.agent_id] = {
                    "total_cost": 0, "calls": 0,
                    "total_tokens": 0
                }
            by_agent[r.agent_id]["total_cost"] += r.cost_usd
            by_agent[r.agent_id]["calls"] += 1
            by_agent[r.agent_id]["total_tokens"] += (
                r.input_tokens + r.output_tokens
            )
 
        return {
            "date": date or today(),
            "total_cost": sum(
                a["total_cost"] for a in by_agent.values()
            ),
            "by_agent": by_agent,
            "total_calls": sum(
                a["calls"] for a in by_agent.values()
            )
        }
 
    async def check_budget(
        self, agent_id: str, budget_usd: float
    ) -> bool:
        """예산 초과 여부 확인"""
        summary = await self.get_daily_summary()
        agent_cost = summary["by_agent"].get(
            agent_id, {}
        ).get("total_cost", 0)
        return agent_cost < budget_usd

비용 최적화 전략

python
class CostOptimizer:
    """에이전트 플릿의 비용 최적화 제안 생성"""
 
    async def analyze(self, cost_tracker: CostTracker) -> list[dict]:
        suggestions = []
        summary = await cost_tracker.get_daily_summary()
 
        for agent_id, stats in summary["by_agent"].items():
            avg_cost_per_call = stats["total_cost"] / max(
                stats["calls"], 1
            )
 
            # 고비용 에이전트 식별
            if avg_cost_per_call > 0.10:
                suggestions.append({
                    "agent": agent_id,
                    "type": "model_downgrade",
                    "message": (
                        f"호출당 평균 비용 ${avg_cost_per_call:.3f}."
                        " 하위 모델로 전환을 검토하세요."
                    ),
                    "estimated_saving": avg_cost_per_call * 0.5
                })
 
            # 낮은 활용도 에이전트 식별
            if stats["calls"] < 5:
                suggestions.append({
                    "agent": agent_id,
                    "type": "decommission_candidate",
                    "message": (
                        f"일일 호출 {stats['calls']}회."
                        " 다른 에이전트에 통합을 검토하세요."
                    )
                })
 
        return suggestions

플릿 대시보드

운영팀이 플릿 상태를 한눈에 파악할 수 있는 대시보드의 핵심 지표입니다.

python
class FleetDashboard:
    def __init__(
        self,
        registry,
        cost_tracker,
        metrics_provider
    ):
        self.registry = registry
        self.costs = cost_tracker
        self.metrics = metrics_provider
 
    async def get_overview(self) -> dict:
        """플릿 전체 현황"""
        agents = await self.registry.discover()
 
        status_counts = {}
        for agent in agents:
            status_counts[agent.status] = (
                status_counts.get(agent.status, 0) + 1
            )
 
        daily_cost = await self.costs.get_daily_summary()
 
        return {
            "total_agents": len(agents),
            "status_distribution": status_counts,
            "daily_cost_usd": daily_cost["total_cost"],
            "total_calls_today": daily_cost["total_calls"],
            "top_cost_agents": sorted(
                daily_cost["by_agent"].items(),
                key=lambda x: x[1]["total_cost"],
                reverse=True
            )[:5],
            "alerts": await self._get_active_alerts()
        }
 
    async def _get_active_alerts(self) -> list[dict]:
        """활성 알림 목록"""
        alerts = []
 
        # 성능 저하 에이전트
        degraded = await self.registry.discover(
            status=AgentStatus.DEGRADED
        )
        for agent in degraded:
            alerts.append({
                "severity": "warning",
                "agent": agent.name,
                "message": f"{agent.name} 성능 저하 감지"
            })
 
        # 예산 초과 경고
        daily = await self.costs.get_daily_summary()
        if daily["total_cost"] > 50.0:  # $50 일일 한도
            alerts.append({
                "severity": "critical",
                "agent": "fleet",
                "message": (
                    f"일일 비용 ${daily['total_cost']:.2f}"
                    " — 예산 초과"
                )
            })
 
        return alerts
Info

플릿 대시보드의 핵심 지표는 다섯 가지입니다. (1) 에이전트 상태 분포, (2) 일일/월별 비용, (3) 에이전트별 성공률, (4) 평균 응답 시간, (5) 큐 깊이(대기 중인 작업 수). 이 지표를 실시간으로 모니터링하면 대부분의 운영 문제를 조기에 발견할 수 있습니다.


다음 장에서는 멀티에이전트 시스템의 관측 가능성(Observability)과 디버깅을 다룹니다. 분산 추적, 구조화된 로깅, 메트릭 수집, 그리고 에이전트 행동의 시각화를 살펴봅니다.

이 글이 도움이 되셨나요?

관련 글

AI / ML

9장: 관측 가능성과 디버깅

멀티에이전트 시스템의 분산 추적, 구조화된 로깅, 메트릭 수집, 에이전트 행동 시각화, 그리고 프로덕션 디버깅 전략을 다룹니다.

2026년 4월 6일·13분
AI / ML

10장: 보안과 거버넌스

멀티에이전트 시스템의 인증과 인가, 최소 권한 원칙, 데이터 보호, 감사 추적, 규제 준수까지 프로덕션 보안과 거버넌스를 다룹니다.

2026년 4월 8일·16분
AI / ML

7장: 상태 관리와 장애 복구

멀티에이전트 시스템의 체크포인팅, 이벤트 소싱, 재시도 전략, 그리고 분산 환경에서의 일관성 보장과 장애 복구 패턴을 다룹니다.

2026년 4월 2일·14분
이전 글7장: 상태 관리와 장애 복구
다음 글9장: 관측 가능성과 디버깅

댓글

목차

약 13분 남음
  • 플릿 관리의 필요성
  • 플릿 아키텍처
    • 계층형 플릿 구조
    • 로드 밸런싱
  • 자동 스케일링
    • 에이전트 인스턴스 스케일링
    • 모델 티어링(Model Tiering)
  • 비용 관리
    • 비용 추적 시스템
    • 비용 최적화 전략
  • 플릿 대시보드