본문으로 건너뛰기
Kreath Archive
TechProjectsBooksAbout
TechProjectsBooksAbout

내비게이션

  • Tech
  • Projects
  • Books
  • About
  • Tags

카테고리

  • AI / ML
  • 웹 개발
  • 프로그래밍
  • 개발 도구

연결

  • GitHub
  • Email
  • RSS
© 2026 Kreath Archive. All rights reserved.Built with Next.js + MDX
홈TechProjectsBooksAbout
//
  1. 홈
  2. 테크
  3. 8장: 엔터프라이즈 시스템 통합
2026년 3월 15일·AI / ML·

8장: 엔터프라이즈 시스템 통합

ERP/CRM/ITSM 연동, MCP 기반 도구 통합, API 게이트웨이, 이벤트 드리븐 통합, 레거시 시스템 어댑터, 트랜잭션 경계 설계를 다룹니다.

16분1,336자11개 섹션
workflowaiautomation
공유
agentic-workflow8 / 10
12345678910
이전7장: 감사 로깅과 컴플라이언스다음9장: 보안과 거버넌스

이 장에서 배울 내용

  • 엔터프라이즈 시스템(ERP, CRM, ITSM)과의 통합 패턴
  • MCP 기반 도구 통합의 설계와 구현
  • API 게이트웨이를 통한 통합 관리
  • 이벤트 드리븐 아키텍처와 Kafka 활용
  • 레거시 시스템 어댑터 패턴
  • 데이터 변환과 트랜잭션 경계 설계

통합의 도전 과제

엔터프라이즈 환경에서 Agentic Workflow를 도입할 때 가장 큰 도전은 기존 시스템과의 통합입니다. 대부분의 기업은 수년에서 수십 년에 걸쳐 구축된 다양한 시스템을 운영하고 있으며, 이들은 서로 다른 프로토콜, 데이터 형식, 인증 체계를 사용합니다.

주요 도전 과제

  • 인증 다양성: SAML, OAuth2, API Key, 인증서 기반 등 시스템마다 다른 인증 방식
  • 데이터 형식 불일치: XML, JSON, CSV, 독자 포맷 등 혼재
  • 속도 차이: 밀리초 단위 API와 분 단위 배치 처리 시스템의 공존
  • 가용성: 유지보수 창(maintenance window) 동안 시스템 접근 불가
  • 트랜잭션 경계: 여러 시스템에 걸친 데이터 일관성 보장

ERP/CRM/ITSM 연동 패턴

통합 어댑터 계층

각 엔터프라이즈 시스템에 대한 통합 어댑터를 구축하여, 에이전트가 일관된 인터페이스로 다양한 시스템에 접근할 수 있도록 합니다.

enterprise_adapters.py
python
from abc import ABC, abstractmethod
 
class EnterpriseAdapter(ABC):
    """엔터프라이즈 시스템 통합 어댑터 기본 클래스"""
 
    @abstractmethod
    async def authenticate(self) -> None:
        """시스템 인증"""
        ...
 
    @abstractmethod
    async def health_check(self) -> bool:
        """시스템 가용성 확인"""
        ...
 
    @abstractmethod
    async def execute(self, operation: str, params: dict) -> dict:
        """작업 실행"""
        ...
 
 
class SAPAdapter(EnterpriseAdapter):
    """SAP ERP 어댑터"""
 
    def __init__(self, config: SAPConfig):
        self.config = config
        self.client = SAPClient(config)
 
    async def authenticate(self) -> None:
        await self.client.connect(
            host=self.config.host,
            client=self.config.client_id,
            user=self.config.username,
            password=self.config.password,
        )
 
    async def health_check(self) -> bool:
        try:
            await self.client.ping()
            return True
        except Exception:
            return False
 
    async def execute(self, operation: str, params: dict) -> dict:
        match operation:
            case "get_purchase_order":
                return await self.client.call_bapi(
                    "BAPI_PO_GETDETAIL",
                    PURCHASEORDER=params["po_number"],
                )
            case "create_invoice":
                return await self.client.call_bapi(
                    "BAPI_INCOMINGINVOICE_CREATE",
                    **self._map_invoice_params(params),
                )
            case _:
                raise UnsupportedOperation(f"SAP 어댑터: {operation} 미지원")
 
 
