HTTP/2와 Protocol Buffers 기반의 gRPC를 활용한 고성능 마이크로서비스 통신을 학습합니다. 4가지 스트리밍 모드와 AI 추론 서비스 구현을 실습합니다.
gRPC가 REST 대비 높은 성능을 달성하는 핵심 요인은 HTTP/2 프로토콜과 Protocol Buffers 직렬화의 조합에 있습니다.
멀티플렉싱 — 단일 TCP 연결 위에 여러 요청/응답 스트림을 동시에 전송합니다. HTTP/1.1의 Head-of-Line Blocking 문제를 해결합니다.
헤더 압축 — HPACK 알고리즘으로 헤더를 압축하여 반복되는 메타데이터의 오버헤드를 줄입니다.
서버 푸시 — 클라이언트가 요청하기 전에 서버가 리소스를 선제적으로 전송할 수 있습니다.
바이너리 프레이밍 — 텍스트 기반 HTTP/1.1과 달리 바이너리 프레임으로 데이터를 전송하여 파싱 효율이 높습니다.
Protocol Buffers(Protobuf)는 Google이 개발한 언어 중립적 바이너리 직렬화 포맷입니다. JSON 대비 직렬화/역직렬화 속도가 약 5-10배 빠르고, 페이로드 크기가 60-80% 작습니다.
// Protobuf 메시지 정의
syntax = "proto3";
message User {
string id = 1;
string name = 2;
string email = 3;
int32 age = 4;
repeated string tags = 5;
}{
"id": "user-42",
"name": "Kreath",
"email": "kreath@example.com",
"age": 30,
"tags": ["developer", "architect"]
}| 비교 항목 | JSON | Protobuf |
|---|---|---|
| 직렬화 크기 | 120 bytes | 45 bytes |
| 직렬화 시간 | 150ns | 30ns |
| 역직렬화 시간 | 200ns | 25ns |
| 사람 가독성 | 높음 | 낮음 (바이너리) |
| 스키마 진화 | 비공식 | 필드 번호 기반 호환성 |
gRPC 개발의 시작점은 .proto 파일에 서비스와 메시지를 정의하는 것입니다.
syntax = "proto3";
package ai.inference.v1;
option go_package = "github.com/example/ai-service/gen/go/ai/inference/v1";
option java_package = "com.example.ai.inference.v1";
import "google/protobuf/timestamp.proto";
// AI 추론 서비스
service InferenceService {
// 단일 추론 요청 (Unary)
rpc Complete(CompleteRequest) returns (CompleteResponse);
// 스트리밍 추론 (Server Streaming)
rpc StreamComplete(CompleteRequest) returns (stream StreamChunk);
// 배치 임베딩 (Client Streaming)
rpc BatchEmbed(stream EmbedRequest) returns (BatchEmbedResponse);
// 대화형 추론 (Bidirectional Streaming)
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
message CompleteRequest {
string model = 1;
repeated Message messages = 2;
float temperature = 3;
int32 max_tokens = 4;
// 선택적 필드
optional float top_p = 5;
optional int32 seed = 6;
}
message Message {
string role = 1;
string content = 2;
}
message CompleteResponse {
string id = 1;
string model = 2;
repeated Choice choices = 3;
Usage usage = 4;
google.protobuf.Timestamp created_at = 5;
}
message Choice {
int32 index = 1;
Message message = 2;
string finish_reason = 3;
}
message Usage {
int32 prompt_tokens = 1;
int32 completion_tokens = 2;
int32 total_tokens = 3;
}
message StreamChunk {
string id = 1;
string model = 2;
repeated StreamChoice choices = 3;
optional Usage usage = 4; // 마지막 청크에서만 포함
}
message StreamChoice {
int32 index = 1;
MessageDelta delta = 2;
optional string finish_reason = 3;
}
message MessageDelta {
optional string role = 1;
optional string content = 2;
}
message EmbedRequest {
string model = 1;
string text = 2;
}
message BatchEmbedResponse {
repeated Embedding embeddings = 1;
Usage usage = 2;
}
message Embedding {
int32 index = 1;
repeated float vector = 2;
}
message ChatMessage {
string role = 1;
string content = 2;
google.protobuf.Timestamp timestamp = 3;
}# protoc 컴파일러와 플러그인 설치
# Python
pip install grpcio-tools
# 코드 생성 (Python)
python -m grpc_tools.protoc \
-I./proto \
--python_out=./gen/python \
--grpc_python_out=./gen/python \
--pyi_out=./gen/python \
proto/ai_service.proto
# Go
protoc \
-I./proto \
--go_out=./gen/go \
--go-grpc_out=./gen/go \
proto/ai_service.proto
# TypeScript (ts-proto)
protoc \
-I./proto \
--plugin=protoc-gen-ts_proto=./node_modules/.bin/protoc-gen-ts_proto \
--ts_proto_out=./gen/ts \
--ts_proto_opt=outputServices=grpc-js \
proto/ai_service.protoBuf(buf.build)를 사용하면 protoc 대신 더 현대적인 Protobuf 도구 체인을 활용할 수 있습니다. buf generate 명령 하나로 여러 언어의 코드를 동시에 생성하고, buf lint로 스키마 품질을 검증할 수 있습니다.
gRPC는 HTTP/2의 양방향 스트리밍을 활용하여 4가지 통신 패턴을 제공합니다.
가장 기본적인 요청-응답 패턴입니다. 클라이언트가 하나의 요청을 보내고, 서버가 하나의 응답을 반환합니다.
import grpc
from gen.python import ai_service_pb2 as pb
from gen.python import ai_service_pb2_grpc as service
async def unary_complete(stub: service.InferenceServiceStub):
request = pb.CompleteRequest(
model="claude-4",
messages=[
pb.Message(role="user", content="API 설계의 핵심 원칙은?"),
],
temperature=0.7,
max_tokens=1024,
)
response = await stub.Complete(
request,
timeout=30.0, # 30초 데드라인
metadata=[("x-api-key", "sk-abc123")],
)
print(f"Model: {response.model}")
print(f"Response: {response.choices[0].message.content}")
print(f"Tokens: {response.usage.total_tokens}")LLM의 토큰 스트리밍에 가장 적합한 패턴입니다. 클라이언트가 하나의 요청을 보내면, 서버가 여러 개의 응답 청크를 순차적으로 전송합니다.
async def stream_complete(stub: service.InferenceServiceStub):
request = pb.CompleteRequest(
model="claude-4",
messages=[
pb.Message(role="user", content="gRPC의 장점을 설명해주세요"),
],
temperature=0.7,
max_tokens=2048,
)
full_response = []
async for chunk in stub.StreamComplete(request, timeout=60.0):
for choice in chunk.choices:
if choice.delta.content:
full_response.append(choice.delta.content)
print(choice.delta.content, end="", flush=True)
if choice.finish_reason:
print(f"\n완료: {choice.finish_reason}")
# 마지막 청크에 사용량 정보 포함
if chunk.HasField("usage"):
print(f"총 토큰: {chunk.usage.total_tokens}")배치 임베딩처럼 클라이언트가 여러 입력을 순차적으로 보내고, 서버가 모든 입력을 처리한 후 단일 응답을 반환하는 패턴입니다.
async def batch_embed(stub: service.InferenceServiceStub):
async def generate_requests():
texts = [
"API 설계 패턴",
"마이크로서비스 아키텍처",
"서버리스 컴퓨팅",
"이벤트 주도 설계",
]
for text in texts:
yield pb.EmbedRequest(
model="text-embedding-3-large",
text=text,
)
response = await stub.BatchEmbed(generate_requests(), timeout=30.0)
for embedding in response.embeddings:
print(f"Index {embedding.index}: "
f"dim={len(embedding.vector)}, "
f"first_3={embedding.vector[:3]}")
print(f"총 토큰: {response.usage.total_tokens}")클라이언트와 서버가 동시에 메시지를 주고받는 패턴으로, 실시간 대화형 AI 서비스에 적합합니다.
async def interactive_chat(stub: service.InferenceServiceStub):
request_queue = asyncio.Queue()
async def send_messages():
while True:
user_input = await asyncio.to_thread(input, "User: ")
if user_input.lower() == "quit":
break
message = pb.ChatMessage(
role="user",
content=user_input,
)
await request_queue.put(message)
await request_queue.put(None) # 종료 신호
async def request_iterator():
while True:
message = await request_queue.get()
if message is None:
break
yield message
# 양방향 스트리밍 시작
send_task = asyncio.create_task(send_messages())
async for response in stub.Chat(request_iterator()):
print(f"Assistant: {response.content}")
await send_taskgRPC 인터셉터(Interceptor)는 REST의 미들웨어에 해당하는 개념으로, 요청/응답 파이프라인에 횡단 관심사를 추가합니다.
import grpc
import time
import logging
logger = logging.getLogger(__name__)
class LoggingInterceptor(grpc.aio.UnaryUnaryClientInterceptor):
"""요청/응답 로깅 인터셉터"""
async def intercept_unary_unary(
self, continuation, client_call_details, request
):
method = client_call_details.method
start_time = time.monotonic()
logger.info(f"gRPC 요청: {method}")
try:
response = await continuation(client_call_details, request)
elapsed = (time.monotonic() - start_time) * 1000
logger.info(f"gRPC 응답: {method} ({elapsed:.1f}ms)")
return response
except grpc.aio.AioRpcError as e:
elapsed = (time.monotonic() - start_time) * 1000
logger.error(
f"gRPC 오류: {method} ({elapsed:.1f}ms) "
f"code={e.code()} message={e.details()}"
)
raise
class RetryInterceptor(grpc.aio.UnaryUnaryClientInterceptor):
"""재시도 인터셉터"""
RETRYABLE_CODES = {
grpc.StatusCode.UNAVAILABLE,
grpc.StatusCode.DEADLINE_EXCEEDED,
grpc.StatusCode.RESOURCE_EXHAUSTED,
}
def __init__(self, max_retries: int = 3, base_delay: float = 0.1):
self.max_retries = max_retries
self.base_delay = base_delay
async def intercept_unary_unary(
self, continuation, client_call_details, request
):
last_error = None
for attempt in range(self.max_retries + 1):
try:
return await continuation(client_call_details, request)
except grpc.aio.AioRpcError as e:
last_error = e
if e.code() not in self.RETRYABLE_CODES:
raise
delay = self.base_delay * (2 ** attempt)
logger.warning(
f"재시도 {attempt + 1}/{self.max_retries}: "
f"code={e.code()}, delay={delay:.1f}s"
)
await asyncio.sleep(delay)
raise last_errorclass AuthInterceptor(grpc.aio.ServerInterceptor):
"""인증 인터셉터"""
async def intercept_service(self, continuation, handler_call_details):
metadata = dict(handler_call_details.invocation_metadata)
api_key = metadata.get("x-api-key")
if not api_key or not await validate_api_key(api_key):
context = handler_call_details
await context.abort(
grpc.StatusCode.UNAUTHENTICATED,
"유효하지 않은 API 키입니다",
)
return await continuation(handler_call_details)gRPC의 데드라인(Deadline)은 요청의 최대 처리 시간을 지정하며, 서비스 체인 전체에 자동으로 전파됩니다.
# 클라이언트: 30초 데드라인 설정
response = await stub.Complete(
request,
timeout=30.0,
)
# 서버: 남은 데드라인 확인
async def Complete(self, request, context):
remaining = context.time_remaining()
if remaining < 5.0:
# 시간이 부족하면 빠른 응답 모드
return await fast_inference(request)
return await full_inference(request)AI 추론 서비스에서 데드라인 설정은 특히 중요합니다. LLM 추론은 수 초가 걸릴 수 있으므로, 너무 짧은 데드라인은 불필요한 타임아웃을 유발합니다. 모델 크기와 예상 토큰 수를 고려하여 적절한 데드라인을 설정하세요. 일반적으로 입력 토큰 수에 비례하여 동적으로 계산하는 것이 좋습니다.
동일한 AI 추론 서비스를 REST와 gRPC로 구현했을 때의 성능 비교입니다.
| 지표 | REST (JSON) | gRPC (Protobuf) | 개선율 |
|---|---|---|---|
| 평균 지연시간 | 250ms | 25ms | 10x |
| P99 지연시간 | 800ms | 80ms | 10x |
| 직렬화 크기 | 2.4KB | 0.8KB | 3x |
| CPU 사용량 | 100% (기준) | 60% | 40% 절감 |
| 메모리 사용량 | 100% (기준) | 70% | 30% 절감 |
| 동시 연결 수 | 1000 (연결 풀) | 10000+ (멀티플렉싱) | 10x |
위 벤치마크는 순수 프로토콜 오버헤드 비교입니다. 실제 AI 추론에서는 모델 추론 시간(500-5000ms)이 전체 지연시간의 대부분을 차지하므로, 프로토콜 차이가 체감 성능에 미치는 영향은 내부 마이크로서비스 통신에서 가장 큽니다.
이 장에서는 gRPC의 핵심 기술인 HTTP/2 멀티플렉싱과 Protocol Buffers 바이너리 직렬화를 살펴보았습니다. 4가지 스트리밍 모드 중 Server Streaming은 LLM 토큰 스트리밍에, Bidirectional Streaming은 실시간 대화형 AI에 특히 적합합니다.
gRPC는 REST 대비 지연시간 10배 개선, CPU 40% 절감, 메모리 30% 절감을 달성하여 내부 마이크로서비스 통신에 최적의 선택지입니다. 인터셉터를 통한 횡단 관심사 처리와 데드라인 자동 전파는 분산 시스템의 안정성을 크게 높여줍니다.
4장에서는 클라이언트 주도의 유연한 데이터 쿼리를 가능하게 하는 GraphQL을 다룹니다. 스키마 퍼스트 설계, 타입 시스템, N+1 문제 해결, 그리고 AI 서비스에서 모델 조회와 실행 이력 분석에 GraphQL을 적용하는 방법을 Apollo Server 실습과 함께 살펴봅니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
GraphQL의 스키마 퍼스트 설계, 타입 시스템, N+1 문제 해결, AI 서비스 데이터 모델링을 Apollo Server 실습과 함께 학습합니다.
Richardson 성숙도 모델부터 리소스 설계, HTTP 메서드, OpenAPI 3.1 스펙, AI 서비스 REST 엔드포인트 설계까지 RESTful API의 핵심 원칙을 실습합니다.
비동기 작업 패턴, 멀티모달 입력 처리, Function Calling 인터페이스, 배치 API, 구조화된 출력 등 AI 서비스 고유의 API 설계 패턴을 학습합니다.