From 2c971fc77e369ed11aa528dc174ab320f423a4e8 Mon Sep 17 00:00:00 2001 From: Matt Gibson Date: Thu, 16 Oct 2025 15:39:55 -0700 Subject: [PATCH] fixup sql connection pool --- akd/Cargo.toml | 5 ++++ akd/crates/ms_database/Cargo.toml | 12 +++++++-- akd/crates/ms_database/src/pool.rs | 39 ++++++++++++++++++++++++------ 3 files changed, 46 insertions(+), 10 deletions(-) diff --git a/akd/Cargo.toml b/akd/Cargo.toml index 88c598c52b..6cf9f45b7c 100644 --- a/akd/Cargo.toml +++ b/akd/Cargo.toml @@ -12,3 +12,8 @@ keywords = ["akd", "key transparency"] [workspace.lints.clippy] unused_async = "deny" unwrap_used = "deny" + +[workspace.dependencies] +tokio = { version = "1.47.1", features = ["full"] } +tracing = { version = "0.1.41" } +tracing-subscriber = {version = "0.3.19" } diff --git a/akd/crates/ms_database/Cargo.toml b/akd/crates/ms_database/Cargo.toml index c26a048c20..25ec3882e5 100644 --- a/akd/crates/ms_database/Cargo.toml +++ b/akd/crates/ms_database/Cargo.toml @@ -7,12 +7,20 @@ license-file.workspace = true keywords.workspace = true [dependencies] +async-trait = "0.1.89" bb8 = "0.9.0" macros = { path = "../macros" } thiserror = "2.0.17" -tiberius = { version = "0.12.3", features = ["chrono", "tokio"] } -tokio = "1.47.1" +tokio = {workspace = true} tokio-util = {version = "0.7.16", features = ["compat"] } +tracing = { workspace = true } + +[target.'cfg(target_os = "macos")'.dependencies] +tiberius = { version = "0.12.3", default-features = false, features = ["chrono", "tokio", "rustls"] } + +[target.'cfg(not(target_os = "macos"))'.workspace.dependencies] +tiberius = { version = "0.12.3", features = ["chrono", "tokio"] } + [lints] workspace = true diff --git a/akd/crates/ms_database/src/pool.rs b/akd/crates/ms_database/src/pool.rs index c8f899ce4b..bc81b2aa7c 100644 --- a/akd/crates/ms_database/src/pool.rs +++ b/akd/crates/ms_database/src/pool.rs @@ -5,6 +5,7 @@ use tokio_util::compat::TokioAsyncWriteCompatExt; use bb8::ManageConnection; use tiberius::{Client, Config}; +use tracing::{info, instrument, trace}; #[derive(thiserror::Error, Debug)] pub enum OnConnectError { @@ -23,19 +24,28 @@ pub struct ConnectionManager { impl ConnectionManager { pub fn new(connection_string: String) -> Self { - Self { connection_string, is_healthy: RwLock::new(true) } + Self { + connection_string, + is_healthy: RwLock::new(true), + } } + #[instrument(skip(self), level = "info")] pub async fn connect(&self) -> Result { - let config = Config::from_ado_string(&self.connection_string).map_err(OnConnectError::Config)?; + let config = + Config::from_ado_string(&self.connection_string).map_err(OnConnectError::Config)?; + info!(config = ?config, "Connecting"); let tcp = TcpStream::connect(config.get_addr()).await?; tcp.set_nodelay(true)?; // To be able to use Tokio's tcp, we're using the `compat_write` from // the `TokioAsyncWriteCompatExt` to get a stream compatible with the // traits from the `futures` crate. - let client = Client::connect(config, tcp.compat_write()).await.map_err(OnConnectError::OnConnect)?; + let client = Client::connect(config, tcp.compat_write()) + .await + .map_err(OnConnectError::OnConnect)?; + info!("Successfully connected"); Ok(ManagedConnection(client)) } @@ -57,6 +67,7 @@ impl ManagedConnection { sql: &str, params: &[&(dyn tiberius::ToSql)], ) -> Result { + trace!(%sql, "Executing SQL"); self.0.execute(sql, params).await } @@ -65,6 +76,7 @@ impl ManagedConnection { sql: &str, params: &[&(dyn tiberius::ToSql)], ) -> Result, tiberius::error::Error> { + trace!(%sql, "Querying SQL"); self.0.query(sql, params).await } @@ -72,18 +84,26 @@ impl ManagedConnection { &'a mut self, sql: &str, ) -> Result, tiberius::error::Error> { + trace!(%sql, "Simple querying SQL"); self.0.simple_query(sql).await } - + pub async fn bulk_insert<'a>( &'a mut self, table: &'a str, ) -> Result, tiberius::error::Error> { + trace!(%table, "Starting bulk insert"); self.0.bulk_insert(&table).await } - async fn ping(&mut self) -> Result { - let row = self.0.simple_query("SELECT 1").await?.into_first_result().await?; + async fn ping(&mut self) -> Result { + let row = self + .0 + .simple_query("SELECT 1") + .await? + .into_first_result() + .await?; + info!(?row, "Ping response"); let value = row[0].get(0).expect("value is present"); Ok(value) } @@ -96,7 +116,7 @@ pub enum PoolError { #[error("On Connect error: {0}")] OnConnect(#[source] OnConnectError), #[error("Unexpected ping response: {0}")] - Ping(u8) + Ping(i32), } impl ManageConnection for ConnectionManager { @@ -117,6 +137,9 @@ impl ManageConnection for ConnectionManager { } fn has_broken(&self, _conn: &mut Self::Connection) -> bool { - self.is_healthy.read().expect("poisoned is_healthy lock").clone() + self.is_healthy + .read() + .expect("poisoned is_healthy lock") + .clone() } }