class SalesforceAdapter(EnterpriseAdapter):
    """Salesforce CRM 어댑터"""
 
    def __init__(self, config: SalesforceConfig):
        self.config = config
        self.client = None
 
    async def authenticate(self) -> None:
        self.client = await SalesforceClient.create(
            instance_url=self.config.instance_url,
            access_token=await self._get_oauth_token(),
        )
 
    async def execute(self, operation: str, params: dict) -> dict:
        match operation:
            case "get_account":
                return await self.client.query(
                    f"SELECT Id, Name, Industry FROM Account "
                    f"WHERE Id = '{params['account_id']}'"
                )
            case "create_case":
                return await self.client.create("Case", params)
            case "update_opportunity":
                return await self.client.update(
                    "Opportunity",
                    params["opportunity_id"],
                    params["updates"],
                )
            case _:
                raise UnsupportedOperation(f"Salesforce 어댑터: {operation} 미지원")
Warning

엔터프라이즈 어댑터에서 사용하는 인증 정보(API 키, OAuth 토큰, 서비스 계정 비밀번호)는 절대 코드에 하드코딩하지 않습니다. Vault, AWS Secrets Manager, 환경 변수 등 안전한 비밀 관리 시스템을 사용해야 합니다.

MCP 기반 도구 통합

**MCP(Model Context Protocol)**는 에이전트가 사용할 도구를 표준화된 방식으로 정의하고 제공하는 프로토콜입니다. 명시적인 입출력 스키마를 통해 에이전트가 도구를 정확히 사용할 수 있도록 합니다.

MCP 서버 구현

mcp_server.py
python
from mcp.server import Server
from mcp.types import Tool, TextContent
 
server = Server("enterprise-tools")
 
@server.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="erp_get_order",
            description="ERP 시스템에서 주문 정보를 조회합니다. "
                       "주문번호(order_id)로 상세 정보를 반환합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "order_id": {
                        "type": "string",
                        "description": "주문 번호 (예: PO-2026-001234)",
                        "pattern": "^PO-\\d{4}-\\d{6}$",
                    },
                },
                "required": ["order_id"],
            },
        ),
        Tool(
            name="crm_create_ticket",
            description="CRM 시스템에 고객 지원 티켓을 생성합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "customer_id": {"type": "string"},
                    "subject": {"type": "string", "maxLength": 200},
                    "description": {"type": "string"},
                    "priority": {
                        "type": "string",
                        "enum": ["low", "medium", "high", "urgent"],
                    },
                    "category": {"type": "string"},
                },
                "required": ["customer_id", "subject", "priority"],
            },
        ),
        Tool(
            name="itsm_get_incident",
            description="ITSM 시스템에서 인시던트 정보를 조회합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "incident_id": {"type": "string"},
                },
                "required": ["incident_id"],
            },
        ),
    ]
 
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    match name:
        case "erp_get_order":
            adapter = get_adapter("sap")
            result = await adapter.execute("get_purchase_order", {
                "po_number": arguments["order_id"],
            })
            return [TextContent(type="text", text=json.dumps(result))]
 
        case "crm_create_ticket":
            adapter = get_adapter("salesforce")
            result = await adapter.execute("create_case", arguments)
            return [TextContent(type="text", text=json.dumps(result))]
 
        case "itsm_get_incident":
            adapter = get_adapter("servicenow")
            result = await adapter.execute("get_incident", arguments)
            return [TextContent(type="text", text=json.dumps(result))]
Info

MCP의 핵심 장점은 도구의 입출력 스키마가 명시적으로 정의된다는 점입니다. 에이전트는 스키마를 참고하여 올바른 형식으로 도구를 호출하고, 결과를 예측 가능한 구조로 받을 수 있습니다.

API 게이트웨이

여러 엔터프라이즈 시스템에 대한 접근을 **API 게이트웨이(API Gateway)**를 통해 중앙 관리합니다.

