멀티에이전트 시스템의 인증과 인가, 최소 권한 원칙, 데이터 보호, 감사 추적, 규제 준수까지 프로덕션 보안과 거버넌스를 다룹니다.
멀티에이전트 시스템의 보안은 전통적인 소프트웨어 보안보다 더 넓은 범위를 다뤄야 합니다. 에이전트는 자율적으로 결정을 내리고, 외부 도구를 호출하며, 다른 에이전트와 자유롭게 통신합니다. 이 자율성이 강력한 기능인 동시에 보안 위협의 원천이 됩니다.
에이전트 시스템에서 고유한 보안 위협은 다음과 같습니다.
프롬프트 인젝션을 통한 권한 상승: 악의적인 입력이 에이전트의 시스템 프롬프트를 우회하여, 허가되지 않은 도구를 호출하거나 데이터에 접근할 수 있습니다.
에이전트 간 신뢰 남용: 에이전트 A가 에이전트 B를 "신뢰"하기 때문에, 침해된 에이전트 A를 통해 에이전트 B의 기능에 접근할 수 있습니다.
도구 호출의 부작용: 에이전트가 외부 API를 호출할 때 의도하지 않은 부작용(데이터 삭제, 결제 처리 등)이 발생할 수 있습니다.
각 에이전트에 고유한 신원(Identity)을 부여하고, 모든 행동을 해당 신원에 귀속시킵니다.
from dataclasses import dataclass
from datetime import datetime, timedelta
import jwt
@dataclass
class AgentIdentity:
"""에이전트의 신원 증명"""
agent_id: str
name: str
owner: str # 담당 팀
created_at: datetime
capabilities: list[str]
trust_level: str # "internal", "external", "sandboxed"
class AgentAuthenticator:
def __init__(self, secret_key: str):
self.secret = secret_key
def issue_token(
self, identity: AgentIdentity, ttl_hours: int = 24
) -> str:
"""에이전트 토큰 발급"""
payload = {
"sub": identity.agent_id,
"name": identity.name,
"owner": identity.owner,
"capabilities": identity.capabilities,
"trust_level": identity.trust_level,
"iat": datetime.now(),
"exp": datetime.now() + timedelta(hours=ttl_hours)
}
return jwt.encode(payload, self.secret, algorithm="HS256")
def verify_token(self, token: str) -> AgentIdentity:
"""에이전트 토큰 검증"""
try:
payload = jwt.decode(
token, self.secret, algorithms=["HS256"]
)
return AgentIdentity(
agent_id=payload["sub"],
name=payload["name"],
owner=payload["owner"],
created_at=datetime.fromisoformat(
payload["iat"]
) if isinstance(payload["iat"], str)
else datetime.fromtimestamp(payload["iat"]),
capabilities=payload["capabilities"],
trust_level=payload["trust_level"]
)
except jwt.ExpiredSignatureError:
raise AuthenticationError("Token expired")
except jwt.InvalidTokenError:
raise AuthenticationError("Invalid token")외부 에이전트와의 통신에서는 Agent Card의 인증 정보를 활용합니다.
class A2AAuthMiddleware:
"""A2A 프로토콜의 인증 미들웨어"""
def __init__(self, authenticator: AgentAuthenticator):
self.auth = authenticator
self.allowed_agents: dict[str, list[str]] = {}
def allow(self, agent_id: str, capabilities: list[str]):
"""특정 에이전트의 접근 허용"""
self.allowed_agents[agent_id] = capabilities
async def authenticate(self, request) -> AgentIdentity:
"""요청의 인증 처리"""
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
raise AuthenticationError("Missing bearer token")
token = auth_header[7:]
identity = self.auth.verify_token(token)
# 허용 목록 확인
if identity.agent_id not in self.allowed_agents:
raise AuthorizationError(
f"Agent {identity.agent_id} not in allowed list"
)
return identity에이전트에게 필요한 최소한의 권한만 부여합니다. 마이크로서비스의 RBAC(역할 기반 접근 제어)와 유사하지만, 도구 호출과 데이터 접근 범위까지 포함합니다.
from dataclasses import dataclass, field
from enum import Enum
class Permission(str, Enum):
TOOL_READ = "tool:read" # 읽기 전용 도구
TOOL_WRITE = "tool:write" # 쓰기 도구
TOOL_DELETE = "tool:delete" # 삭제 도구
AGENT_INVOKE = "agent:invoke" # 다른 에이전트 호출
AGENT_HANDOFF = "agent:handoff" # 핸드오프
DATA_PII = "data:pii" # PII 접근
DATA_FINANCIAL = "data:financial" # 금융 데이터
EXTERNAL_API = "external:api" # 외부 API 호출
@dataclass
class AgentRole:
name: str
permissions: set[Permission]
tool_allowlist: list[str] = field(default_factory=list)
tool_denylist: list[str] = field(default_factory=list)
data_scopes: list[str] = field(default_factory=list)
# 역할 정의 예시
ROLES = {
"reader": AgentRole(
name="reader",
permissions={Permission.TOOL_READ},
tool_allowlist=["query_database", "search_docs"]
),
"operator": AgentRole(
name="operator",
permissions={
Permission.TOOL_READ,
Permission.TOOL_WRITE,
Permission.AGENT_INVOKE
},
tool_allowlist=[
"query_database", "update_record",
"send_notification"
]
),
"admin": AgentRole(
name="admin",
permissions={
Permission.TOOL_READ,
Permission.TOOL_WRITE,
Permission.TOOL_DELETE,
Permission.AGENT_INVOKE,
Permission.AGENT_HANDOFF,
Permission.DATA_PII
}
)
}
class AuthorizationEngine:
def __init__(self):
self.agent_roles: dict[str, AgentRole] = {}
def assign_role(self, agent_id: str, role: AgentRole):
self.agent_roles[agent_id] = role
def check_permission(
self, agent_id: str, permission: Permission
) -> bool:
role = self.agent_roles.get(agent_id)
if not role:
return False
return permission in role.permissions
def check_tool_access(
self, agent_id: str, tool_name: str
) -> bool:
role = self.agent_roles.get(agent_id)
if not role:
return False
# 거부 목록 우선
if tool_name in role.tool_denylist:
return False
# 허용 목록이 비어있으면 모든 도구 허용
if not role.tool_allowlist:
return True
return tool_name in role.tool_allowlist
async def authorize_action(
self,
agent_id: str,
action: str,
resource: str,
context: dict
) -> dict:
"""행동 인가 판단"""
role = self.agent_roles.get(agent_id)
if not role:
return {
"allowed": False,
"reason": "No role assigned"
}
# 도구 호출 인가
if action == "tool_call":
allowed = self.check_tool_access(agent_id, resource)
return {
"allowed": allowed,
"reason": (
"Tool access granted"
if allowed
else f"Tool {resource} not in allowlist"
)
}
# PII 접근 인가
if (
action == "data_access"
and context.get("contains_pii")
):
allowed = Permission.DATA_PII in role.permissions
return {
"allowed": allowed,
"reason": (
"PII access granted"
if allowed
else "PII access not permitted"
)
}
return {"allowed": True, "reason": "Default allow"}에이전트의 도구 호출을 사전에 검증하는 가드레일입니다.
class ToolCallGuard:
"""도구 호출 전 보안 검증"""
def __init__(self, auth_engine: AuthorizationEngine):
self.auth = auth_engine
self.dangerous_patterns = [
r"DROP\s+TABLE",
r"DELETE\s+FROM.*WHERE\s+1\s*=\s*1",
r"rm\s+-rf",
r"sudo\s+",
]
async def validate(
self,
agent_id: str,
tool_name: str,
args: dict
) -> dict:
"""도구 호출 검증"""
# 1. 권한 확인
auth_result = await self.auth.authorize_action(
agent_id, "tool_call", tool_name, args
)
if not auth_result["allowed"]:
return {
"allowed": False,
"reason": auth_result["reason"]
}
# 2. 위험 패턴 검사
args_str = str(args)
for pattern in self.dangerous_patterns:
if re.search(pattern, args_str, re.IGNORECASE):
return {
"allowed": False,
"reason": f"Dangerous pattern detected: {pattern}"
}
# 3. 속도 제한
if not await self._check_rate_limit(agent_id, tool_name):
return {
"allowed": False,
"reason": "Rate limit exceeded"
}
return {"allowed": True}
async def _check_rate_limit(
self, agent_id: str, tool_name: str
) -> bool:
"""도구별 호출 빈도 제한"""
key = f"rate:{agent_id}:{tool_name}"
count = await self.rate_store.increment(key, ttl=60)
limits = {
"database_write": 10, # 분당 10회
"external_api": 30, # 분당 30회
"send_email": 5, # 분당 5회
}
limit = limits.get(tool_name, 60) # 기본 분당 60회
return count <= limit에이전트에게 "관리자 권한"을 부여하는 것은 매우 위험합니다. 프롬프트 인젝션 공격이 성공하면 공격자가 관리자 권한의 모든 도구를 사용할 수 있습니다. 각 에이전트에는 실제로 필요한 최소한의 도구만 허용하세요.
에이전트 간 통신에서 개인식별정보(PII)가 불필요하게 노출되지 않도록 필터링합니다.
import re
class PIIFilter:
"""에이전트 메시지에서 PII 감지 및 마스킹"""
PATTERNS = {
"email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
"phone_kr": r"01[0-9]-?\d{3,4}-?\d{4}",
"rrn": r"\d{6}-?[1-4]\d{6}",
"card_number": r"\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}",
"ip_address": r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}",
}
def detect(self, text: str) -> list[dict]:
"""PII 탐지"""
findings = []
for pii_type, pattern in self.PATTERNS.items():
matches = re.finditer(pattern, text)
for match in matches:
findings.append({
"type": pii_type,
"value": match.group(),
"position": match.span()
})
return findings
def mask(self, text: str) -> str:
"""PII 마스킹"""
masked = text
for pii_type, pattern in self.PATTERNS.items():
masked = re.sub(
pattern,
f"[{pii_type.upper()}_MASKED]",
masked
)
return masked
def filter_message(
self, message: dict, target_trust_level: str
) -> dict:
"""메시지의 PII를 신뢰 수준에 따라 필터링"""
if target_trust_level == "internal":
return message # 내부 에이전트에는 필터링 안 함
filtered = message.copy()
if "content" in filtered:
filtered["content"] = self.mask(filtered["content"])
if "context" in filtered:
filtered["context"] = self.mask(
str(filtered["context"])
)
return filtered에이전트가 사용하는 API 키, 데이터베이스 자격증명 등의 비밀을 안전하게 관리합니다.
class AgentSecretManager:
"""에이전트별 비밀 관리"""
def __init__(self, vault_client):
self.vault = vault_client
async def get_secret(
self, agent_id: str, secret_name: str
) -> str | None:
"""에이전트에 할당된 비밀 조회"""
# 에이전트별 비밀 경로
path = f"agents/{agent_id}/secrets/{secret_name}"
try:
return await self.vault.read(path)
except VaultError:
return None
async def rotate_secrets(self, agent_id: str):
"""에이전트의 모든 비밀 교체"""
secrets = await self.vault.list(f"agents/{agent_id}/secrets/")
for secret_name in secrets:
new_value = generate_secure_token()
await self.vault.write(
f"agents/{agent_id}/secrets/{secret_name}",
new_value,
metadata={"rotated_at": datetime.now().isoformat()}
)모든 에이전트 행동을 불변의 감사 로그로 기록합니다. 보안 사고 조사, 규정 준수 증명, 에이전트 행동 분석에 필수적입니다.
@dataclass
class AuditEntry:
timestamp: datetime
agent_id: str
action: str
resource: str
details: dict
outcome: str # "success", "denied", "error"
ip_address: str | None = None
correlation_id: str | None = None
class AuditLogger:
def __init__(self, storage):
self.storage = storage
async def log(self, entry: AuditEntry):
"""감사 로그 기록 (불변)"""
record = {
**entry.__dict__,
"timestamp": entry.timestamp.isoformat(),
"hash": self._compute_hash(entry),
}
# append-only 스토리지에 기록
await self.storage.append("audit_log", record)
def _compute_hash(self, entry: AuditEntry) -> str:
"""감사 로그의 무결성 해시"""
import hashlib
data = f"{entry.timestamp}{entry.agent_id}{entry.action}{entry.resource}"
return hashlib.sha256(data.encode()).hexdigest()[:16]
async def query(
self,
agent_id: str | None = None,
action: str | None = None,
start_time: datetime | None = None,
end_time: datetime | None = None
) -> list[dict]:
"""감사 로그 조회"""
filters = {}
if agent_id:
filters["agent_id"] = agent_id
if action:
filters["action"] = action
if start_time:
filters["start_time"] = start_time
if end_time:
filters["end_time"] = end_time
return await self.storage.query("audit_log", filters)
async def generate_report(
self, period: str
) -> dict:
"""감사 보고서 생성"""
entries = await self.query(
start_time=parse_period_start(period)
)
return {
"period": period,
"total_actions": len(entries),
"denied_actions": len(
[e for e in entries if e["outcome"] == "denied"]
),
"errors": len(
[e for e in entries if e["outcome"] == "error"]
),
"by_agent": self._group_by(entries, "agent_id"),
"by_action": self._group_by(entries, "action"),
"policy_violations": [
e for e in entries
if e["outcome"] == "denied"
]
}조직 수준에서 에이전트의 생성, 배포, 운영에 대한 규칙을 정의합니다.
@dataclass
class GovernancePolicy:
"""조직 수준의 에이전트 거버넌스 정책"""
# 에이전트 등록 요건
require_owner: bool = True
require_description: bool = True
require_security_review: bool = True
max_tools_per_agent: int = 15
# 운영 제한
max_concurrent_agents: int = 100
max_daily_cost_usd: float = 500.0
require_human_approval_for: list[str] = field(
default_factory=lambda: [
"production_deployment",
"pii_access",
"external_api_write",
"financial_transaction"
]
)
# 데이터 정책
pii_retention_days: int = 30
audit_retention_days: int = 365
encrypt_at_rest: bool = True
encrypt_in_transit: bool = True
class GovernanceChecker:
def __init__(self, policy: GovernancePolicy):
self.policy = policy
async def check_registration(
self, registration: AgentRegistration
) -> list[str]:
"""에이전트 등록 시 거버넌스 준수 확인"""
violations = []
if self.policy.require_owner and not registration.owner:
violations.append("소유자(owner) 미지정")
if (
self.policy.require_description
and len(registration.description) < 20
):
violations.append("설명이 너무 짧음 (최소 20자)")
if len(registration.tools) > self.policy.max_tools_per_agent:
violations.append(
f"도구 수 초과: {len(registration.tools)}"
f" / {self.policy.max_tools_per_agent}"
)
return violations
async def check_action(
self, agent_id: str, action: str, context: dict
) -> dict:
"""행동의 거버넌스 준수 확인"""
if action in self.policy.require_human_approval_for:
return {
"approved": False,
"requires_human": True,
"reason": f"Action '{action}' requires human approval"
}
return {"approved": True, "requires_human": False}Microsoft Agent 365와 같은 엔터프라이즈 컨트롤 플레인은 에이전트 거버넌스를 조직 전체에 일관되게 적용합니다. 2026년 5월 GA 예정인 이 서비스는 Microsoft 플랫폼 외에도 오픈소스 프레임워크와 서드파티 에이전트에 대한 거버넌스를 지원합니다.
고위험 행동에 대한 인간 승인 프로세스입니다.
class HumanApprovalGate:
def __init__(self, notification_service, timeout_minutes: int = 30):
self.notifier = notification_service
self.timeout = timeout_minutes
self.pending: dict[str, asyncio.Event] = {}
self.decisions: dict[str, bool] = {}
async def request_approval(
self,
agent_id: str,
action: str,
details: dict
) -> bool:
"""인간 승인 요청"""
request_id = generate_id()
self.pending[request_id] = asyncio.Event()
await self.notifier.send_approval_request({
"request_id": request_id,
"agent": agent_id,
"action": action,
"details": details,
"expires_at": (
datetime.now()
+ timedelta(minutes=self.timeout)
).isoformat()
})
# 승인 대기 (타임아웃 적용)
try:
await asyncio.wait_for(
self.pending[request_id].wait(),
timeout=self.timeout * 60
)
return self.decisions.get(request_id, False)
except asyncio.TimeoutError:
return False # 타임아웃 시 거부
async def approve(self, request_id: str):
"""승인 처리"""
self.decisions[request_id] = True
if request_id in self.pending:
self.pending[request_id].set()
async def reject(self, request_id: str):
"""거부 처리"""
self.decisions[request_id] = False
if request_id in self.pending:
self.pending[request_id].set()다음 장에서는 이 시리즈에서 다룬 모든 개념을 종합하여 실전 프로젝트를 구축합니다. 완전한 멀티에이전트 오케스트레이션 시스템을 처음부터 설계하고 구현합니다.
이 글이 도움이 되셨나요?
고객 서비스 자동화 시스템을 처음부터 설계하고 구축하며, 감독자-워커 아키텍처, A2A 통신, 컨트롤 플레인, 관측 가능성을 통합합니다.
멀티에이전트 시스템의 분산 추적, 구조화된 로깅, 메트릭 수집, 에이전트 행동 시각화, 그리고 프로덕션 디버깅 전략을 다룹니다.
수십에서 수백 개의 에이전트를 운영하는 플릿 관리 전략, 자동 스케일링, 로드 밸런싱, 비용 최적화를 다룹니다.