본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout
© 2026 Kreath
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 10장: 보안과 거버넌스
2026년 4월 8일·AI / ML·

10장: 보안과 거버넌스

멀티에이전트 시스템의 인증과 인가, 최소 권한 원칙, 데이터 보호, 감사 추적, 규제 준수까지 프로덕션 보안과 거버넌스를 다룹니다.

16분1,578자6개 섹션
ai-에이전트보안거버넌스인증-인가
공유
agent-orchestration10 / 11
1234567891011
이전9장: 관측 가능성과 디버깅다음11장: 실전 프로젝트 — 멀티에이전트 오케스트레이션 시스템 구축

에이전트 보안의 특수성

멀티에이전트 시스템의 보안은 전통적인 소프트웨어 보안보다 더 넓은 범위를 다뤄야 합니다. 에이전트는 자율적으로 결정을 내리고, 외부 도구를 호출하며, 다른 에이전트와 자유롭게 통신합니다. 이 자율성이 강력한 기능인 동시에 보안 위협의 원천이 됩니다.

에이전트 시스템에서 고유한 보안 위협은 다음과 같습니다.

프롬프트 인젝션을 통한 권한 상승: 악의적인 입력이 에이전트의 시스템 프롬프트를 우회하여, 허가되지 않은 도구를 호출하거나 데이터에 접근할 수 있습니다.

에이전트 간 신뢰 남용: 에이전트 A가 에이전트 B를 "신뢰"하기 때문에, 침해된 에이전트 A를 통해 에이전트 B의 기능에 접근할 수 있습니다.

도구 호출의 부작용: 에이전트가 외부 API를 호출할 때 의도하지 않은 부작용(데이터 삭제, 결제 처리 등)이 발생할 수 있습니다.

에이전트 인증(Authentication)

에이전트 신원 관리

각 에이전트에 고유한 신원(Identity)을 부여하고, 모든 행동을 해당 신원에 귀속시킵니다.

python
from dataclasses import dataclass
from datetime import datetime, timedelta
import jwt
 
@dataclass
class AgentIdentity:
    """에이전트의 신원 증명"""
    agent_id: str
    name: str
    owner: str  # 담당 팀
    created_at: datetime
    capabilities: list[str]
    trust_level: str  # "internal", "external", "sandboxed"
 
class AgentAuthenticator:
    def __init__(self, secret_key: str):
        self.secret = secret_key
 
    def issue_token(
        self, identity: AgentIdentity, ttl_hours: int = 24
    ) -> str:
        """에이전트 토큰 발급"""
        payload = {
            "sub": identity.agent_id,
            "name": identity.name,
            "owner": identity.owner,
            "capabilities": identity.capabilities,
            "trust_level": identity.trust_level,
            "iat": datetime.now(),
            "exp": datetime.now() + timedelta(hours=ttl_hours)
        }
        return jwt.encode(payload, self.secret, algorithm="HS256")
 
    def verify_token(self, token: str) -> AgentIdentity:
        """에이전트 토큰 검증"""
        try:
            payload = jwt.decode(
                token, self.secret, algorithms=["HS256"]
            )
            return AgentIdentity(
                agent_id=payload["sub"],
                name=payload["name"],
                owner=payload["owner"],
                created_at=datetime.fromisoformat(
                    payload["iat"]
                ) if isinstance(payload["iat"], str)
                else datetime.fromtimestamp(payload["iat"]),
                capabilities=payload["capabilities"],
                trust_level=payload["trust_level"]
            )
        except jwt.ExpiredSignatureError:
            raise AuthenticationError("Token expired")
        except jwt.InvalidTokenError:
            raise AuthenticationError("Invalid token")

A2A 에이전트 간 인증

외부 에이전트와의 통신에서는 Agent Card의 인증 정보를 활용합니다.

python
class A2AAuthMiddleware:
    """A2A 프로토콜의 인증 미들웨어"""
 
    def __init__(self, authenticator: AgentAuthenticator):
        self.auth = authenticator
        self.allowed_agents: dict[str, list[str]] = {}
 
    def allow(self, agent_id: str, capabilities: list[str]):
        """특정 에이전트의 접근 허용"""
        self.allowed_agents[agent_id] = capabilities
 
    async def authenticate(self, request) -> AgentIdentity:
        """요청의 인증 처리"""
        auth_header = request.headers.get("Authorization")
        if not auth_header or not auth_header.startswith("Bearer "):
            raise AuthenticationError("Missing bearer token")
 
        token = auth_header[7:]
        identity = self.auth.verify_token(token)
 
        # 허용 목록 확인
        if identity.agent_id not in self.allowed_agents:
            raise AuthorizationError(
                f"Agent {identity.agent_id} not in allowed list"
            )
 
        return identity