게이트웨이의 역할

api_gateway.py
python
class APIGateway:
    def __init__(self):
        self.adapters: dict[str, EnterpriseAdapter] = {}
        self.rate_limiters: dict[str, RateLimiter] = {}
        self.cache = ResponseCache()
        self.audit_logger = AuditLogger()
 
    async def execute(
        self,
        system: str,
        operation: str,
        params: dict,
        caller: AgentIdentity,
    ) -> dict:
        # 1. 인가 확인
        if not await self.authorize(caller, system, operation):
            raise UnauthorizedError(
                f"{caller.agent_id}는 {system}.{operation} 권한이 없습니다"
            )
 
        # 2. Rate Limiting
        limiter = self.rate_limiters.get(system)
        if limiter and not await limiter.allow():
            raise RateLimitExceeded(f"{system} 요청 한도 초과")
 
        # 3. 캐시 확인 (읽기 작업만)
        if self._is_read_operation(operation):
            cached = await self.cache.get(system, operation, params)
            if cached:
                return cached
 
        # 4. 감사 로그 (요청)
        audit_id = await self.audit_logger.log_request(
            caller=caller, system=system,
            operation=operation, params=params,
        )
 
        # 5. 어댑터를 통한 실행
        adapter = self.adapters[system]
        try:
            result = await adapter.execute(operation, params)
 
            # 6. 캐시 저장 (읽기 작업만)
            if self._is_read_operation(operation):
                await self.cache.set(system, operation, params, result)
 
            # 7. 감사 로그 (응답)
            await self.audit_logger.log_response(
                audit_id=audit_id, outcome="success", result_summary=result,
            )
 
            return result
 
        except Exception as e:
            await self.audit_logger.log_response(
                audit_id=audit_id, outcome="failure", error=str(e),
            )
            raise

이벤트 드리븐 통합

실시간 연동이 필요한 경우 이벤트 드리븐(Event-Driven) 아키텍처를 활용합니다. Apache Kafka를 메시지 브로커로 사용하여 시스템 간 느슨한 결합을 유지하면서 실시간 데이터 흐름을 구현합니다.

Kafka 소비자 구현

kafka_consumer.py
python
from aiokafka import AIOKafkaConsumer
import json
 
class WorkflowEventConsumer:
    def __init__(self, workflow_engine: WorkflowEngine):
        self.engine = workflow_engine
        self.consumer = AIOKafkaConsumer(
            "erp.orders", "crm.cases", "itsm.incidents",
            bootstrap_servers="kafka:9092",
            group_id="agentic-workflow-consumer",
            value_deserializer=lambda m: json.loads(m.decode("utf-8")),
            enable_auto_commit=False,  # 수동 커밋으로 정확히 한 번 처리
        )
 
    async def start(self):
        await self.consumer.start()
        try:
            async for message in self.consumer:
                await self._process_message(message)
                await self.consumer.commit()
        finally:
            await self.consumer.stop()
 
    async def _process_message(self, message):
        topic = message.topic
        event = message.value
 
        match topic:
            case "erp.orders":
                if event["type"] == "order_created":
                    await self.engine.start_workflow(
                        workflow_type="order_processing",
                        input_data=event,
                    )
            case "crm.cases":
                if event["type"] == "case_created":
                    await self.engine.start_workflow(
                        workflow_type="customer_support",
                        input_data=event,
                    )
            case "itsm.incidents":
                if event["type"] == "incident_detected":
                    await self.engine.start_workflow(
                        workflow_type="incident_response",
                        input_data=event,
                    )

이벤트 발행

워크플로우의 결과를 다른 시스템에 이벤트로 전달합니다.

kafka_producer.py
python
from aiokafka import AIOKafkaProducer
import json
 
class WorkflowEventProducer:
    def __init__(self):
        self.producer = AIOKafkaProducer(
            bootstrap_servers="kafka:9092",
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),
            acks="all",  # 모든 레플리카에 기록 확인
        )
 
    async def publish_workflow_result(
        self,
        workflow_id: str,
        result: dict,
    ) -> None:
        event = {
            "type": "workflow_completed",
            "workflow_id": workflow_id,
            "result": result,
            "timestamp": datetime.utcnow().isoformat(),
        }
 
        await self.producer.send_and_wait(
            topic="workflow.results",
            value=event,
            key=workflow_id.encode(),
        )

