diff --git a/akd/crates/akd_storage/src/akd_storage_config.rs b/akd/crates/akd_storage/src/akd_storage_config.rs index 84981de657..939d047a09 100644 --- a/akd/crates/akd_storage/src/akd_storage_config.rs +++ b/akd/crates/akd_storage/src/akd_storage_config.rs @@ -5,7 +5,10 @@ use serde::Deserialize; use thiserror::Error; use tracing::error; -use crate::{db_config::DbConfig, vrf_key_config::VrfKeyConfig, AkdDatabase, VrfKeyDatabase}; +use crate::{ + db_config::DbConfig, publish_queue_config::PublishQueueConfig, vrf_key_config::VrfKeyConfig, + AkdDatabase, PublishQueueType, VrfKeyDatabase, +}; #[derive(Debug, Clone, Deserialize)] pub struct AkdStorageConfig { @@ -20,6 +23,7 @@ pub struct AkdStorageConfig { #[serde(default = "default_cache_clean_ms")] pub cache_clean_ms: usize, pub vrf_key_config: VrfKeyConfig, + pub publish_queue_config: PublishQueueConfig, } #[derive(Debug, Error)] @@ -44,6 +48,7 @@ impl AkdStorageConfig { ( Directory, AkdDatabase, + PublishQueueType, ), AkdStorageInitializationError, > { @@ -54,6 +59,8 @@ impl AkdStorageConfig { AkdStorageInitializationError })?; + let publish_queue = PublishQueueType::new(&self.publish_queue_config, &db); + let directory = Directory::new(storage_manager, vrf_storage) .await .map_err(|err| { @@ -61,7 +68,7 @@ impl AkdStorageConfig { AkdStorageInitializationError })?; - Ok((directory, db)) + Ok((directory, db, publish_queue)) } async fn initialize_storage( diff --git a/akd/crates/akd_storage/src/lib.rs b/akd/crates/akd_storage/src/lib.rs index 46347fd9fb..54319f8852 100644 --- a/akd/crates/akd_storage/src/lib.rs +++ b/akd/crates/akd_storage/src/lib.rs @@ -3,6 +3,7 @@ pub mod akd_storage_config; pub mod db_config; pub mod ms_sql; mod publish_queue; +pub mod publish_queue_config; pub mod vrf_key_config; pub mod vrf_key_database; diff --git a/akd/crates/akd_storage/src/publish_queue.rs b/akd/crates/akd_storage/src/publish_queue.rs index 92974203e1..4b4fcd6fcf 100644 --- a/akd/crates/akd_storage/src/publish_queue.rs +++ b/akd/crates/akd_storage/src/publish_queue.rs @@ -1,7 +1,12 @@ use async_trait::async_trait; use thiserror::Error; -use crate::ms_sql::MsSql; +use crate::{ + db_config::DatabaseType, + ms_sql::MsSql, + publish_queue_config::{PublishQueueConfig, PublishQueueProvider}, + AkdDatabase, +}; pub(crate) struct PublishQueueItem { pub id: uuid::Uuid, @@ -29,6 +34,22 @@ pub enum PublishQueueType { MsSql(MsSql), } +impl PublishQueueType { + pub fn new(config: &PublishQueueConfig, db: &AkdDatabase) -> PublishQueueType { + match &config.provider { + PublishQueueProvider::DbBacked => db.into(), + } + } +} + +impl From<&AkdDatabase> for PublishQueueType { + fn from(db: &AkdDatabase) -> Self { + match db.db() { + DatabaseType::MsSql(ms_sql) => PublishQueueType::MsSql(ms_sql.clone()), + } + } +} + #[async_trait] impl PublishQueue for PublishQueueType { async fn enqueue( diff --git a/akd/crates/akd_storage/src/publish_queue_config.rs b/akd/crates/akd_storage/src/publish_queue_config.rs new file mode 100644 index 0000000000..e6499fb25e --- /dev/null +++ b/akd/crates/akd_storage/src/publish_queue_config.rs @@ -0,0 +1,21 @@ +use serde::Deserialize; + +#[derive(Debug, Clone, Deserialize)] +#[serde(tag = "type")] +pub struct PublishQueueConfig { + pub provider: PublishQueueProvider, + #[serde(default = "default_publish_limit")] + pub epoch_update_limit: Option, +} + +fn default_publish_limit() -> Option { + None +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(tag = "type")] +pub enum PublishQueueProvider { + DbBacked, +} + +impl PublishQueueConfig {} diff --git a/akd/crates/akd_test_utility/src/main.rs b/akd/crates/akd_test_utility/src/main.rs index c1e6b8b496..99a86cf474 100644 --- a/akd/crates/akd_test_utility/src/main.rs +++ b/akd/crates/akd_test_utility/src/main.rs @@ -169,8 +169,12 @@ async fn main() -> Result<()> { vrf_key_config: akd_storage::vrf_key_config::VrfKeyConfig::B64EncodedSymmetricKey { key: "4AD95tg8tfveioyS/E2jAQw06FDTUCu+VSEZxa41wuM=".to_string(), }, + publish_queue_config: akd_storage::publish_queue_config::PublishQueueConfig { + provider: akd_storage::publish_queue_config::PublishQueueProvider::DbBacked, + epoch_update_limit: None, + }, }; - let (mut directory, db) = config + let (mut directory, db, _) = config .initialize_directory::() .await .context("Failed to initialize AKD directory")?; diff --git a/akd/crates/publisher/src/lib.rs b/akd/crates/publisher/src/lib.rs index 285747c2c6..2ad4d41ca5 100644 --- a/akd/crates/publisher/src/lib.rs +++ b/akd/crates/publisher/src/lib.rs @@ -15,7 +15,7 @@ pub struct AppHandles { #[instrument(skip_all, name = "publisher_start")] pub async fn start(config: ApplicationConfig, shutdown_rx: &Receiver<()>) -> Result { - let (directory, db) = config + let (directory, db, publish_queue) = config .storage .initialize_directory::() .await?; diff --git a/akd/crates/publisher/src/routes/mod.rs b/akd/crates/publisher/src/routes/mod.rs index ac63ac371c..2440c8271f 100644 --- a/akd/crates/publisher/src/routes/mod.rs +++ b/akd/crates/publisher/src/routes/mod.rs @@ -6,7 +6,7 @@ mod health; mod publish; #[derive(Clone)] -pub struct AppState { +pub(crate) struct AppState { pub directory: BitAkdDirectory, pub db: AkdDatabase, pub publish_queue: PublishQueueType,