SQLx의 컴파일 타임 쿼리 검증, 연결 풀, CRUD 구현, 마이그레이션, 트랜잭션까지 Rust 백엔드의 데이터베이스 연동 패턴을 다룹니다.
query vs query_as의 차이를 파악합니다SQLx는 Rust의 비동기 SQL 라이브러리입니다. ORM이 아닌 SQL 쿼리를 직접 작성하면서도 컴파일 타임에 쿼리를 검증하는 독특한 기능을 제공합니다.
| 특징 | 설명 |
|---|---|
| 컴파일 타임 검증 | SQL 구문, 타입 매핑을 빌드 시 확인 |
| 비동기 네이티브 | Tokio와 완벽 호환 |
| 순수 Rust | C 바인딩 없음 |
| 다중 DB 지원 | PostgreSQL, MySQL, SQLite |
| 마이그레이션 내장 | CLI 도구 제공 |
[dependencies]
sqlx = { version = "0.8", features = [
"runtime-tokio", # Tokio 런타임 사용
"postgres", # PostgreSQL 드라이버
"chrono", # 날짜/시간 타입 지원
"uuid", # UUID 타입 지원
] }데이터베이스 연결은 비용이 큰 작업이므로 **연결 풀(Connection Pool)**을 사용합니다.
use sqlx::postgres::PgPoolOptions;
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let database_url = std::env::var("DATABASE_URL")
.expect("DATABASE_URL 환경 변수가 설정되지 않았습니다");
let pool = PgPoolOptions::new()
.max_connections(20) // 최대 연결 수
.min_connections(5) // 최소 유지 연결 수
.acquire_timeout(std::time::Duration::from_secs(3)) // 연결 획득 타임아웃
.idle_timeout(std::time::Duration::from_secs(600)) // 유휴 연결 타임아웃
.connect(&database_url)
.await?;
// 연결 확인
sqlx::query("SELECT 1")
.execute(&pool)
.await?;
println!("데이터베이스 연결 성공");
Ok(())
}use std::sync::Arc;
use sqlx::PgPool;
struct AppState {
db: PgPool,
}
#[tokio::main]
async fn main() {
let pool = PgPoolOptions::new()
.max_connections(20)
.connect(&std::env::var("DATABASE_URL").unwrap())
.await
.unwrap();
let state = Arc::new(AppState { db: pool });
let app = Router::new()
.route("/users", get(list_users).post(create_user))
.with_state(state);
// 서버 시작...
}PgPool은 내부적으로 Arc를 사용하므로 Clone이 가능하고 비용이 저렴합니다. 그러나 여러 필드를 가진 AppState를 공유하기 위해 Arc로 감싸는 것이 일반적인 패턴입니다.
SQLx CLI를 사용하여 데이터베이스 스키마를 관리합니다.
# CLI 설치
cargo install sqlx-cli --no-default-features --features postgres
# 마이그레이션 디렉토리 생성
sqlx migrate add create_users_tableCREATE TABLE IF NOT EXISTS users (
id BIGSERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE,
password VARCHAR(255) NOT NULL,
active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_users_email ON users(email);# 마이그레이션 적용
sqlx migrate run
# 마이그레이션 되돌리기
sqlx migrate revert코드에서 애플리케이션 시작 시 자동으로 마이그레이션을 실행할 수도 있습니다.
sqlx::migrate!("./migrations")
.run(&pool)
.await
.expect("마이그레이션 실패");query vs query_as// 행을 Row 타입으로 받아 인덱스/이름으로 접근
let row = sqlx::query("SELECT id, name, email FROM users WHERE id = $1")
.bind(user_id)
.fetch_one(&pool)
.await?;
let id: i64 = row.get("id");
let name: String = row.get("name");
let email: String = row.get("email");#[derive(Debug, sqlx::FromRow, Serialize)]
struct User {
id: i64,
name: String,
email: String,
active: bool,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
}
// 쿼리 결과를 직접 구조체로 매핑
let user = sqlx::query_as::<_, User>(
"SELECT id, name, email, active, created_at, updated_at FROM users WHERE id = $1"
)
.bind(user_id)
.fetch_one(&pool)
.await?;
println!("사용자: {} ({})", user.name, user.email);sqlx::query! 매크로를 사용하면 컴파일 타임에 SQL 구문과 타입을 검증합니다. 이를 위해서는 빌드 시점에 데이터베이스에 접근할 수 있어야 합니다.
// DATABASE_URL 환경 변수가 설정되어 있어야 함
let user = sqlx::query_as!(
User,
"SELECT id, name, email, active, created_at, updated_at FROM users WHERE id = $1",
user_id
)
.fetch_one(&pool)
.await?;sqlx::query! 매크로를 사용할 때는 빌드 시점에 데이터베이스 연결이 필요합니다. CI 환경에서는 sqlx prepare 명령으로 쿼리 메타데이터를 미리 캐싱하여 오프라인 빌드를 지원할 수 있습니다.
# 쿼리 메타데이터 캐싱 (로컬에서 DB 접근 가능할 때)
cargo sqlx prepare
# .sqlx/ 디렉토리가 생성됨 — git에 커밋
# CI에서는 SQLX_OFFLINE=true로 빌드async fn create_user(
State(state): State<Arc<AppState>>,
Json(req): Json<CreateUserRequest>,
) -> Result<(StatusCode, Json<User>), AppError> {
let hashed_password = hash_password(&req.password).await?;
let user = sqlx::query_as::<_, User>(
r#"
INSERT INTO users (name, email, password)
VALUES ($1, $2, $3)
RETURNING id, name, email, active, created_at, updated_at
"#,
)
.bind(&req.name)
.bind(&req.email)
.bind(&hashed_password)
.fetch_one(&state.db)
.await
.map_err(|e| match e {
sqlx::Error::Database(ref db_err) if db_err.is_unique_violation() => {
AppError::Conflict("이미 존재하는 이메일입니다".to_string())
}
_ => AppError::Database(e),
})?;
Ok((StatusCode::CREATED, Json(user)))
}// 단일 조회
async fn get_user(
State(state): State<Arc<AppState>>,
Path(id): Path<i64>,
) -> Result<Json<User>, AppError> {
let user = sqlx::query_as::<_, User>(
"SELECT id, name, email, active, created_at, updated_at FROM users WHERE id = $1",
)
.bind(id)
.fetch_optional(&state.db)
.await?
.ok_or(AppError::NotFound(format!("사용자 id={}", id)))?;
Ok(Json(user))
}
// 목록 조회 (페이지네이션)
#[derive(Deserialize)]
struct PaginationParams {
page: Option<i64>,
per_page: Option<i64>,
}
async fn list_users(
State(state): State<Arc<AppState>>,
Query(params): Query<PaginationParams>,
) -> Result<Json<Vec<User>>, AppError> {
let page = params.page.unwrap_or(1).max(1);
let per_page = params.per_page.unwrap_or(20).min(100);
let offset = (page - 1) * per_page;
let users = sqlx::query_as::<_, User>(
r#"
SELECT id, name, email, active, created_at, updated_at
FROM users
ORDER BY created_at DESC
LIMIT $1 OFFSET $2
"#,
)
.bind(per_page)
.bind(offset)
.fetch_all(&state.db)
.await?;
Ok(Json(users))
}async fn update_user(
State(state): State<Arc<AppState>>,
Path(id): Path<i64>,
Json(req): Json<UpdateUserRequest>,
) -> Result<Json<User>, AppError> {
let user = sqlx::query_as::<_, User>(
r#"
UPDATE users
SET name = COALESCE($1, name),
email = COALESCE($2, email),
updated_at = NOW()
WHERE id = $3
RETURNING id, name, email, active, created_at, updated_at
"#,
)
.bind(&req.name)
.bind(&req.email)
.bind(id)
.fetch_optional(&state.db)
.await?
.ok_or(AppError::NotFound(format!("사용자 id={}", id)))?;
Ok(Json(user))
}async fn delete_user(
State(state): State<Arc<AppState>>,
Path(id): Path<i64>,
) -> Result<StatusCode, AppError> {
let result = sqlx::query("DELETE FROM users WHERE id = $1")
.bind(id)
.execute(&state.db)
.await?;
if result.rows_affected() == 0 {
Err(AppError::NotFound(format!("사용자 id={}", id)))
} else {
Ok(StatusCode::NO_CONTENT)
}
}여러 쿼리를 원자적으로 실행해야 할 때 **트랜잭션(Transaction)**을 사용합니다.
async fn transfer_points(
State(state): State<Arc<AppState>>,
Json(req): Json<TransferRequest>,
) -> Result<StatusCode, AppError> {
// 트랜잭션 시작
let mut tx = state.db.begin().await?;
// 발신자 포인트 차감
let sender = sqlx::query_as::<_, UserPoints>(
"UPDATE users SET points = points - $1 WHERE id = $2 AND points >= $1 RETURNING id, points",
)
.bind(req.amount)
.bind(req.from_user_id)
.fetch_optional(&mut *tx)
.await?
.ok_or(AppError::BadRequest("포인트가 부족합니다".to_string()))?;
// 수신자 포인트 추가
sqlx::query("UPDATE users SET points = points + $1 WHERE id = $2")
.bind(req.amount)
.bind(req.to_user_id)
.execute(&mut *tx)
.await?;
// 이체 기록 저장
sqlx::query(
"INSERT INTO transfers (from_user_id, to_user_id, amount) VALUES ($1, $2, $3)",
)
.bind(req.from_user_id)
.bind(req.to_user_id)
.bind(req.amount)
.execute(&mut *tx)
.await?;
// 커밋 — 에러 발생 시 자동 롤백
tx.commit().await?;
Ok(StatusCode::OK)
}SQLx의 트랜잭션은 Drop 시 자동으로 롤백됩니다. 즉, tx.commit()을 호출하지 않고 함수가 에러로 종료되면 모든 변경이 취소됩니다. 이는 Rust의 소유권 시스템과 Drop 트레이트가 만들어내는 안전한 패턴입니다.
PostgreSQL과 Rust 타입 간의 주요 매핑은 다음과 같습니다.
| PostgreSQL | Rust | 필요 feature |
|---|---|---|
| BIGINT / BIGSERIAL | i64 | 기본 |
| INTEGER | i32 | 기본 |
| VARCHAR / TEXT | String | 기본 |
| BOOLEAN | bool | 기본 |
| TIMESTAMPTZ | chrono::DateTime<Utc> | chrono |
| UUID | uuid::Uuid | uuid |
| JSONB | serde_json::Value | json |
| DECIMAL | rust_decimal::Decimal | decimal |
이 장에서는 SQLx를 활용한 데이터베이스 연동을 다루었습니다.
다음 장에서는 테스트와 품질 보증을 다룹니다. 단위 테스트, 통합 테스트, API 테스트, 프로퍼티 기반 테스트까지 Rust의 강력한 테스트 생태계를 살펴봅니다.
이 글이 도움이 되셨나요?
관련 주제 더 보기
Rust의 단위 테스트, 통합 테스트, API 테스트, testcontainers를 활용한 DB 테스트, 프로퍼티 기반 테스트, 벤치마킹까지 다룹니다.
중첩 라우터, 인증/인가 미들웨어, 요청 검증, 웹소켓, SSE, API 테스트까지 프로덕션 수준의 Axum 패턴을 다룹니다.
Clap v4 서브커맨드 아키텍처, config-rs 설정 관리, tracing 구조화 로깅, indicatif 진행 표시, 크로스 플랫폼 빌드와 배포까지 다룹니다.