인가(Authorization)와 최소 권한 원칙

에이전트 권한 모델

에이전트에게 필요한 최소한의 권한만 부여합니다. 마이크로서비스의 RBAC(역할 기반 접근 제어)와 유사하지만, 도구 호출과 데이터 접근 범위까지 포함합니다.

python
from dataclasses import dataclass, field
from enum import Enum
 
class Permission(str, Enum):
    TOOL_READ = "tool:read"       # 읽기 전용 도구
    TOOL_WRITE = "tool:write"     # 쓰기 도구
    TOOL_DELETE = "tool:delete"   # 삭제 도구
    AGENT_INVOKE = "agent:invoke" # 다른 에이전트 호출
    AGENT_HANDOFF = "agent:handoff"  # 핸드오프
    DATA_PII = "data:pii"        # PII 접근
    DATA_FINANCIAL = "data:financial"  # 금융 데이터
    EXTERNAL_API = "external:api"  # 외부 API 호출
 
@dataclass
class AgentRole:
    name: str
    permissions: set[Permission]
    tool_allowlist: list[str] = field(default_factory=list)
    tool_denylist: list[str] = field(default_factory=list)
    data_scopes: list[str] = field(default_factory=list)
 
# 역할 정의 예시
ROLES = {
    "reader": AgentRole(
        name="reader",
        permissions={Permission.TOOL_READ},
        tool_allowlist=["query_database", "search_docs"]
    ),
    "operator": AgentRole(
        name="operator",
        permissions={
            Permission.TOOL_READ,
            Permission.TOOL_WRITE,
            Permission.AGENT_INVOKE
        },
        tool_allowlist=[
            "query_database", "update_record",
            "send_notification"
        ]
    ),
    "admin": AgentRole(
        name="admin",
        permissions={
            Permission.TOOL_READ,
            Permission.TOOL_WRITE,
            Permission.TOOL_DELETE,
            Permission.AGENT_INVOKE,
            Permission.AGENT_HANDOFF,
            Permission.DATA_PII
        }
    )
}
 
class AuthorizationEngine:
    def __init__(self):
        self.agent_roles: dict[str, AgentRole] = {}
 
    def assign_role(self, agent_id: str, role: AgentRole):
        self.agent_roles[agent_id] = role
 
    def check_permission(
        self, agent_id: str, permission: Permission
    ) -> bool:
        role = self.agent_roles.get(agent_id)
        if not role:
            return False
        return permission in role.permissions
 
    def check_tool_access(
        self, agent_id: str, tool_name: str
    ) -> bool:
        role = self.agent_roles.get(agent_id)
        if not role:
            return False
 
        # 거부 목록 우선
        if tool_name in role.tool_denylist:
            return False
 
        # 허용 목록이 비어있으면 모든 도구 허용
        if not role.tool_allowlist:
            return True
 
        return tool_name in role.tool_allowlist
 
    async def authorize_action(
        self,
        agent_id: str,
        action: str,
        resource: str,
        context: dict
    ) -> dict:
        """행동 인가 판단"""
        role = self.agent_roles.get(agent_id)
        if not role:
            return {
                "allowed": False,
                "reason": "No role assigned"
            }
 
        # 도구 호출 인가
        if action == "tool_call":
            allowed = self.check_tool_access(agent_id, resource)
            return {
                "allowed": allowed,
                "reason": (
                    "Tool access granted"
                    if allowed
                    else f"Tool {resource} not in allowlist"
                )
            }
 
        # PII 접근 인가
        if (
            action == "data_access"
            and context.get("contains_pii")
        ):
            allowed = Permission.DATA_PII in role.permissions
            return {
                "allowed": allowed,
                "reason": (
                    "PII access granted"
                    if allowed
                    else "PII access not permitted"
                )
            }
 
        return {"allowed": True, "reason": "Default allow"}

도구 호출 가드레일

에이전트의 도구 호출을 사전에 검증하는 가드레일입니다.