레거시 시스템 어댑터

현대적인 API가 없는 레거시 시스템과의 통합은 특별한 접근이 필요합니다.

어댑터 패턴

legacy_adapter.py
python
class LegacyScreenScrapingAdapter(EnterpriseAdapter):
    """화면 스크래핑 기반 레거시 시스템 어댑터"""
 
    async def execute(self, operation: str, params: dict) -> dict:
        session = await self._login()
 
        match operation:
            case "query_mainframe":
                # 터미널 에뮬레이션으로 메인프레임 조회
                await session.send_keys(params["command"])
                screen = await session.read_screen()
                return self._parse_screen(screen)
 
            case "submit_batch":
                # 파일 기반 배치 인터페이스
                input_file = self._generate_input_file(params)
                await self._upload_file(input_file)
                await self._trigger_batch_job()
                return {"status": "submitted", "job_id": self._last_job_id}
 
 
class LegacyDatabaseAdapter(EnterpriseAdapter):
    """직접 데이터베이스 연결 기반 레거시 어댑터"""
 
    async def execute(self, operation: str, params: dict) -> dict:
        # 읽기 전용 연결 사용
        async with self.read_pool.acquire() as conn:
            match operation:
                case "query":
                    rows = await conn.fetch(params["sql"], *params.get("args", []))
                    return {"rows": [dict(r) for r in rows]}
Tip

레거시 시스템 통합 시에는 **반부패 계층(Anti-Corruption Layer)**을 반드시 구현합니다. 레거시 시스템의 데이터 모델과 비즈니스 로직이 새로운 워크플로우 시스템에 "오염"되지 않도록, 어댑터 계층에서 데이터 변환과 개념 매핑을 수행합니다.

데이터 변환

시스템 간 데이터 형식이 다를 때 일관된 변환 계층이 필요합니다.

data_transformer.py
python
class DataTransformer:
    """시스템 간 데이터 변환"""
 
    def __init__(self):
        self.mappings: dict[tuple[str, str], Callable] = {}
 
    def register(self, from_system: str, to_system: str):
        def decorator(func):
            self.mappings[(from_system, to_system)] = func
            return func
        return decorator
 
    def transform(self, data: dict, from_system: str, to_system: str) -> dict:
        key = (from_system, to_system)
        if key not in self.mappings:
            raise MappingNotFound(f"{from_system} -> {to_system} 변환 정의 없음")
        return self.mappings[key](data)
 
transformer = DataTransformer()
 
@transformer.register("salesforce", "sap")
def sf_to_sap(data: dict) -> dict:
    """Salesforce 주문 -> SAP 구매 주문 변환"""
    return {
        "PURCHASEORDER": {
            "PO_NUMBER": data.get("OrderNumber"),
            "VENDOR": data.get("Account", {}).get("SAPVendorId"),
            "PO_DATE": data.get("EffectiveDate"),
            "ITEMS": [
                {
                    "MATERIAL": item.get("Product2", {}).get("SAPMaterialCode"),
                    "QUANTITY": item.get("Quantity"),
                    "UNIT": item.get("UnitOfMeasure", "EA"),
                    "NET_PRICE": item.get("UnitPrice"),
                }
                for item in data.get("OrderItems", [])
            ],
        }
    }

트랜잭션 경계

여러 시스템에 걸친 작업에서 데이터 일관성을 보장하는 트랜잭션 경계(Transaction Boundary) 설계가 중요합니다.

분산 트랜잭션 전략

엔터프라이즈 시스템 간의 분산 트랜잭션은 5장에서 다룬 Saga 패턴으로 관리합니다. 각 시스템 호출에 대한 보상 액션을 정의하고, 실패 시 역순으로 보상을 실행합니다.

