ERP/CRM/ITSM 연동, MCP 기반 도구 통합, API 게이트웨이, 이벤트 드리븐 통합, 레거시 시스템 어댑터, 트랜잭션 경계 설계를 다룹니다.
엔터프라이즈 환경에서 Agentic Workflow를 도입할 때 가장 큰 도전은 기존 시스템과의 통합입니다. 대부분의 기업은 수년에서 수십 년에 걸쳐 구축된 다양한 시스템을 운영하고 있으며, 이들은 서로 다른 프로토콜, 데이터 형식, 인증 체계를 사용합니다.
각 엔터프라이즈 시스템에 대한 통합 어댑터를 구축하여, 에이전트가 일관된 인터페이스로 다양한 시스템에 접근할 수 있도록 합니다.
from abc import ABC, abstractmethod
class EnterpriseAdapter(ABC):
"""엔터프라이즈 시스템 통합 어댑터 기본 클래스"""
@abstractmethod
async def authenticate(self) -> None:
"""시스템 인증"""
...
@abstractmethod
async def health_check(self) -> bool:
"""시스템 가용성 확인"""
...
@abstractmethod
async def execute(self, operation: str, params: dict) -> dict:
"""작업 실행"""
...
class SAPAdapter(EnterpriseAdapter):
"""SAP ERP 어댑터"""
def __init__(self, config: SAPConfig):
self.config = config
self.client = SAPClient(config)
async def authenticate(self) -> None:
await self.client.connect(
host=self.config.host,
client=self.config.client_id,
user=self.config.username,
password=self.config.password,
)
async def health_check(self) -> bool:
try:
await self.client.ping()
return True
except Exception:
return False
async def execute(self, operation: str, params: dict) -> dict:
match operation:
case "get_purchase_order":
return await self.client.call_bapi(
"BAPI_PO_GETDETAIL",
PURCHASEORDER=params["po_number"],
)
case "create_invoice":
return await self.client.call_bapi(
"BAPI_INCOMINGINVOICE_CREATE",
**self._map_invoice_params(params),
)
case _:
raise UnsupportedOperation(f"SAP 어댑터: {operation} 미지원")
class SalesforceAdapter(EnterpriseAdapter):
"""Salesforce CRM 어댑터"""
def __init__(self, config: SalesforceConfig):
self.config = config
self.client = None
async def authenticate(self) -> None:
self.client = await SalesforceClient.create(
instance_url=self.config.instance_url,
access_token=await self._get_oauth_token(),
)
async def execute(self, operation: str, params: dict) -> dict:
match operation:
case "get_account":
return await self.client.query(
f"SELECT Id, Name, Industry FROM Account "
f"WHERE Id = '{params['account_id']}'"
)
case "create_case":
return await self.client.create("Case", params)
case "update_opportunity":
return await self.client.update(
"Opportunity",
params["opportunity_id"],
params["updates"],
)
case _:
raise UnsupportedOperation(f"Salesforce 어댑터: {operation} 미지원")엔터프라이즈 어댑터에서 사용하는 인증 정보(API 키, OAuth 토큰, 서비스 계정 비밀번호)는 절대 코드에 하드코딩하지 않습니다. Vault, AWS Secrets Manager, 환경 변수 등 안전한 비밀 관리 시스템을 사용해야 합니다.
**MCP(Model Context Protocol)**는 에이전트가 사용할 도구를 표준화된 방식으로 정의하고 제공하는 프로토콜입니다. 명시적인 입출력 스키마를 통해 에이전트가 도구를 정확히 사용할 수 있도록 합니다.
from mcp.server import Server
from mcp.types import Tool, TextContent
server = Server("enterprise-tools")
@server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="erp_get_order",
description="ERP 시스템에서 주문 정보를 조회합니다. "
"주문번호(order_id)로 상세 정보를 반환합니다.",
inputSchema={
"type": "object",
"properties": {
"order_id": {
"type": "string",
"description": "주문 번호 (예: PO-2026-001234)",
"pattern": "^PO-\\d{4}-\\d{6}$",
},
},
"required": ["order_id"],
},
),
Tool(
name="crm_create_ticket",
description="CRM 시스템에 고객 지원 티켓을 생성합니다.",
inputSchema={
"type": "object",
"properties": {
"customer_id": {"type": "string"},
"subject": {"type": "string", "maxLength": 200},
"description": {"type": "string"},
"priority": {
"type": "string",
"enum": ["low", "medium", "high", "urgent"],
},
"category": {"type": "string"},
},
"required": ["customer_id", "subject", "priority"],
},
),
Tool(
name="itsm_get_incident",
description="ITSM 시스템에서 인시던트 정보를 조회합니다.",
inputSchema={
"type": "object",
"properties": {
"incident_id": {"type": "string"},
},
"required": ["incident_id"],
},
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
match name:
case "erp_get_order":
adapter = get_adapter("sap")
result = await adapter.execute("get_purchase_order", {
"po_number": arguments["order_id"],
})
return [TextContent(type="text", text=json.dumps(result))]
case "crm_create_ticket":
adapter = get_adapter("salesforce")
result = await adapter.execute("create_case", arguments)
return [TextContent(type="text", text=json.dumps(result))]
case "itsm_get_incident":
adapter = get_adapter("servicenow")
result = await adapter.execute("get_incident", arguments)
return [TextContent(type="text", text=json.dumps(result))]MCP의 핵심 장점은 도구의 입출력 스키마가 명시적으로 정의된다는 점입니다. 에이전트는 스키마를 참고하여 올바른 형식으로 도구를 호출하고, 결과를 예측 가능한 구조로 받을 수 있습니다.
여러 엔터프라이즈 시스템에 대한 접근을 **API 게이트웨이(API Gateway)**를 통해 중앙 관리합니다.
class APIGateway:
def __init__(self):
self.adapters: dict[str, EnterpriseAdapter] = {}
self.rate_limiters: dict[str, RateLimiter] = {}
self.cache = ResponseCache()
self.audit_logger = AuditLogger()
async def execute(
self,
system: str,
operation: str,
params: dict,
caller: AgentIdentity,
) -> dict:
# 1. 인가 확인
if not await self.authorize(caller, system, operation):
raise UnauthorizedError(
f"{caller.agent_id}는 {system}.{operation} 권한이 없습니다"
)
# 2. Rate Limiting
limiter = self.rate_limiters.get(system)
if limiter and not await limiter.allow():
raise RateLimitExceeded(f"{system} 요청 한도 초과")
# 3. 캐시 확인 (읽기 작업만)
if self._is_read_operation(operation):
cached = await self.cache.get(system, operation, params)
if cached:
return cached
# 4. 감사 로그 (요청)
audit_id = await self.audit_logger.log_request(
caller=caller, system=system,
operation=operation, params=params,
)
# 5. 어댑터를 통한 실행
adapter = self.adapters[system]
try:
result = await adapter.execute(operation, params)
# 6. 캐시 저장 (읽기 작업만)
if self._is_read_operation(operation):
await self.cache.set(system, operation, params, result)
# 7. 감사 로그 (응답)
await self.audit_logger.log_response(
audit_id=audit_id, outcome="success", result_summary=result,
)
return result
except Exception as e:
await self.audit_logger.log_response(
audit_id=audit_id, outcome="failure", error=str(e),
)
raise실시간 연동이 필요한 경우 이벤트 드리븐(Event-Driven) 아키텍처를 활용합니다. Apache Kafka를 메시지 브로커로 사용하여 시스템 간 느슨한 결합을 유지하면서 실시간 데이터 흐름을 구현합니다.
from aiokafka import AIOKafkaConsumer
import json
class WorkflowEventConsumer:
def __init__(self, workflow_engine: WorkflowEngine):
self.engine = workflow_engine
self.consumer = AIOKafkaConsumer(
"erp.orders", "crm.cases", "itsm.incidents",
bootstrap_servers="kafka:9092",
group_id="agentic-workflow-consumer",
value_deserializer=lambda m: json.loads(m.decode("utf-8")),
enable_auto_commit=False, # 수동 커밋으로 정확히 한 번 처리
)
async def start(self):
await self.consumer.start()
try:
async for message in self.consumer:
await self._process_message(message)
await self.consumer.commit()
finally:
await self.consumer.stop()
async def _process_message(self, message):
topic = message.topic
event = message.value
match topic:
case "erp.orders":
if event["type"] == "order_created":
await self.engine.start_workflow(
workflow_type="order_processing",
input_data=event,
)
case "crm.cases":
if event["type"] == "case_created":
await self.engine.start_workflow(
workflow_type="customer_support",
input_data=event,
)
case "itsm.incidents":
if event["type"] == "incident_detected":
await self.engine.start_workflow(
workflow_type="incident_response",
input_data=event,
)워크플로우의 결과를 다른 시스템에 이벤트로 전달합니다.
from aiokafka import AIOKafkaProducer
import json
class WorkflowEventProducer:
def __init__(self):
self.producer = AIOKafkaProducer(
bootstrap_servers="kafka:9092",
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
acks="all", # 모든 레플리카에 기록 확인
)
async def publish_workflow_result(
self,
workflow_id: str,
result: dict,
) -> None:
event = {
"type": "workflow_completed",
"workflow_id": workflow_id,
"result": result,
"timestamp": datetime.utcnow().isoformat(),
}
await self.producer.send_and_wait(
topic="workflow.results",
value=event,
key=workflow_id.encode(),
)현대적인 API가 없는 레거시 시스템과의 통합은 특별한 접근이 필요합니다.
class LegacyScreenScrapingAdapter(EnterpriseAdapter):
"""화면 스크래핑 기반 레거시 시스템 어댑터"""
async def execute(self, operation: str, params: dict) -> dict:
session = await self._login()
match operation:
case "query_mainframe":
# 터미널 에뮬레이션으로 메인프레임 조회
await session.send_keys(params["command"])
screen = await session.read_screen()
return self._parse_screen(screen)
case "submit_batch":
# 파일 기반 배치 인터페이스
input_file = self._generate_input_file(params)
await self._upload_file(input_file)
await self._trigger_batch_job()
return {"status": "submitted", "job_id": self._last_job_id}
class LegacyDatabaseAdapter(EnterpriseAdapter):
"""직접 데이터베이스 연결 기반 레거시 어댑터"""
async def execute(self, operation: str, params: dict) -> dict:
# 읽기 전용 연결 사용
async with self.read_pool.acquire() as conn:
match operation:
case "query":
rows = await conn.fetch(params["sql"], *params.get("args", []))
return {"rows": [dict(r) for r in rows]}레거시 시스템 통합 시에는 **반부패 계층(Anti-Corruption Layer)**을 반드시 구현합니다. 레거시 시스템의 데이터 모델과 비즈니스 로직이 새로운 워크플로우 시스템에 "오염"되지 않도록, 어댑터 계층에서 데이터 변환과 개념 매핑을 수행합니다.
시스템 간 데이터 형식이 다를 때 일관된 변환 계층이 필요합니다.
class DataTransformer:
"""시스템 간 데이터 변환"""
def __init__(self):
self.mappings: dict[tuple[str, str], Callable] = {}
def register(self, from_system: str, to_system: str):
def decorator(func):
self.mappings[(from_system, to_system)] = func
return func
return decorator
def transform(self, data: dict, from_system: str, to_system: str) -> dict:
key = (from_system, to_system)
if key not in self.mappings:
raise MappingNotFound(f"{from_system} -> {to_system} 변환 정의 없음")
return self.mappings[key](data)
transformer = DataTransformer()
@transformer.register("salesforce", "sap")
def sf_to_sap(data: dict) -> dict:
"""Salesforce 주문 -> SAP 구매 주문 변환"""
return {
"PURCHASEORDER": {
"PO_NUMBER": data.get("OrderNumber"),
"VENDOR": data.get("Account", {}).get("SAPVendorId"),
"PO_DATE": data.get("EffectiveDate"),
"ITEMS": [
{
"MATERIAL": item.get("Product2", {}).get("SAPMaterialCode"),
"QUANTITY": item.get("Quantity"),
"UNIT": item.get("UnitOfMeasure", "EA"),
"NET_PRICE": item.get("UnitPrice"),
}
for item in data.get("OrderItems", [])
],
}
}여러 시스템에 걸친 작업에서 데이터 일관성을 보장하는 트랜잭션 경계(Transaction Boundary) 설계가 중요합니다.
엔터프라이즈 시스템 간의 분산 트랜잭션은 5장에서 다룬 Saga 패턴으로 관리합니다. 각 시스템 호출에 대한 보상 액션을 정의하고, 실패 시 역순으로 보상을 실행합니다.
# 크로스 시스템 Saga 예시: 주문 처리
cross_system_saga = SagaOrchestrator([
SagaStep(
name="create_crm_opportunity",
action=lambda ctx: crm_adapter.execute("create_opportunity", ctx["order"]),
compensation=lambda ctx, r: crm_adapter.execute("delete_opportunity", {"id": r["id"]}),
),
SagaStep(
name="create_erp_order",
action=lambda ctx: erp_adapter.execute("create_purchase_order", ctx["order"]),
compensation=lambda ctx, r: erp_adapter.execute("cancel_purchase_order", {"id": r["po_id"]}),
),
SagaStep(
name="reserve_inventory",
action=lambda ctx: erp_adapter.execute("reserve_stock", ctx["items"]),
compensation=lambda ctx, r: erp_adapter.execute("release_stock", {"reservation_id": r["id"]}),
),
SagaStep(
name="create_itsm_change_request",
action=lambda ctx: itsm_adapter.execute("create_change", ctx["deployment"]),
compensation=lambda ctx, r: itsm_adapter.execute("cancel_change", {"id": r["change_id"]}),
),
])이 장에서는 Agentic Workflow와 엔터프라이즈 시스템의 통합 패턴을 살펴보았습니다.
9장에서는 Agentic Workflow의 보안과 거버넌스를 다룹니다. 최소 권한 원칙, 도구별 권한 제어, 비밀 관리, 비용 제어, 위험 평가 등 안전한 에이전트 운영을 위한 프레임워크를 살펴보겠습니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
최소 권한 원칙, 도구별 권한 제어, 비밀 관리, 입출력 검증, 비용 제어, 에이전트 거버넌스 프레임워크, 위험 평가, 모니터링과 알림 전략을 다룹니다.
에이전트 행동 추적, 불변 감사 로그 설계, 규제 요구사항 대응, 설명 가능성, 재현 가능성, OpenTelemetry 통합, 보존 정책을 다룹니다.
고객 지원 자동화 워크플로우를 LangGraph로 구현하고, HITL, 감사 로깅, 엔터프라이즈 통합, 보안까지 포함한 프로덕션 수준 시스템을 구축합니다.