A2A(Agent-to-Agent) 프로토콜, MCP(Model Context Protocol), 그리고 커스텀 메시지 패턴까지 에이전트 간 통신의 표준과 구현을 다룹니다.
멀티에이전트 시스템에서 에이전트 간 통신은 가장 근본적인 과제입니다. 같은 프레임워크로 구축된 에이전트들은 내부 API로 쉽게 소통할 수 있지만, 서로 다른 프레임워크, 서로 다른 조직, 서로 다른 클라우드에서 실행되는 에이전트들은 공통의 언어가 필요합니다.
이 문제를 해결하기 위해 두 가지 주요 프로토콜이 등장했습니다.
MCP(Model Context Protocol)는 에이전트와 도구(외부 시스템) 사이의 수직적 통신을 표준화합니다. 에이전트가 데이터베이스, API, 파일 시스템 등 외부 리소스에 접근하는 방식을 정의합니다.
A2A(Agent-to-Agent)는 에이전트와 에이전트 사이의 수평적 통신을 표준화합니다. 서로 다른 시스템의 에이전트가 작업을 발견, 위임, 협업하는 방식을 정의합니다.
┌─────────────────────────────────────────────┐
│ 에이전트 통신 레이어 │
│ │
│ [에이전트 A] ◄──── A2A ────► [에이전트 B] │
│ │ │ │
│ MCP MCP │
│ │ │ │
│ [데이터베이스] [외부 API] │
└─────────────────────────────────────────────┘
Google이 주도하고 Linux Foundation에서 관리하는 A2A 프로토콜은 이기종 에이전트 시스템 간의 상호운용성을 위한 개방형 표준입니다.
A2A는 네 가지 핵심 개념을 중심으로 설계되었습니다.
Agent Card: 에이전트의 신원증입니다. 에이전트의 이름, 설명, 보유 능력, 접속 URL, 인증 방식 등을 JSON 형태로 기술합니다. 클라이언트 에이전트는 Agent Card를 읽어 "이 에이전트가 나의 작업을 처리할 수 있는가?"를 판단합니다.
{
"name": "data-analyst-agent",
"description": "데이터 분석과 시각화를 수행하는 에이전트",
"url": "https://agents.example.com/data-analyst",
"version": "1.0.0",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": true
},
"skills": [
{
"id": "analyze-csv",
"name": "CSV 데이터 분석",
"description": "CSV 파일을 읽고 통계 분석을 수행합니다",
"tags": ["data", "analytics", "csv"]
},
{
"id": "create-chart",
"name": "차트 생성",
"description": "분석 결과를 다양한 차트로 시각화합니다",
"tags": ["visualization", "chart"]
}
],
"authentication": {
"schemes": ["bearer"]
}
}Task: 에이전트 간 협업의 기본 단위입니다. 클라이언트가 원격 에이전트에게 작업을 요청하면 Task 객체가 생성되며, 생명주기(submitted → working → completed/failed)를 거칩니다.
Message: Task 내에서 에이전트 간에 주고받는 통신 단위입니다. 텍스트, 파일, 구조화된 데이터 등 다양한 형태의 콘텐츠를 담을 수 있습니다.
Artifact: Task의 산출물입니다. 분석 결과 파일, 생성된 이미지, 코드 등 에이전트가 작업을 통해 만들어낸 결과물을 의미합니다.
A2A의 기본 통신 흐름은 다음과 같습니다.
1. 발견(Discovery)
클라이언트 → GET /.well-known/agent.json → Agent Card 수신
2. 작업 요청(Task Send)
클라이언트 → POST /tasks/send → Task 생성
3. 상태 확인(Status Check)
클라이언트 → GET /tasks/{id} → 현재 상태 조회
4. 스트리밍 수신(Streaming)
클라이언트 → POST /tasks/sendSubscribe → SSE로 실시간 수신
5. 결과 수신(Artifact Retrieval)
Task 완료 시 Artifact 목록을 통해 결과물 접근
import httpx
from dataclasses import dataclass
@dataclass
class A2ATask:
id: str
status: str
artifacts: list[dict]
class A2AClient:
def __init__(self, agent_url: str, auth_token: str):
self.agent_url = agent_url
self.client = httpx.AsyncClient(
headers={"Authorization": f"Bearer {auth_token}"}
)
async def discover(self) -> dict:
"""Agent Card를 조회하여 에이전트 능력을 확인"""
response = await self.client.get(
f"{self.agent_url}/.well-known/agent.json"
)
return response.json()
async def send_task(self, message: str) -> A2ATask:
"""작업을 전송하고 결과를 대기"""
payload = {
"jsonrpc": "2.0",
"method": "tasks/send",
"params": {
"id": generate_task_id(),
"message": {
"role": "user",
"parts": [{"type": "text", "text": message}]
}
}
}
response = await self.client.post(
f"{self.agent_url}",
json=payload
)
result = response.json()["result"]
return A2ATask(
id=result["id"],
status=result["status"]["state"],
artifacts=result.get("artifacts", [])
)
async def send_task_streaming(self, message: str):
"""SSE를 통해 실시간 결과 수신"""
payload = {
"jsonrpc": "2.0",
"method": "tasks/sendSubscribe",
"params": {
"id": generate_task_id(),
"message": {
"role": "user",
"parts": [{"type": "text", "text": message}]
}
}
}
async with self.client.stream(
"POST", self.agent_url, json=payload
) as response:
async for line in response.aiter_lines():
if line.startswith("data:"):
yield json.loads(line[5:])from a2a.server import A2AServer, TaskHandler
from a2a.types import (
AgentCard, Skill, TaskState,
Message, TextPart, Artifact
)
class DataAnalystHandler(TaskHandler):
async def handle_task(self, task_id: str, message: Message):
# 상태를 working으로 업데이트
await self.update_status(task_id, TaskState.WORKING)
# 메시지에서 분석 요청 추출
user_text = message.parts[0].text
# 분석 수행 (내부 로직)
result = await self.analyze_data(user_text)
# Artifact로 결과 반환
await self.add_artifact(
task_id,
Artifact(
name="analysis-result",
parts=[TextPart(text=result)]
)
)
# 상태를 completed로 업데이트
await self.update_status(task_id, TaskState.COMPLETED)
# 서버 설정
agent_card = AgentCard(
name="data-analyst",
description="데이터 분석 전문 에이전트",
skills=[
Skill(
id="analyze",
name="데이터 분석",
description="정형 데이터를 분석합니다"
)
]
)
server = A2AServer(
agent_card=agent_card,
handler=DataAnalystHandler()
)A2A의 핵심 가치는 불투명성(Opacity)입니다. 클라이언트 에이전트는 원격 에이전트의 내부 구현을 알 필요가 없습니다. LangGraph로 구축했든, CrewAI로 구축했든, Agent Card만 제공하면 협업이 가능합니다.
MCP와 A2A는 경쟁 관계가 아니라 보완 관계입니다. Google의 공식 문서에서도 이 둘의 역할을 명확히 구분합니다.
MCP는 에이전트가 외부 도구와 데이터 소스에 접근하는 방식을 표준화합니다. 데이터베이스 쿼리, API 호출, 파일 시스템 접근 등 에이전트의 "손과 발" 역할을 합니다.
# MCP 서버: 데이터베이스 도구 제공
from mcp.server import Server
from mcp.types import Tool, TextContent
server = Server("database-tools")
@server.tool()
async def query_users(department: str) -> list[dict]:
"""특정 부서의 사용자 목록을 조회합니다"""
result = await db.execute(
"SELECT * FROM users WHERE department = $1",
department
)
return result.rowsA2A는 에이전트 간의 작업 위임과 협업을 표준화합니다. "내가 처리할 수 없는 작업을 할 수 있는 에이전트를 찾아 맡기는" 시나리오입니다.
# A2A: 분석 에이전트가 시각화 에이전트에게 작업 위임
async def orchestrate_report(query: str):
# 1. MCP로 데이터 수집 (도구 접근)
raw_data = await mcp_client.call_tool(
"query_sales", {"period": "Q1-2026"}
)
# 2. 자체적으로 분석 수행
analysis = await self.analyze(raw_data)
# 3. A2A로 시각화 에이전트에게 위임 (에이전트 협업)
viz_client = A2AClient("https://viz-agent.example.com")
chart_task = await viz_client.send_task(
f"다음 데이터를 바 차트로 시각화해주세요: {analysis}"
)
return {"analysis": analysis, "chart": chart_task.artifacts}┌─────────────────────────────────────────────────────┐
│ 에이전트 오케스트레이터 │
│ │
│ ┌─────────────┐ A2A ┌─────────────┐ │
│ │ 분석 에이전트│◄─────►│시각화 에이전트│ │
│ └──────┬──────┘ └──────┬──────┘ │
│ │ MCP │ MCP │
│ ┌────┴────┐ ┌────┴────┐ │
│ │ DB 서버 │ │차트 API │ │
│ └─────────┘ └─────────┘ │
└─────────────────────────────────────────────────────┘
표준 프로토콜만으로는 모든 통신 요구사항을 충족하기 어렵습니다. 프레임워크 내부의 에이전트 간 통신에는 커스텀 패턴이 필요합니다.
에이전트 간 통신에서 자유 형식 텍스트보다 구조화된 메시지가 훨씬 안정적입니다.
from pydantic import BaseModel
from enum import Enum
from typing import Any
class MessageType(str, Enum):
TASK_REQUEST = "task_request"
TASK_RESULT = "task_result"
STATUS_UPDATE = "status_update"
ERROR = "error"
FEEDBACK = "feedback"
class AgentMessage(BaseModel):
type: MessageType
sender: str
recipient: str
correlation_id: str # 작업 추적용
payload: dict[str, Any]
metadata: dict[str, Any] = {}
class Config:
json_schema_extra = {
"example": {
"type": "task_request",
"sender": "supervisor",
"recipient": "researcher",
"correlation_id": "task-001",
"payload": {
"instruction": "AI 에이전트 최신 트렌드를 조사하세요",
"constraints": {
"max_sources": 10,
"recency": "6months"
}
},
"metadata": {
"priority": "high",
"deadline": "2026-04-09T18:00:00Z"
}
}
}장시간 실행되는 에이전트 작업에는 메시지 큐가 효과적입니다.
import asyncio
from collections import defaultdict
class AgentMessageBroker:
"""경량 인메모리 메시지 브로커"""
def __init__(self):
self._queues: dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)
self._subscribers: dict[str, list[callable]] = defaultdict(list)
async def publish(self, topic: str, message: AgentMessage):
"""토픽에 메시지 발행"""
await self._queues[topic].put(message)
for callback in self._subscribers[topic]:
asyncio.create_task(callback(message))
async def subscribe(self, topic: str, callback: callable):
"""토픽 구독"""
self._subscribers[topic].append(callback)
async def consume(self, topic: str) -> AgentMessage:
"""토픽에서 메시지 소비 (블로킹)"""
return await self._queues[topic].get()
# 사용 예시
broker = AgentMessageBroker()
# 감독자가 작업 요청 발행
await broker.publish("research-tasks", AgentMessage(
type=MessageType.TASK_REQUEST,
sender="supervisor",
recipient="researcher",
correlation_id="report-2026-q1",
payload={"topic": "AI agent trends Q1 2026"}
))
# 리서치 에이전트가 결과 발행
await broker.publish("research-results", AgentMessage(
type=MessageType.TASK_RESULT,
sender="researcher",
recipient="supervisor",
correlation_id="report-2026-q1",
payload={"findings": [...], "sources": [...]}
))에이전트 간에 작업 컨텍스트를 효율적으로 전달하는 것은 통신 설계에서 가장 중요한 부분입니다.
@dataclass
class TaskContext:
"""에이전트 간 전달되는 작업 컨텍스트"""
task_id: str
original_request: str
completed_steps: list[dict]
current_step: str
accumulated_results: dict
constraints: dict
def summarize_for_next_agent(self, max_tokens: int = 2000) -> str:
"""다음 에이전트에게 전달할 요약 생성"""
summary_parts = [
f"원래 요청: {self.original_request}",
f"현재 단계: {self.current_step}",
"이전 단계 결과:"
]
for step in self.completed_steps:
summary_parts.append(
f" - {step['name']}: {step['summary']}"
)
return "\n".join(summary_parts)[:max_tokens]에이전트 간 전체 대화 이력을 전달하면 컨텍스트 윈도우가 급격히 소모됩니다. 이전 에이전트의 결과를 요약하여 핵심 정보만 전달하는 것이 중요합니다. "무엇을 했는가"보다 "결과가 무엇인가"를 전달하세요.
| 시나리오 | 추천 프로토콜 | 이유 |
|---|---|---|
| 같은 프레임워크 내 에이전트 통신 | 프레임워크 내장 (핸드오프, 상태 공유) | 최소 오버헤드 |
| 다른 조직의 에이전트와 협업 | A2A | 표준화된 발견과 인증 |
| 에이전트가 외부 도구에 접근 | MCP | 도구 접근 표준 |
| 장시간 비동기 작업 | A2A + 푸시 알림 | 비동기 상태 추적 |
| 대용량 데이터 교환 | A2A Artifact + 오브젝트 스토리지 | Artifact 참조 방식 |
프로토콜 선택에서 가장 중요한 원칙은 필요한 만큼만 복잡하게입니다. 단일 프로세스 내의 에이전트 통신에 A2A를 도입하는 것은 과잉 설계입니다. 반대로 조직 간 에이전트 협업에 프레임워크 특유의 메커니즘을 사용하면 벤더 종속이 발생합니다.
다음 장에서는 작업을 분해하고 적절한 에이전트에게 위임하는 구체적인 패턴들을 다룹니다. 핸드오프, 라우팅, 동적 위임의 구현과 최적화 전략을 살펴봅니다.
이 글이 도움이 되셨나요?
감독자-워커, 계층적 팀, 피어-투-피어 네트워크 등 멀티에이전트 팀 아키텍처의 설계 원칙과 트레이드오프를 코드 예제와 함께 분석합니다.
에이전트 간 작업 위임의 핵심 메커니즘인 핸드오프 패턴, 라우팅 전략, 동적 위임의 구현과 최적화를 실전 코드와 함께 다룹니다.
단일 에이전트에서 멀티에이전트 시스템으로의 진화 과정, 오케스트레이션의 핵심 개념과 패턴 분류, 그리고 프로덕션 환경에서의 도전 과제를 살펴봅니다.