수십에서 수백 개의 에이전트를 운영하는 플릿 관리 전략, 자동 스케일링, 로드 밸런싱, 비용 최적화를 다룹니다.
2026년 초, 선도적인 조직들은 50~200개 이상의 AI 에이전트를 동시에 운영하고 있습니다. 5개의 에이전트를 관리하는 방법은 50개에서 붕괴합니다. 에이전트 수가 늘어나면 개별 에이전트의 상태 파악, 버전 관리, 비용 추적, 장애 대응이 기하급수적으로 복잡해집니다.
에이전트 플릿(Agent Fleet)은 조직이 운영하는 전체 에이전트 집합을 의미하며, 플릿 관리(Fleet Management)는 이 집합을 체계적으로 배포, 모니터링, 최적화하는 활동입니다.
플릿 관리의 다섯 가지 핵심 축은 다음과 같습니다.
대규모 플릿은 단일 계층으로 관리하기 어렵습니다. 기능 영역별로 클러스터를 나누고, 클러스터 간의 조율은 상위 계층에서 담당합니다.
from dataclasses import dataclass, field
@dataclass
class AgentCluster:
"""기능 영역별 에이전트 클러스터"""
name: str
domain: str # "customer-service", "data-pipeline", "devops"
agents: list[str] # 에이전트 ID 목록
max_capacity: int
current_load: float = 0.0
@dataclass
class FleetTopology:
"""전체 플릿 토폴로지"""
clusters: dict[str, AgentCluster] = field(default_factory=dict)
def add_cluster(self, cluster: AgentCluster):
self.clusters[cluster.name] = cluster
def get_cluster_for_task(self, domain: str) -> AgentCluster | None:
"""도메인에 맞는 클러스터 조회"""
for cluster in self.clusters.values():
if cluster.domain == domain:
return cluster
return None
def get_available_capacity(self) -> dict[str, float]:
"""클러스터별 가용 용량"""
return {
name: cluster.max_capacity - cluster.current_load
for name, cluster in self.clusters.items()
}
# 플릿 구성 예시
fleet = FleetTopology()
fleet.add_cluster(AgentCluster(
name="cs-team",
domain="customer-service",
agents=["triage-v2", "billing-v3", "shipping-v1", "technical-v2"],
max_capacity=100 # 동시 처리 가능 대화 수
))
fleet.add_cluster(AgentCluster(
name="data-team",
domain="data-pipeline",
agents=["etl-agent", "analyst-v2", "reporter-v1"],
max_capacity=50
))
fleet.add_cluster(AgentCluster(
name="ops-team",
domain="devops",
agents=["deploy-agent", "monitor-agent", "incident-responder"],
max_capacity=30
))동일한 능력을 가진 에이전트가 여러 인스턴스로 실행될 때, 작업을 균등하게 분배해야 합니다.
from abc import ABC, abstractmethod
import random
class LoadBalancer(ABC):
@abstractmethod
async def select(
self, candidates: list[str], task: dict
) -> str:
"""작업을 처리할 에이전트 선택"""
pass
class RoundRobinBalancer(LoadBalancer):
def __init__(self):
self._index = 0
async def select(
self, candidates: list[str], task: dict
) -> str:
agent = candidates[self._index % len(candidates)]
self._index += 1
return agent
class LeastLoadBalancer(LoadBalancer):
def __init__(self, metrics_provider):
self.metrics = metrics_provider
async def select(
self, candidates: list[str], task: dict
) -> str:
loads = {}
for agent_id in candidates:
loads[agent_id] = await self.metrics.get_current_load(
agent_id
)
return min(loads, key=loads.get)
class WeightedBalancer(LoadBalancer):
"""성능 메트릭 기반 가중 로드 밸런싱"""
def __init__(self, metrics_provider):
self.metrics = metrics_provider
async def select(
self, candidates: list[str], task: dict
) -> str:
weights = []
for agent_id in candidates:
stats = await self.metrics.get_agent_stats(agent_id)
# 성공률이 높고 지연시간이 낮은 에이전트에 높은 가중치
weight = (
stats.success_rate * 0.6
+ (1 / max(stats.avg_latency_ms, 1)) * 1000 * 0.3
+ (1 - stats.current_load / stats.max_capacity) * 0.1
)
weights.append(weight)
# 가중 랜덤 선택
total = sum(weights)
r = random.uniform(0, total)
cumulative = 0
for i, w in enumerate(weights):
cumulative += w
if r <= cumulative:
return candidates[i]
return candidates[-1]트래픽 변동에 따라 에이전트 인스턴스 수를 동적으로 조정합니다.
from dataclasses import dataclass
@dataclass
class ScalingPolicy:
min_instances: int = 1
max_instances: int = 10
target_utilization: float = 0.7 # 70%
scale_up_threshold: float = 0.85
scale_down_threshold: float = 0.3
cooldown_seconds: int = 300 # 스케일링 간 최소 대기 시간
class AutoScaler:
def __init__(
self,
policy: ScalingPolicy,
metrics_provider,
agent_spawner
):
self.policy = policy
self.metrics = metrics_provider
self.spawner = agent_spawner
self.last_scale_time = 0
async def evaluate(self, agent_type: str) -> dict:
"""스케일링 필요 여부 평가"""
current_instances = await self.spawner.count_instances(
agent_type
)
avg_utilization = await self.metrics.get_avg_utilization(
agent_type
)
queue_depth = await self.metrics.get_queue_depth(
agent_type
)
action = "none"
target_instances = current_instances
if avg_utilization > self.policy.scale_up_threshold:
# 스케일 업
target_instances = min(
current_instances + self._scale_up_amount(
avg_utilization, queue_depth
),
self.policy.max_instances
)
action = "scale_up"
elif avg_utilization < self.policy.scale_down_threshold:
# 스케일 다운
target_instances = max(
current_instances - 1,
self.policy.min_instances
)
action = "scale_down"
return {
"action": action,
"current": current_instances,
"target": target_instances,
"utilization": avg_utilization,
"queue_depth": queue_depth
}
def _scale_up_amount(
self, utilization: float, queue_depth: int
) -> int:
"""스케일업 인스턴스 수 계산"""
if queue_depth > 100:
return 3 # 급격한 증가
elif queue_depth > 50:
return 2
return 1
async def apply(self, agent_type: str):
"""스케일링 적용"""
now = time.time()
if now - self.last_scale_time < self.policy.cooldown_seconds:
return # 쿨다운 기간
decision = await self.evaluate(agent_type)
if decision["action"] == "scale_up":
diff = decision["target"] - decision["current"]
for _ in range(diff):
await self.spawner.spawn(agent_type)
self.last_scale_time = now
elif decision["action"] == "scale_down":
diff = decision["current"] - decision["target"]
# 유휴 인스턴스부터 종료
idle = await self.spawner.get_idle_instances(
agent_type
)
for instance in idle[:diff]:
await self.spawner.drain_and_stop(instance)
self.last_scale_time = now모든 작업에 고성능 모델을 사용할 필요는 없습니다. 작업의 복잡도에 따라 모델을 동적으로 선택하면 비용을 크게 절감할 수 있습니다.
class ModelTieringStrategy:
"""작업 복잡도에 따른 모델 자동 선택"""
TIERS = {
"simple": {
"model": "claude-haiku-4-5",
"cost_per_1k": 0.001,
"max_complexity": 3
},
"standard": {
"model": "claude-sonnet-4-6",
"cost_per_1k": 0.003,
"max_complexity": 7
},
"complex": {
"model": "claude-opus-4-6",
"cost_per_1k": 0.015,
"max_complexity": 10
}
}
async def select_model(
self, task: str, context: dict
) -> str:
complexity = await self._estimate_complexity(
task, context
)
for tier_name, tier in self.TIERS.items():
if complexity <= tier["max_complexity"]:
return tier["model"]
return self.TIERS["complex"]["model"]
async def _estimate_complexity(
self, task: str, context: dict
) -> int:
"""작업 복잡도 추정 (1-10)"""
indicators = {
"multi_step": "단계" in task or "순서" in task,
"reasoning": "분석" in task or "비교" in task,
"code_generation": "코드" in task or "구현" in task,
"long_context": len(str(context)) > 5000,
"tool_heavy": context.get("required_tools", 0) > 3,
}
score = sum(2 for v in indicators.values() if v)
return min(score + 1, 10) # 최소 1, 최대 10모델 티어링만으로도 LLM 비용을 40~60% 절감할 수 있습니다. 핵심은 복잡도 추정의 정확도입니다. 초기에는 보수적으로(상위 모델 선호) 시작하고, 메트릭을 수집하면서 점진적으로 조정하세요.
@dataclass
class CostRecord:
workflow_id: str
agent_id: str
model: str
input_tokens: int
output_tokens: int
tool_calls: int
timestamp: datetime
cost_usd: float
class CostTracker:
def __init__(self, storage):
self.storage = storage
self.pricing = {
"claude-opus-4-6": {"input": 0.015, "output": 0.075},
"claude-sonnet-4-6": {"input": 0.003, "output": 0.015},
"claude-haiku-4-5": {"input": 0.001, "output": 0.005},
}
async def record(
self,
workflow_id: str,
agent_id: str,
model: str,
input_tokens: int,
output_tokens: int,
tool_calls: int = 0
):
price = self.pricing.get(model, {"input": 0, "output": 0})
cost = (
(input_tokens / 1000) * price["input"]
+ (output_tokens / 1000) * price["output"]
)
record = CostRecord(
workflow_id=workflow_id,
agent_id=agent_id,
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
tool_calls=tool_calls,
timestamp=datetime.now(),
cost_usd=cost
)
await self.storage.save(record)
async def get_daily_summary(
self, date: str | None = None
) -> dict:
"""일별 비용 요약"""
records = await self.storage.query(date=date or today())
by_agent = {}
for r in records:
if r.agent_id not in by_agent:
by_agent[r.agent_id] = {
"total_cost": 0, "calls": 0,
"total_tokens": 0
}
by_agent[r.agent_id]["total_cost"] += r.cost_usd
by_agent[r.agent_id]["calls"] += 1
by_agent[r.agent_id]["total_tokens"] += (
r.input_tokens + r.output_tokens
)
return {
"date": date or today(),
"total_cost": sum(
a["total_cost"] for a in by_agent.values()
),
"by_agent": by_agent,
"total_calls": sum(
a["calls"] for a in by_agent.values()
)
}
async def check_budget(
self, agent_id: str, budget_usd: float
) -> bool:
"""예산 초과 여부 확인"""
summary = await self.get_daily_summary()
agent_cost = summary["by_agent"].get(
agent_id, {}
).get("total_cost", 0)
return agent_cost < budget_usdclass CostOptimizer:
"""에이전트 플릿의 비용 최적화 제안 생성"""
async def analyze(self, cost_tracker: CostTracker) -> list[dict]:
suggestions = []
summary = await cost_tracker.get_daily_summary()
for agent_id, stats in summary["by_agent"].items():
avg_cost_per_call = stats["total_cost"] / max(
stats["calls"], 1
)
# 고비용 에이전트 식별
if avg_cost_per_call > 0.10:
suggestions.append({
"agent": agent_id,
"type": "model_downgrade",
"message": (
f"호출당 평균 비용 ${avg_cost_per_call:.3f}."
" 하위 모델로 전환을 검토하세요."
),
"estimated_saving": avg_cost_per_call * 0.5
})
# 낮은 활용도 에이전트 식별
if stats["calls"] < 5:
suggestions.append({
"agent": agent_id,
"type": "decommission_candidate",
"message": (
f"일일 호출 {stats['calls']}회."
" 다른 에이전트에 통합을 검토하세요."
)
})
return suggestions운영팀이 플릿 상태를 한눈에 파악할 수 있는 대시보드의 핵심 지표입니다.
class FleetDashboard:
def __init__(
self,
registry,
cost_tracker,
metrics_provider
):
self.registry = registry
self.costs = cost_tracker
self.metrics = metrics_provider
async def get_overview(self) -> dict:
"""플릿 전체 현황"""
agents = await self.registry.discover()
status_counts = {}
for agent in agents:
status_counts[agent.status] = (
status_counts.get(agent.status, 0) + 1
)
daily_cost = await self.costs.get_daily_summary()
return {
"total_agents": len(agents),
"status_distribution": status_counts,
"daily_cost_usd": daily_cost["total_cost"],
"total_calls_today": daily_cost["total_calls"],
"top_cost_agents": sorted(
daily_cost["by_agent"].items(),
key=lambda x: x[1]["total_cost"],
reverse=True
)[:5],
"alerts": await self._get_active_alerts()
}
async def _get_active_alerts(self) -> list[dict]:
"""활성 알림 목록"""
alerts = []
# 성능 저하 에이전트
degraded = await self.registry.discover(
status=AgentStatus.DEGRADED
)
for agent in degraded:
alerts.append({
"severity": "warning",
"agent": agent.name,
"message": f"{agent.name} 성능 저하 감지"
})
# 예산 초과 경고
daily = await self.costs.get_daily_summary()
if daily["total_cost"] > 50.0: # $50 일일 한도
alerts.append({
"severity": "critical",
"agent": "fleet",
"message": (
f"일일 비용 ${daily['total_cost']:.2f}"
" — 예산 초과"
)
})
return alerts플릿 대시보드의 핵심 지표는 다섯 가지입니다. (1) 에이전트 상태 분포, (2) 일일/월별 비용, (3) 에이전트별 성공률, (4) 평균 응답 시간, (5) 큐 깊이(대기 중인 작업 수). 이 지표를 실시간으로 모니터링하면 대부분의 운영 문제를 조기에 발견할 수 있습니다.
다음 장에서는 멀티에이전트 시스템의 관측 가능성(Observability)과 디버깅을 다룹니다. 분산 추적, 구조화된 로깅, 메트릭 수집, 그리고 에이전트 행동의 시각화를 살펴봅니다.
이 글이 도움이 되셨나요?