mirror of
https://github.com/bitwarden/server
synced 2026-02-14 15:33:35 +00:00
Managed connection pool and migration framework for ms sql using tiberius as a backend
This commit is contained in:
8
akd/.gitignore
vendored
Normal file
8
akd/.gitignore
vendored
Normal file
@@ -0,0 +1,8 @@
|
||||
/target
|
||||
config.toml
|
||||
|
||||
/lib
|
||||
|
||||
docker/data
|
||||
|
||||
.structurizr
|
||||
1587
akd/Cargo.lock
generated
Normal file
1587
akd/Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
14
akd/Cargo.toml
Normal file
14
akd/Cargo.toml
Normal file
@@ -0,0 +1,14 @@
|
||||
[workspace]
|
||||
members = ["crates/*"]
|
||||
resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
edition = "2021"
|
||||
version = "0.1.0"
|
||||
authors = ["Bitwarden Inc"]
|
||||
license-file = "LICENSE"
|
||||
keywords = ["akd", "key transparency"]
|
||||
|
||||
[workspace.lints.clippy]
|
||||
unused_async = "deny"
|
||||
unwrap_used = "deny"
|
||||
16
akd/crates/macros/Cargo.toml
Normal file
16
akd/crates/macros/Cargo.toml
Normal file
@@ -0,0 +1,16 @@
|
||||
[package]
|
||||
name = "macros"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[lib]
|
||||
proc-macro = true
|
||||
|
||||
[dependencies]
|
||||
proc-macro2 = "1.0"
|
||||
quote = "1.0"
|
||||
syn = { version = "2.0", features = ["full", "extra-traits"] }
|
||||
|
||||
[dev-dependencies]
|
||||
ms_database = { path = "../ms_database" }
|
||||
trybuild = "1.0.111"
|
||||
@@ -0,0 +1 @@
|
||||
DROP TABLE users;
|
||||
@@ -0,0 +1 @@
|
||||
CREATE TABLE users (id INT PRIMARY KEY);
|
||||
356
akd/crates/macros/src/lib.rs
Normal file
356
akd/crates/macros/src/lib.rs
Normal file
@@ -0,0 +1,356 @@
|
||||
//! SQL Migration Macros
|
||||
//!
|
||||
//! This crate provides procedural macros for managing SQL migrations at compile time.
|
||||
//!
|
||||
//! # Overview
|
||||
//!
|
||||
//! The macros in this crate are inspired by Diesel's `embed_migrations!` macro and
|
||||
//! enable embedding SQL migration files directly into your Rust binary at compile time.
|
||||
//!
|
||||
//! It is up to the caller to ensure that a `Migration` struct is in scope and defines an accessible `new` constructor.
|
||||
//!
|
||||
//! # Migration Name Requirements
|
||||
//!
|
||||
//! Migration directory names must:
|
||||
//! - Be valid UTF-8
|
||||
//! - Not exceed 50 characters (to fit in a VARCHAR(50) database column)
|
||||
//!
|
||||
//! # Available Macros
|
||||
//!
|
||||
//! - [`migration!`] - Load a single migration from a directory
|
||||
//! - [`load_migrations!`] - Automatically load all migrations from a directory
|
||||
//!
|
||||
//! # Usage Example
|
||||
//!
|
||||
//! ```rust
|
||||
//! use ms_database::Migration;
|
||||
//! use macros::{migration, load_migrations};
|
||||
//!
|
||||
//! // Load a single migration
|
||||
//! let single_migration: Migration = migration!("./example_migrations/20250930_01_initial");
|
||||
//!
|
||||
//! // Load all migrations from a directory
|
||||
//! const ALL_MIGRATIONS: &[Migration] = load_migrations!("./example_migrations");
|
||||
//! ```
|
||||
|
||||
use proc_macro::TokenStream;
|
||||
use quote::quote;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use syn::{parse_macro_input, LitStr};
|
||||
|
||||
/// Resolves a directory path by walking up to find the workspace root (first Cargo.toml).
|
||||
/// All paths are resolved relative to the workspace root.
|
||||
fn resolve_path(path_str: &str) -> std::path::PathBuf {
|
||||
// Try to get the current crate directory from CARGO_MANIFEST_DIR
|
||||
let start_dir = if let Ok(crate_dir) = std::env::var("CARGO_MANIFEST_DIR") {
|
||||
std::path::PathBuf::from(crate_dir)
|
||||
} else {
|
||||
// CARGO_MANIFEST_DIR not set (e.g., in trybuild tests)
|
||||
// Start from the current directory and walk up
|
||||
std::env::current_dir().expect("Could not determine current directory")
|
||||
};
|
||||
|
||||
let mut current = start_dir.as_path();
|
||||
|
||||
// Walk up to find the first Cargo.toml (workspace root)
|
||||
loop {
|
||||
if current.join("Cargo.toml").exists() {
|
||||
return current.join(path_str);
|
||||
}
|
||||
|
||||
// Move to parent directory
|
||||
match current.parent() {
|
||||
Some(parent) => current = parent,
|
||||
None => {
|
||||
// Reached filesystem root without finding Cargo.toml
|
||||
panic!(
|
||||
"Could not find Cargo.toml in any parent directory starting from {}",
|
||||
start_dir.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper function to load a migration from a directory path.
|
||||
/// Returns a tuple of (migration_name, up_content, down_content_tokens).
|
||||
fn load_migration_from_path(full_path: &Path, relative_path: &str) -> (String, String, proc_macro2::TokenStream) {
|
||||
// Get the migration name from the directory name
|
||||
let migration_name = full_path
|
||||
.file_name()
|
||||
.expect("Invalid directory path")
|
||||
.to_str()
|
||||
.expect("Invalid UTF-8 in directory name")
|
||||
.to_string();
|
||||
|
||||
// Validate that the migration name fits in a varchar(50)
|
||||
if migration_name.len() > 50 {
|
||||
panic!(
|
||||
"Migration name '{}' exceeds 50 characters (length: {})",
|
||||
migration_name,
|
||||
migration_name.len()
|
||||
);
|
||||
}
|
||||
|
||||
// Read up.sql (required)
|
||||
let up_sql_path = full_path.join("up.sql");
|
||||
if !up_sql_path.exists() {
|
||||
panic!(
|
||||
"Required file 'up.sql' not found in migration directory: {}",
|
||||
full_path.display()
|
||||
);
|
||||
}
|
||||
let up_content = fs::read_to_string(&up_sql_path)
|
||||
.unwrap_or_else(|e| panic!("Failed to read up.sql in {}: {}", relative_path, e));
|
||||
|
||||
// Read down.sql (optional)
|
||||
let down_sql_path = full_path.join("down.sql");
|
||||
let down_content = if down_sql_path.exists() {
|
||||
let content = fs::read_to_string(&down_sql_path)
|
||||
.unwrap_or_else(|e| panic!("Failed to read down.sql in {}: {}", relative_path, e));
|
||||
quote! { Some(#content) }
|
||||
} else {
|
||||
quote! { None }
|
||||
};
|
||||
|
||||
(migration_name, up_content, down_content)
|
||||
}
|
||||
|
||||
/// Loads a single migration from a directory.
|
||||
///
|
||||
/// The directory must contain:
|
||||
/// - `up.sql` (required) - SQL to apply the migration
|
||||
/// - `down.sql` (optional) - SQL to roll back the migration
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `path` - A string literal containing the relative path to the migration directory
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// A `Migration` struct with:
|
||||
/// - `name`: The directory name (must fit in VARCHAR(50))
|
||||
/// - `up`: The contents of `up.sql`
|
||||
/// - `down`: `Some(contents)` if `down.sql` exists, `None` otherwise
|
||||
/// - `run_in_transaction`: Always `true`
|
||||
///
|
||||
/// # Usage
|
||||
///
|
||||
/// ```rust
|
||||
/// use ms_database::Migration;
|
||||
/// use macros::migration;
|
||||
///
|
||||
/// // This will load the migration at compile time
|
||||
/// let migration: Migration = migration!("./example_migrations/20250930_01_initial");
|
||||
/// assert_eq!(migration.name, "20250930_01_initial");
|
||||
/// assert!(migration.run_in_transaction);
|
||||
/// ```
|
||||
///
|
||||
/// # Generated Code
|
||||
///
|
||||
/// This macro generates code that creates a `Migration` struct:
|
||||
///
|
||||
/// ```text
|
||||
/// // Given a directory structure:
|
||||
/// // example_migrations/20250930_01_initial/
|
||||
/// // ├── up.sql (contains: "CREATE TABLE users (id INT PRIMARY KEY);")
|
||||
/// // └── down.sql (contains: "DROP TABLE users;")
|
||||
///
|
||||
/// // The macro call:
|
||||
/// migration!("./example_migrations/20250930_01_initial")
|
||||
///
|
||||
/// // Expands to:
|
||||
/// Migration {
|
||||
/// name: #migration_name,
|
||||
/// up: #up_content,
|
||||
/// down: #down_content,
|
||||
/// run_in_transaction: true,
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// - If the migration name exceeds 50 characters
|
||||
/// - If `up.sql` is not found
|
||||
/// - If any file cannot be read
|
||||
#[proc_macro]
|
||||
pub fn migration(input: TokenStream) -> TokenStream {
|
||||
// Parse the input as a string literal (the directory path)
|
||||
let dir_path = parse_macro_input!(input as LitStr).value();
|
||||
|
||||
// Resolve the path (supports both absolute and relative paths)
|
||||
let full_path = resolve_path(&dir_path);
|
||||
|
||||
// Load the migration using the helper function
|
||||
let (migration_name, up_content, down_content) = load_migration_from_path(&full_path, &dir_path);
|
||||
|
||||
// Generate the Migration struct
|
||||
let expanded = quote! {
|
||||
Migration {
|
||||
name: #migration_name,
|
||||
up: #up_content,
|
||||
down: #down_content,
|
||||
run_in_transaction: true,
|
||||
}
|
||||
};
|
||||
|
||||
expanded.into()
|
||||
}
|
||||
|
||||
/// Automatically loads all migrations from a directory.
|
||||
///
|
||||
/// Scans the specified directory for subdirectories containing migration files.
|
||||
/// Each subdirectory must contain at least an `up.sql` file to be considered
|
||||
/// a valid migration. Directories are processed in alphabetical order to ensure
|
||||
/// consistent migration ordering.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `path` - A string literal containing the relative path to the migrations directory
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// A static reference to an array of `Migration` structs: `&[Migration]`
|
||||
///
|
||||
/// # Directory Structure
|
||||
///
|
||||
/// ```text
|
||||
/// migrations/
|
||||
/// ├── 20250930_01_initial/
|
||||
/// │ └── up.sql
|
||||
/// ├── 20250930_02_add_users/
|
||||
/// │ ├── up.sql
|
||||
/// │ └── down.sql
|
||||
/// └── 20250930_03_add_permissions/
|
||||
/// ├── up.sql
|
||||
/// └── down.sql
|
||||
/// ```
|
||||
///
|
||||
/// # Usage
|
||||
///
|
||||
/// ```rust
|
||||
/// use ms_database::Migration;
|
||||
/// use macros::load_migrations;
|
||||
///
|
||||
/// // This will load all migrations at compile time
|
||||
/// const MIGRATIONS: &[Migration] = load_migrations!("./example_migrations");
|
||||
///
|
||||
/// // Migrations are loaded in alphabetical order
|
||||
/// for migration in MIGRATIONS {
|
||||
/// println!("Migration: {}", migration.name);
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// # Generated Code
|
||||
///
|
||||
/// This macro generates code that creates an array of `Migration` structs:
|
||||
///
|
||||
/// ```text
|
||||
/// // Given a directory structure:
|
||||
/// // migrations/
|
||||
/// // ├── 20250930_01_initial/
|
||||
/// // │ └── up.sql
|
||||
/// // ├── 20250930_02_add_users/
|
||||
/// // │ ├── up.sql
|
||||
/// // │ └── down.sql
|
||||
/// // └── 20250930_03_add_permissions/
|
||||
/// // └── up.sql
|
||||
///
|
||||
/// // The macro call:
|
||||
/// const MIGRATIONS: &[Migration] = load_migrations!("./migrations");
|
||||
///
|
||||
/// // Expands to:
|
||||
/// &[
|
||||
/// Migration {
|
||||
/// name: "20250930_01_initial",
|
||||
/// up: "...",
|
||||
/// down: None,
|
||||
/// run_in_transaction: true,
|
||||
/// },
|
||||
/// Migration {
|
||||
/// name: "20250930_02_add_users",
|
||||
/// up: "...",
|
||||
/// down: Some("..."),
|
||||
/// run_in_transaction: true,
|
||||
/// },
|
||||
/// Migration {
|
||||
/// name: "20250930_03_add_permissions",
|
||||
/// up: "...",
|
||||
/// down: None,
|
||||
/// run_in_transaction: true,
|
||||
/// },
|
||||
/// ]
|
||||
/// ```
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// - If the migrations directory does not exist
|
||||
/// - If the migrations directory cannot be read
|
||||
/// - If any migration name exceeds 50 characters
|
||||
/// - If any required `up.sql` file is missing
|
||||
#[proc_macro]
|
||||
pub fn load_migrations(input: TokenStream) -> TokenStream {
|
||||
// Parse the input as a string literal (the migrations directory path)
|
||||
let migrations_dir = parse_macro_input!(input as LitStr).value();
|
||||
|
||||
// Resolve the path (supports both absolute and relative paths)
|
||||
let migrations_path = resolve_path(&migrations_dir);
|
||||
|
||||
if !migrations_path.exists() || !migrations_path.is_dir() {
|
||||
panic!(
|
||||
"Migrations directory not found: {}",
|
||||
migrations_path.display()
|
||||
);
|
||||
}
|
||||
|
||||
// Read all directories in the migrations directory
|
||||
let mut migration_paths = Vec::new();
|
||||
for entry in fs::read_dir(&migrations_path)
|
||||
.unwrap_or_else(|e| panic!("Failed to read migrations directory: {}", e))
|
||||
{
|
||||
let entry = entry.unwrap();
|
||||
let path = entry.path();
|
||||
|
||||
if path.is_dir() {
|
||||
// Check if it has up.sql
|
||||
if path.join("up.sql").exists() {
|
||||
migration_paths.push(path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort the migrations by name to ensure consistent ordering
|
||||
migration_paths.sort();
|
||||
|
||||
// Generate Migration structs inline for each directory
|
||||
let migrations: Vec<_> = migration_paths
|
||||
.iter()
|
||||
.map(|full_path| {
|
||||
// Use the directory name as the display path for error messages
|
||||
let display_path = full_path
|
||||
.file_name()
|
||||
.and_then(|n| n.to_str())
|
||||
.unwrap_or("unknown");
|
||||
|
||||
// Load the migration using the helper function
|
||||
let (migration_name, up_content, down_content) = load_migration_from_path(full_path, display_path);
|
||||
|
||||
quote! {
|
||||
Migration {
|
||||
name: #migration_name,
|
||||
up: #up_content,
|
||||
down: #down_content,
|
||||
run_in_transaction: true,
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Generate the static array
|
||||
let expanded = quote! {
|
||||
&[#(#migrations),*]
|
||||
};
|
||||
|
||||
expanded.into()
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
use macros::load_migrations;
|
||||
use ms_database::Migration;
|
||||
|
||||
const MIGRATIONS: &[Migration] = load_migrations!("tests/test_panic_migrations");
|
||||
|
||||
fn main() {}
|
||||
7
akd/crates/macros/tests/compile_panics/migration.rs
Normal file
7
akd/crates/macros/tests/compile_panics/migration.rs
Normal file
@@ -0,0 +1,7 @@
|
||||
use macros::migration;
|
||||
use ms_database::Migration;
|
||||
|
||||
const LONG_MIGRATION: Migration = migration!("tests/test_panic_migrations/20250930_03_really_long_name_that_exceeds_the_50_character_limit");
|
||||
const NO_UP_MIGRATION: Migration = migration!("tests/test_panic_migrations/20250930_04_no_up");
|
||||
|
||||
fn main() {}
|
||||
35
akd/crates/macros/tests/load_migrations.rs
Normal file
35
akd/crates/macros/tests/load_migrations.rs
Normal file
@@ -0,0 +1,35 @@
|
||||
use macros::load_migrations;
|
||||
use ms_database::Migration;
|
||||
|
||||
const TEST_MIGRIATONS: &[Migration] = load_migrations!("tests/test_migrations");
|
||||
const EXPECTED_SQL: &str = "SELECT 1;";
|
||||
|
||||
#[test]
|
||||
fn migration_ordering() {
|
||||
let names: Vec<&str> = TEST_MIGRIATONS.iter().map(|m| m.name).collect();
|
||||
assert_eq!(names, vec!["20250930_01_test", "20250930_02_up_down",]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn up_only_migration() {
|
||||
let migration = &TEST_MIGRIATONS[0];
|
||||
assert_eq!(migration.name, "20250930_01_test");
|
||||
assert_eq!(migration.up, EXPECTED_SQL);
|
||||
assert!(migration.down.is_none());
|
||||
assert!(migration.run_in_transaction);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn up_down_migration() {
|
||||
let migration = &TEST_MIGRIATONS[1];
|
||||
assert_eq!(migration.name, "20250930_02_up_down");
|
||||
assert_eq!(migration.up, EXPECTED_SQL);
|
||||
assert_eq!(migration.down, Some(EXPECTED_SQL));
|
||||
assert!(migration.run_in_transaction);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn long_name_migration() {
|
||||
let t = trybuild::TestCases::new();
|
||||
t.compile_fail("tests/compile_panics/load_migrations.rs");
|
||||
}
|
||||
28
akd/crates/macros/tests/migration.rs
Normal file
28
akd/crates/macros/tests/migration.rs
Normal file
@@ -0,0 +1,28 @@
|
||||
use macros::{migration};
|
||||
use ms_database::Migration;
|
||||
|
||||
const EXPECTED_SQL: &str = "SELECT 1;";
|
||||
|
||||
#[test]
|
||||
fn up_only_migration() {
|
||||
let migration = migration!("tests/test_migrations/20250930_01_test");
|
||||
assert_eq!(migration.name, "20250930_01_test");
|
||||
assert_eq!(migration.up, EXPECTED_SQL);
|
||||
assert!(migration.down.is_none());
|
||||
assert!(migration.run_in_transaction);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn up_down_migration() {
|
||||
let migration = migration!("tests/test_migrations/20250930_02_up_down");
|
||||
assert_eq!(migration.name, "20250930_02_up_down");
|
||||
assert_eq!(migration.up, EXPECTED_SQL);
|
||||
assert_eq!(migration.down, Some(EXPECTED_SQL));
|
||||
assert!(migration.run_in_transaction);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn long_name_migration() {
|
||||
let t = trybuild::TestCases::new();
|
||||
t.compile_fail("tests/compile_panics/migration.rs");
|
||||
}
|
||||
@@ -0,0 +1 @@
|
||||
SELECT 1;
|
||||
@@ -0,0 +1 @@
|
||||
SELECT 1;
|
||||
@@ -0,0 +1 @@
|
||||
SELECT 1;
|
||||
@@ -0,0 +1 @@
|
||||
SELECT 1;
|
||||
@@ -0,0 +1 @@
|
||||
SELECT 1;
|
||||
18
akd/crates/ms_database/Cargo.toml
Normal file
18
akd/crates/ms_database/Cargo.toml
Normal file
@@ -0,0 +1,18 @@
|
||||
[package]
|
||||
name = "ms_database"
|
||||
edition.workspace = true
|
||||
version.workspace = true
|
||||
authors.workspace = true
|
||||
license-file.workspace = true
|
||||
keywords.workspace = true
|
||||
|
||||
[dependencies]
|
||||
bb8 = "0.9.0"
|
||||
macros = { path = "../macros" }
|
||||
thiserror = "2.0.17"
|
||||
tiberius = { version = "0.12.3", features = ["chrono", "tokio"] }
|
||||
tokio = "1.47.1"
|
||||
tokio-util = {version = "0.7.16", features = ["compat"] }
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
105
akd/crates/ms_database/src/lib.rs
Normal file
105
akd/crates/ms_database/src/lib.rs
Normal file
@@ -0,0 +1,105 @@
|
||||
mod migrate;
|
||||
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_util::compat::TokioAsyncWriteCompatExt;
|
||||
|
||||
pub use migrate::{Migration, MigrationError, run_pending_migrations};
|
||||
use bb8::ManageConnection;
|
||||
use tiberius::{AuthMethod, Client, Config};
|
||||
|
||||
pub struct ConnectionManager {
|
||||
connection_string: String,
|
||||
}
|
||||
|
||||
impl ConnectionManager {
|
||||
pub fn new(connection_string: String) -> Self {
|
||||
Self { connection_string }
|
||||
}
|
||||
|
||||
pub async fn connect(&self) -> Result<ManagedConnection, tiberius::error::Error> {
|
||||
let mut config = Config::new();
|
||||
|
||||
config.host("localhost");
|
||||
config.port(1433);
|
||||
config.authentication(AuthMethod::sql_server("SA", "<YourStrong@Passw0rd>"));
|
||||
config.trust_cert(); // on production, it is not a good idea to do this
|
||||
|
||||
let tcp = TcpStream::connect(config.get_addr()).await?;
|
||||
tcp.set_nodelay(true)?;
|
||||
|
||||
// To be able to use Tokio's tcp, we're using the `compat_write` from
|
||||
// the `TokioAsyncWriteCompatExt` to get a stream compatible with the
|
||||
// traits from the `futures` crate.
|
||||
let client = Client::connect(config, tcp.compat_write()).await?;
|
||||
|
||||
Ok(ManagedConnection(client))
|
||||
}
|
||||
}
|
||||
|
||||
type Stream = tokio_util::compat::Compat<TcpStream>;
|
||||
pub struct ManagedConnection(Client<Stream>);
|
||||
|
||||
// Transparently forward methods to the inner Client
|
||||
impl ManagedConnection {
|
||||
async fn execute(
|
||||
&mut self,
|
||||
sql: &str,
|
||||
params: &[&(dyn tiberius::ToSql)],
|
||||
) -> Result<tiberius::ExecuteResult, tiberius::error::Error> {
|
||||
self.0.execute(sql, params).await
|
||||
}
|
||||
|
||||
async fn query<'a>(
|
||||
&'a mut self,
|
||||
sql: &str,
|
||||
params: &[&(dyn tiberius::ToSql)],
|
||||
) -> Result<tiberius::QueryStream<'a>, tiberius::error::Error> {
|
||||
self.0.query(sql, params).await
|
||||
}
|
||||
|
||||
async fn simple_query<'a>(
|
||||
&'a mut self,
|
||||
sql: &str,
|
||||
) -> Result<tiberius::QueryStream<'a>, tiberius::error::Error> {
|
||||
self.0.simple_query(sql).await
|
||||
}
|
||||
|
||||
async fn bulk_insert<'a>(
|
||||
&'a mut self,
|
||||
table: &'a str,
|
||||
) -> Result<tiberius::BulkLoadRequest<'a, Stream>, tiberius::error::Error> {
|
||||
self.0.bulk_insert(&table).await
|
||||
}
|
||||
|
||||
async fn ping(&mut self) -> Result<u8, tiberius::error::Error> {
|
||||
let row = self.0.simple_query("SELECT 1").await?.into_first_result().await?;
|
||||
let value = row[0].get(0).expect("value is present");
|
||||
Ok(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl ManageConnection for ConnectionManager {
|
||||
type Connection = ManagedConnection;
|
||||
|
||||
type Error = tiberius::error::Error;
|
||||
|
||||
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
|
||||
self.connect().await
|
||||
}
|
||||
|
||||
async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
|
||||
match conn.ping().await {
|
||||
Ok(v) if v == 1 => Ok(()),
|
||||
Ok(_) => Err(tiberius::error::Error::Protocol(
|
||||
"Unexpected ping response".into(),
|
||||
)),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
|
||||
// We don't have a good way to determine this sync. r2d2 (which bb8 is based on) recommends
|
||||
// always returning false here and relying on `is_valid` to catch broken connections.
|
||||
false
|
||||
}
|
||||
}
|
||||
130
akd/crates/ms_database/src/migrate.rs
Normal file
130
akd/crates/ms_database/src/migrate.rs
Normal file
@@ -0,0 +1,130 @@
|
||||
use macros::load_migrations;
|
||||
use tiberius::{error};
|
||||
|
||||
use crate::{ManagedConnection};
|
||||
|
||||
type Result<T> = std::result::Result<T, MigrationError>;
|
||||
const MIGRATIONS: &[Migration] = load_migrations!("./migrations");
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum MigrationError {
|
||||
#[error("Database error: {0}")]
|
||||
DatabaseError(#[from] error::Error),
|
||||
// Other error variants can be added here
|
||||
}
|
||||
|
||||
pub(crate) async fn pending_migrations(conn: &mut ManagedConnection) -> Result<Vec<Migration>> {
|
||||
// get applied migrations
|
||||
let applied = read_applied_migrations(conn).await?;
|
||||
|
||||
// create list of migrations that haven't been applied, in order.
|
||||
let pending = MIGRATIONS
|
||||
.iter()
|
||||
.filter(|m| !applied.contains(&m.name.to_string()))
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
Ok(pending)
|
||||
}
|
||||
|
||||
const CREATE_MIGRATIONS_TABLE_SQL: &str = r#"
|
||||
IF OBJECT_ID('dbo.__migrations') IS NULL
|
||||
BEGIN
|
||||
CREATE TABLE dbo.__migrations (
|
||||
version VARCHAR(50) PRIMARY KEY,
|
||||
run_on DATETIME NOT NULL DEFAULT GETDATE()
|
||||
);
|
||||
END
|
||||
"#;
|
||||
|
||||
async fn ensure_migrations_table_exists(conn: &mut ManagedConnection) -> Result<()> {
|
||||
// create the migrations table if it doesn't exist
|
||||
conn.simple_query(CREATE_MIGRATIONS_TABLE_SQL).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
const READ_APPLIED_MIGRATIONS: &str = "SELECT version FROM dbo.__migrations ORDER BY version";
|
||||
|
||||
async fn read_applied_migrations(conn: &mut ManagedConnection) -> Result<Vec<String>> {
|
||||
let applied = conn.query(READ_APPLIED_MIGRATIONS, &[])
|
||||
.await?
|
||||
.into_first_result()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|row| row.get::<&str,_>("version").expect("version column is present").to_owned())
|
||||
.collect();
|
||||
Ok(applied)
|
||||
}
|
||||
|
||||
const RECORD_MIGRATION_SQL: &str = "INSERT INTO dbo.__migrations (version) VALUES (@P1)";
|
||||
|
||||
async fn record_migration(conn: &mut ManagedConnection, migration: &Migration) -> Result<()> {
|
||||
conn.execute(RECORD_MIGRATION_SQL, &[&migration.name]).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn run_migration(migration: &Migration, conn: &mut ManagedConnection) -> Result<()> {
|
||||
if migration.run_in_transaction {
|
||||
conn.simple_query("BEGIN TRANSACTION").await?;
|
||||
|
||||
let result = async {
|
||||
conn.simple_query(&migration.up).await?;
|
||||
record_migration(conn, migration).await?;
|
||||
Ok::<_, MigrationError>(())
|
||||
}.await;
|
||||
|
||||
match result {
|
||||
Ok(_) => {
|
||||
conn.simple_query("COMMIT").await?;
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
conn.simple_query("ROLLBACK").await?;
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
conn.simple_query(&migration.up).await?;
|
||||
record_migration(conn, migration).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run_pending_migrations(conn: &mut ManagedConnection) -> Result<()> {
|
||||
ensure_migrations_table_exists(conn).await?;
|
||||
let pending = pending_migrations(conn).await?;
|
||||
for migration in pending {
|
||||
run_migration(&migration, conn).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct Migration {
|
||||
/// The name of the migration, derived from the directory name
|
||||
pub name: &'static str,
|
||||
/// The SQL to execute when applying the migration
|
||||
pub up: &'static str,
|
||||
/// The SQL to execute when rolling back the migration (if provided)
|
||||
pub down: Option<&'static str>,
|
||||
/// Whether to run this migration in a transaction
|
||||
pub run_in_transaction: bool,
|
||||
}
|
||||
|
||||
impl Migration {
|
||||
/// Creates a new migration with the given properties.
|
||||
pub const fn new(
|
||||
name: &'static str,
|
||||
up: &'static str,
|
||||
down: Option<&'static str>,
|
||||
run_in_transaction: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
name,
|
||||
up,
|
||||
down,
|
||||
run_in_transaction,
|
||||
}
|
||||
}
|
||||
}
|
||||
8
akd/rust-toolchain.toml
Normal file
8
akd/rust-toolchain.toml
Normal file
@@ -0,0 +1,8 @@
|
||||
[toolchain]
|
||||
channel = "1.88.0"
|
||||
components = [ "rustfmt", "clippy" ]
|
||||
profile = "minimal"
|
||||
|
||||
# The following is not part of the rust-toolchain.toml format,
|
||||
# but is used by our Renovate config to manage nightly versions.
|
||||
nightly-channel = "nightly-2025-05-08"
|
||||
@@ -46,6 +46,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Events", "src\Events\Events
|
||||
EndProject
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Admin", "src\Admin\Admin.csproj", "{B131CEF3-89FB-4C90-ADB0-9E9C4246EB56}"
|
||||
EndProject
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AkdStorage", "src\AkdStorage\AkdStorage.csproj", "{DBA98266-C460-4BFF-87C2-FD5266A283FE}"
|
||||
EndProject
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Notifications", "src\Notifications\Notifications.csproj", "{28635027-20E5-42FA-B218-B6C878DE5350}"
|
||||
EndProject
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Core.Test", "test\Core.Test\Core.Test.csproj", "{8EF31E6C-400A-4174-8BE3-502B08FB10B5}"
|
||||
@@ -186,6 +188,10 @@ Global
|
||||
{B131CEF3-89FB-4C90-ADB0-9E9C4246EB56}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{B131CEF3-89FB-4C90-ADB0-9E9C4246EB56}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{B131CEF3-89FB-4C90-ADB0-9E9C4246EB56}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{DBA98266-C460-4BFF-87C2-FD5266A283FE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{DBA98266-C460-4BFF-87C2-FD5266A283FE}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{DBA98266-C460-4BFF-87C2-FD5266A283FE}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{DBA98266-C460-4BFF-87C2-FD5266A283FE}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{28635027-20E5-42FA-B218-B6C878DE5350}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{28635027-20E5-42FA-B218-B6C878DE5350}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{28635027-20E5-42FA-B218-B6C878DE5350}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
@@ -369,6 +375,7 @@ Global
|
||||
{9CF59342-3912-4B45-A2BA-0F173666586D} = {DD5BD056-4AAE-43EF-BBD2-0B569B8DA84D}
|
||||
{994DD611-F266-4BD3-8072-3B1B57267ED5} = {DD5BD056-4AAE-43EF-BBD2-0B569B8DA84D}
|
||||
{B131CEF3-89FB-4C90-ADB0-9E9C4246EB56} = {DD5BD056-4AAE-43EF-BBD2-0B569B8DA84D}
|
||||
{DBA98266-C460-4BFF-87C2-FD5266A283FE} = {DD5BD056-4AAE-43EF-BBD2-0B569B8DA84D}
|
||||
{28635027-20E5-42FA-B218-B6C878DE5350} = {DD5BD056-4AAE-43EF-BBD2-0B569B8DA84D}
|
||||
{8EF31E6C-400A-4174-8BE3-502B08FB10B5} = {DD5BD056-4AAE-43EF-BBD2-0B569B8DA84F}
|
||||
{79BB453F-D0D8-4DDF-9809-A405C56692BD} = {DD5BD056-4AAE-43EF-BBD2-0B569B8DA84D}
|
||||
|
||||
Reference in New Issue
Block a user