python
class ToolCallGuard:
    """도구 호출 전 보안 검증"""
 
    def __init__(self, auth_engine: AuthorizationEngine):
        self.auth = auth_engine
        self.dangerous_patterns = [
            r"DROP\s+TABLE",
            r"DELETE\s+FROM.*WHERE\s+1\s*=\s*1",
            r"rm\s+-rf",
            r"sudo\s+",
        ]
 
    async def validate(
        self,
        agent_id: str,
        tool_name: str,
        args: dict
    ) -> dict:
        """도구 호출 검증"""
        # 1. 권한 확인
        auth_result = await self.auth.authorize_action(
            agent_id, "tool_call", tool_name, args
        )
        if not auth_result["allowed"]:
            return {
                "allowed": False,
                "reason": auth_result["reason"]
            }
 
        # 2. 위험 패턴 검사
        args_str = str(args)
        for pattern in self.dangerous_patterns:
            if re.search(pattern, args_str, re.IGNORECASE):
                return {
                    "allowed": False,
                    "reason": f"Dangerous pattern detected: {pattern}"
                }
 
        # 3. 속도 제한
        if not await self._check_rate_limit(agent_id, tool_name):
            return {
                "allowed": False,
                "reason": "Rate limit exceeded"
            }
 
        return {"allowed": True}
 
    async def _check_rate_limit(
        self, agent_id: str, tool_name: str
    ) -> bool:
        """도구별 호출 빈도 제한"""
        key = f"rate:{agent_id}:{tool_name}"
        count = await self.rate_store.increment(key, ttl=60)
        limits = {
            "database_write": 10,   # 분당 10회
            "external_api": 30,     # 분당 30회
            "send_email": 5,        # 분당 5회
        }
        limit = limits.get(tool_name, 60)  # 기본 분당 60회
        return count <= limit
Warning

에이전트에게 "관리자 권한"을 부여하는 것은 매우 위험합니다. 프롬프트 인젝션 공격이 성공하면 공격자가 관리자 권한의 모든 도구를 사용할 수 있습니다. 각 에이전트에는 실제로 필요한 최소한의 도구만 허용하세요.

데이터 보호

PII 필터링

에이전트 간 통신에서 개인식별정보(PII)가 불필요하게 노출되지 않도록 필터링합니다.

python
import re
 
class PIIFilter:
    """에이전트 메시지에서 PII 감지 및 마스킹"""
 
    PATTERNS = {
        "email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
        "phone_kr": r"01[0-9]-?\d{3,4}-?\d{4}",
        "rrn": r"\d{6}-?[1-4]\d{6}",
        "card_number": r"\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}",
        "ip_address": r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}",
    }
 
    def detect(self, text: str) -> list[dict]:
        """PII 탐지"""
        findings = []
        for pii_type, pattern in self.PATTERNS.items():
            matches = re.finditer(pattern, text)
            for match in matches:
                findings.append({
                    "type": pii_type,
                    "value": match.group(),
                    "position": match.span()
                })
        return findings
 
    def mask(self, text: str) -> str:
        """PII 마스킹"""
        masked = text
        for pii_type, pattern in self.PATTERNS.items():
            masked = re.sub(
                pattern,
                f"[{pii_type.upper()}_MASKED]",
                masked
            )
        return masked
 
    def filter_message(
        self, message: dict, target_trust_level: str
    ) -> dict:
        """메시지의 PII를 신뢰 수준에 따라 필터링"""
        if target_trust_level == "internal":
            return message  # 내부 에이전트에는 필터링 안 함
 
        filtered = message.copy()
        if "content" in filtered:
            filtered["content"] = self.mask(filtered["content"])
        if "context" in filtered:
            filtered["context"] = self.mask(
                str(filtered["context"])
            )
        return filtered

비밀 관리(Secrets Management)

에이전트가 사용하는 API 키, 데이터베이스 자격증명 등의 비밀을 안전하게 관리합니다.

