diff --git a/akd/crates/akd_storage/src/akd_storage_config.rs b/akd/crates/akd_storage/src/akd_storage_config.rs index 939d047a09..1a3d666477 100644 --- a/akd/crates/akd_storage/src/akd_storage_config.rs +++ b/akd/crates/akd_storage/src/akd_storage_config.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use akd::{storage::StorageManager, Directory}; +use akd::{directory::ReadOnlyDirectory, storage::StorageManager, Directory}; use serde::Deserialize; use thiserror::Error; use tracing::error; @@ -71,6 +71,35 @@ impl AkdStorageConfig { Ok((directory, db, publish_queue)) } + pub async fn initialize_readonly_directory( + &self, + ) -> Result< + ( + ReadOnlyDirectory, + AkdDatabase, + PublishQueueType, + ), + AkdStorageInitializationError, + > { + let (storage_manager, db) = self.initialize_storage().await?; + + let vrf_storage = db.vrf_key_database().await.map_err(|err| { + error!(%err, "Failed to initialize VRF key database"); + AkdStorageInitializationError + })?; + + let publish_queue = PublishQueueType::new(&self.publish_queue_config, &db); + + let directory = ReadOnlyDirectory::new(storage_manager, vrf_storage) + .await + .map_err(|err| { + error!(%err, "Failed to initialize ReadOnlyDirectory"); + AkdStorageInitializationError + })?; + + Ok((directory, db, publish_queue)) + } + async fn initialize_storage( &self, ) -> Result<(StorageManager, AkdDatabase), AkdStorageInitializationError> { diff --git a/akd/crates/akd_storage/src/ms_sql/mod.rs b/akd/crates/akd_storage/src/ms_sql/mod.rs index f4e83df74d..353ae9101c 100644 --- a/akd/crates/akd_storage/src/ms_sql/mod.rs +++ b/akd/crates/akd_storage/src/ms_sql/mod.rs @@ -30,14 +30,15 @@ use uuid::Uuid; use crate::{ ms_sql::tables::{ publish_queue::{ - bulk_delete_rows, bulk_delete_statement, enqueue_statement, peek_no_limit_statement, - peek_statement, + bulk_delete_rows, bulk_delete_statement, enqueue_statement, + label_pending_publish_statement, peek_no_limit_statement, peek_statement, }, vrf_key, }, publish_queue::{PublishQueue, PublishQueueError}, vrf_key_config::VrfKeyConfig, vrf_key_database::{VrfKeyRetrievalError, VrfKeyStorageError, VrfKeyTableData}, + ReadOnlyPublishQueue, }; const DEFAULT_POOL_SIZE: u32 = 100; @@ -908,3 +909,44 @@ impl PublishQueue for MsSql { } } } + +#[async_trait] +impl ReadOnlyPublishQueue for MsSql { + #[instrument(skip(self), level = "debug")] + async fn label_pending_publish(&self, label: &AkdLabel) -> Result { + debug!("Checking if label is pending publish"); + + let mut conn = self.get_connection().await.map_err(|_| { + error!("Failed to get DB connection for label_pending_publish"); + PublishQueueError + })?; + + let statement = label_pending_publish_statement(label); + trace!(sql = statement.sql(), "Query SQL"); + + let query_stream = conn + .query(statement.sql(), &statement.params()) + .await + .map_err(|e| { + error!(error = %e, "Failed to execute label pending publish query"); + PublishQueueError + })?; + + let row = query_stream.into_row().await.map_err(|e| { + error!(error = %e, "Failed to fetch row for label pending publish"); + PublishQueueError + })?; + + if let Some(row) = row { + let is_pending = statement.parse(&row).map_err(|e| { + error!(error = %e, "Failed to parse label pending publish result"); + PublishQueueError + })?; + debug!(is_pending, "Label pending publish check complete"); + Ok(is_pending) + } else { + debug!("Label not found in publish queue"); + Ok(false) + } + } +} diff --git a/akd/crates/akd_storage/src/ms_sql/tables/publish_queue.rs b/akd/crates/akd_storage/src/ms_sql/tables/publish_queue.rs index 8ec599bead..1df444273d 100644 --- a/akd/crates/akd_storage/src/ms_sql/tables/publish_queue.rs +++ b/akd/crates/akd_storage/src/ms_sql/tables/publish_queue.rs @@ -110,23 +110,28 @@ pub fn bulk_delete_statement(temp_table_name: &str) -> Statement { Statement::new(sql, SqlParams::new()) } -// pub fn delete_statement(ids: Vec) -> Statement { -// debug!("Building delete_statement for publish queue"); -// let mut params = SqlParams::new(); -// let mut id_placeholders = Vec::new(); +pub fn label_pending_publish_statement( + label: &AkdLabel, +) -> QueryStatement { + debug!("Building label_pending_publish_statement for publish queue"); + let mut params = SqlParams::new(); + params.add("raw_label", Box::new(label.0.clone())); -// for (i, id) in ids.iter().enumerate() { -// let param_name = format!("id_{}", i); -// params.add(¶m_name, Box::new(*id)); -// id_placeholders.push(params.key_for(¶m_name).expect("id was added to params")); -// } - -// let sql = format!( -// r#" -// DELETE FROM {} -// WHERE id IN ({})"#, -// TABLE_PUBLISH_QUEUE, -// id_placeholders.join(", ") -// ); -// Statement::new(sql, params) -// } + let sql = format!( + r#" + SELECT COUNT(1) AS label_count + FROM {} + WHERE raw_label = {}"#, + TABLE_PUBLISH_QUEUE, + params + .key_for("raw_label") + .expect("raw_label was added to the params list"), + ); + QueryStatement::new(sql, params, |row: &ms_database::Row| { + let count: i64 = row.get("label_count").ok_or_else(|| { + error!("label_count is NULL or missing in publish queue row"); + PublishQueueError + })?; + Ok(count > 0) + }) +} diff --git a/akd/crates/akd_storage/src/publish_queue.rs b/akd/crates/akd_storage/src/publish_queue.rs index 0837342581..89f2849a2a 100644 --- a/akd/crates/akd_storage/src/publish_queue.rs +++ b/akd/crates/akd_storage/src/publish_queue.rs @@ -26,6 +26,11 @@ pub trait PublishQueue { async fn remove(&self, ids: Vec) -> Result<(), PublishQueueError>; } +#[async_trait] +pub trait ReadOnlyPublishQueue { + async fn label_pending_publish(&self, label: &AkdLabel) -> Result; +} + #[derive(Debug, Clone)] pub enum PublishQueueType { MsSql(MsSql), @@ -74,3 +79,12 @@ impl PublishQueue for PublishQueueType { } } } + +#[async_trait] +impl ReadOnlyPublishQueue for PublishQueueType { + async fn label_pending_publish(&self, label: &AkdLabel) -> Result { + match self { + PublishQueueType::MsSql(ms_sql) => ms_sql.label_pending_publish(label).await, + } + } +}