cross_system_saga.py
python
# 크로스 시스템 Saga 예시: 주문 처리
cross_system_saga = SagaOrchestrator([
    SagaStep(
        name="create_crm_opportunity",
        action=lambda ctx: crm_adapter.execute("create_opportunity", ctx["order"]),
        compensation=lambda ctx, r: crm_adapter.execute("delete_opportunity", {"id": r["id"]}),
    ),
    SagaStep(
        name="create_erp_order",
        action=lambda ctx: erp_adapter.execute("create_purchase_order", ctx["order"]),
        compensation=lambda ctx, r: erp_adapter.execute("cancel_purchase_order", {"id": r["po_id"]}),
    ),
    SagaStep(
        name="reserve_inventory",
        action=lambda ctx: erp_adapter.execute("reserve_stock", ctx["items"]),
        compensation=lambda ctx, r: erp_adapter.execute("release_stock", {"reservation_id": r["id"]}),
    ),
    SagaStep(
        name="create_itsm_change_request",
        action=lambda ctx: itsm_adapter.execute("create_change", ctx["deployment"]),
        compensation=lambda ctx, r: itsm_adapter.execute("cancel_change", {"id": r["change_id"]}),
    ),
])

정리

이 장에서는 Agentic Workflow와 엔터프라이즈 시스템의 통합 패턴을 살펴보았습니다.

  • 엔터프라이즈 어댑터 계층으로 다양한 시스템에 대한 일관된 인터페이스를 제공합니다
  • MCP를 사용하면 명시적 스키마로 도구를 표준화할 수 있습니다
  • API 게이트웨이로 인증, 속도 제한, 캐싱, 감사 로깅을 중앙 관리합니다
  • Kafka 기반 이벤트 드리븐 아키텍처로 실시간 통합을 구현합니다
  • 레거시 시스템에는 반부패 계층을 포함한 전용 어댑터를 구축합니다
  • 크로스 시스템 트랜잭션은 Saga 패턴으로 일관성을 보장합니다

다음 장 예고

9장에서는 Agentic Workflow의 보안과 거버넌스를 다룹니다. 최소 권한 원칙, 도구별 권한 제어, 비밀 관리, 비용 제어, 위험 평가 등 안전한 에이전트 운영을 위한 프레임워크를 살펴보겠습니다.

이 글이 도움이 되셨나요?

관련 주제 더 보기

#workflow#ai#automation

관련 글

AI / ML

9장: 보안과 거버넌스

최소 권한 원칙, 도구별 권한 제어, 비밀 관리, 입출력 검증, 비용 제어, 에이전트 거버넌스 프레임워크, 위험 평가, 모니터링과 알림 전략을 다룹니다.

2026년 3월 17일·17분
AI / ML

7장: 감사 로깅과 컴플라이언스

에이전트 행동 추적, 불변 감사 로그 설계, 규제 요구사항 대응, 설명 가능성, 재현 가능성, OpenTelemetry 통합, 보존 정책을 다룹니다.

2026년 3월 13일·16분
AI / ML

10장: 실전 프로젝트 -- Agentic Workflow 시스템 구축

고객 지원 자동화 워크플로우를 LangGraph로 구현하고, HITL, 감사 로깅, 엔터프라이즈 통합, 보안까지 포함한 프로덕션 수준 시스템을 구축합니다.

2026년 3월 19일·22분
이전 글7장: 감사 로깅과 컴플라이언스
다음 글9장: 보안과 거버넌스

댓글

목차

약 16분 남음
  • 이 장에서 배울 내용
  • 통합의 도전 과제
    • 주요 도전 과제
  • ERP/CRM/ITSM 연동 패턴
    • 통합 어댑터 계층
  • MCP 기반 도구 통합
    • MCP 서버 구현
  • API 게이트웨이
    • 게이트웨이의 역할
  • 이벤트 드리븐 통합
    • Kafka 소비자 구현
    • 이벤트 발행
  • 레거시 시스템 어댑터
    • 어댑터 패턴
  • 데이터 변환
  • 트랜잭션 경계
    • 분산 트랜잭션 전략
  • 정리
  • 다음 장 예고