python
class AgentSecretManager:
    """에이전트별 비밀 관리"""
 
    def __init__(self, vault_client):
        self.vault = vault_client
 
    async def get_secret(
        self, agent_id: str, secret_name: str
    ) -> str | None:
        """에이전트에 할당된 비밀 조회"""
        # 에이전트별 비밀 경로
        path = f"agents/{agent_id}/secrets/{secret_name}"
        try:
            return await self.vault.read(path)
        except VaultError:
            return None
 
    async def rotate_secrets(self, agent_id: str):
        """에이전트의 모든 비밀 교체"""
        secrets = await self.vault.list(f"agents/{agent_id}/secrets/")
        for secret_name in secrets:
            new_value = generate_secure_token()
            await self.vault.write(
                f"agents/{agent_id}/secrets/{secret_name}",
                new_value,
                metadata={"rotated_at": datetime.now().isoformat()}
            )

감사 추적(Audit Trail)

모든 에이전트 행동을 불변의 감사 로그로 기록합니다. 보안 사고 조사, 규정 준수 증명, 에이전트 행동 분석에 필수적입니다.

python
@dataclass
class AuditEntry:
    timestamp: datetime
    agent_id: str
    action: str
    resource: str
    details: dict
    outcome: str  # "success", "denied", "error"
    ip_address: str | None = None
    correlation_id: str | None = None
 
class AuditLogger:
    def __init__(self, storage):
        self.storage = storage
 
    async def log(self, entry: AuditEntry):
        """감사 로그 기록 (불변)"""
        record = {
            **entry.__dict__,
            "timestamp": entry.timestamp.isoformat(),
            "hash": self._compute_hash(entry),
        }
        # append-only 스토리지에 기록
        await self.storage.append("audit_log", record)
 
    def _compute_hash(self, entry: AuditEntry) -> str:
        """감사 로그의 무결성 해시"""
        import hashlib
        data = f"{entry.timestamp}{entry.agent_id}{entry.action}{entry.resource}"
        return hashlib.sha256(data.encode()).hexdigest()[:16]
 
    async def query(
        self,
        agent_id: str | None = None,
        action: str | None = None,
        start_time: datetime | None = None,
        end_time: datetime | None = None
    ) -> list[dict]:
        """감사 로그 조회"""
        filters = {}
        if agent_id:
            filters["agent_id"] = agent_id
        if action:
            filters["action"] = action
        if start_time:
            filters["start_time"] = start_time
        if end_time:
            filters["end_time"] = end_time
        return await self.storage.query("audit_log", filters)
 
    async def generate_report(
        self, period: str
    ) -> dict:
        """감사 보고서 생성"""
        entries = await self.query(
            start_time=parse_period_start(period)
        )
 
        return {
            "period": period,
            "total_actions": len(entries),
            "denied_actions": len(
                [e for e in entries if e["outcome"] == "denied"]
            ),
            "errors": len(
                [e for e in entries if e["outcome"] == "error"]
            ),
            "by_agent": self._group_by(entries, "agent_id"),
            "by_action": self._group_by(entries, "action"),
            "policy_violations": [
                e for e in entries
                if e["outcome"] == "denied"
            ]
        }

거버넌스 프레임워크

에이전트 거버넌스 정책

조직 수준에서 에이전트의 생성, 배포, 운영에 대한 규칙을 정의합니다.

python
@dataclass
class GovernancePolicy:
    """조직 수준의 에이전트 거버넌스 정책"""
 
    # 에이전트 등록 요건
    require_owner: bool = True
    require_description: bool = True
    require_security_review: bool = True
    max_tools_per_agent: int = 15
 
    # 운영 제한
    max_concurrent_agents: int = 100
    max_daily_cost_usd: float = 500.0
    require_human_approval_for: list[str] = field(
        default_factory=lambda: [
            "production_deployment",
            "pii_access",
            "external_api_write",
            "financial_transaction"
        ]
    )
 
    # 데이터 정책
    pii_retention_days: int = 30
    audit_retention_days: int = 365
    encrypt_at_rest: bool = True
    encrypt_in_transit: bool = True
 
class GovernanceChecker:
    def __init__(self, policy: GovernancePolicy):
        self.policy = policy
 
    async def check_registration(
        self, registration: AgentRegistration
    ) -> list[str]:
        """에이전트 등록 시 거버넌스 준수 확인"""
        violations = []
 
        if self.policy.require_owner and not registration.owner:
            violations.append("소유자(owner) 미지정")
 
        if (
            self.policy.require_description
            and len(registration.description) < 20
        ):
            violations.append("설명이 너무 짧음 (최소 20자)")
 
        if len(registration.tools) > self.policy.max_tools_per_agent:
            violations.append(
                f"도구 수 초과: {len(registration.tools)}"
                f" / {self.policy.max_tools_per_agent}"
            )
 
        return violations
 
    async def check_action(
        self, agent_id: str, action: str, context: dict
    ) -> dict:
        """행동의 거버넌스 준수 확인"""
        if action in self.policy.require_human_approval_for:
            return {
                "approved": False,
                "requires_human": True,
                "reason": f"Action '{action}' requires human approval"
            }
        return {"approved": True, "requires_human": False}
Info

Microsoft Agent 365와 같은 엔터프라이즈 컨트롤 플레인은 에이전트 거버넌스를 조직 전체에 일관되게 적용합니다. 2026년 5월 GA 예정인 이 서비스는 Microsoft 플랫폼 외에도 오픈소스 프레임워크와 서드파티 에이전트에 대한 거버넌스를 지원합니다.

Human-in-the-Loop 승인 흐름

고위험 행동에 대한 인간 승인 프로세스입니다.

python
class HumanApprovalGate:
    def __init__(self, notification_service, timeout_minutes: int = 30):
        self.notifier = notification_service
        self.timeout = timeout_minutes
        self.pending: dict[str, asyncio.Event] = {}
        self.decisions: dict[str, bool] = {}
 
    async def request_approval(
        self,
        agent_id: str,
        action: str,
        details: dict
    ) -> bool:
        """인간 승인 요청"""
        request_id = generate_id()
        self.pending[request_id] = asyncio.Event()
 
        await self.notifier.send_approval_request({
            "request_id": request_id,
            "agent": agent_id,
            "action": action,
            "details": details,
            "expires_at": (
                datetime.now()
                + timedelta(minutes=self.timeout)
            ).isoformat()
        })
 
        # 승인 대기 (타임아웃 적용)
        try:
            await asyncio.wait_for(
                self.pending[request_id].wait(),
                timeout=self.timeout * 60
            )
            return self.decisions.get(request_id, False)
        except asyncio.TimeoutError:
            return False  # 타임아웃 시 거부
 
    async def approve(self, request_id: str):
        """승인 처리"""
        self.decisions[request_id] = True
        if request_id in self.pending:
            self.pending[request_id].set()
 
    async def reject(self, request_id: str):
        """거부 처리"""
        self.decisions[request_id] = False
        if request_id in self.pending:
            self.pending[request_id].set()

다음 장에서는 이 시리즈에서 다룬 모든 개념을 종합하여 실전 프로젝트를 구축합니다. 완전한 멀티에이전트 오케스트레이션 시스템을 처음부터 설계하고 구현합니다.

이 글이 도움이 되셨나요?

관련 글

AI / ML

11장: 실전 프로젝트 — 멀티에이전트 오케스트레이션 시스템 구축

고객 서비스 자동화 시스템을 처음부터 설계하고 구축하며, 감독자-워커 아키텍처, A2A 통신, 컨트롤 플레인, 관측 가능성을 통합합니다.

2026년 4월 9일·16분
AI / ML

9장: 관측 가능성과 디버깅

멀티에이전트 시스템의 분산 추적, 구조화된 로깅, 메트릭 수집, 에이전트 행동 시각화, 그리고 프로덕션 디버깅 전략을 다룹니다.

2026년 4월 6일·13분
AI / ML

8장: 에이전트 플릿 관리와 스케일링

수십에서 수백 개의 에이전트를 운영하는 플릿 관리 전략, 자동 스케일링, 로드 밸런싱, 비용 최적화를 다룹니다.

2026년 4월 5일·13분
이전 글9장: 관측 가능성과 디버깅
다음 글11장: 실전 프로젝트 — 멀티에이전트 오케스트레이션 시스템 구축

댓글

목차

약 16분 남음
  • 에이전트 보안의 특수성
  • 에이전트 인증(Authentication)
    • 에이전트 신원 관리
    • A2A 에이전트 간 인증
  • 인가(Authorization)와 최소 권한 원칙
    • 에이전트 권한 모델
    • 도구 호출 가드레일
  • 데이터 보호
    • PII 필터링
    • 비밀 관리(Secrets Management)
  • 감사 추적(Audit Trail)
  • 거버넌스 프레임워크
    • 에이전트 거버넌스 정책
    • Human-in-the-Loop 승인 흐름