diff --git a/akd/crates/akd_storage/src/akd_storage_config.rs b/akd/crates/akd_storage/src/akd_storage_config.rs index 1a3d666477..2ad19564d8 100644 --- a/akd/crates/akd_storage/src/akd_storage_config.rs +++ b/akd/crates/akd_storage/src/akd_storage_config.rs @@ -3,7 +3,7 @@ use std::time::Duration; use akd::{directory::ReadOnlyDirectory, storage::StorageManager, Directory}; use serde::Deserialize; use thiserror::Error; -use tracing::error; +use tracing::{error, instrument}; use crate::{ db_config::DbConfig, publish_queue_config::PublishQueueConfig, vrf_key_config::VrfKeyConfig, @@ -42,6 +42,7 @@ impl AkdStorageConfig { Ok(()) } + #[instrument(skip(self), level = "info")] pub async fn initialize_directory( &self, ) -> Result< diff --git a/akd/crates/akd_storage/src/ms_sql/mod.rs b/akd/crates/akd_storage/src/ms_sql/mod.rs index 353ae9101c..206958d062 100644 --- a/akd/crates/akd_storage/src/ms_sql/mod.rs +++ b/akd/crates/akd_storage/src/ms_sql/mod.rs @@ -81,7 +81,12 @@ impl MsSql { MsSqlBuilder::new(connection_string) } - #[instrument(skip(connection_string), fields(pool_size))] + #[instrument( + skip(connection_string), + fields(pool_size), + name = "ms_sql_new", + level = "info" + )] pub async fn new(connection_string: String, pool_size: u32) -> Result { info!(pool_size, "Creating MS SQL storage"); let connection_manager = MsSqlConnectionManager::new(connection_string); diff --git a/akd/crates/akd_storage/src/vrf_key_database.rs b/akd/crates/akd_storage/src/vrf_key_database.rs index 2930ecc641..fbe4a48d2e 100644 --- a/akd/crates/akd_storage/src/vrf_key_database.rs +++ b/akd/crates/akd_storage/src/vrf_key_database.rs @@ -6,7 +6,7 @@ use chacha20poly1305::{ }; use rsa::{pkcs1::DecodeRsaPrivateKey, Pkcs1v15Encrypt}; use thiserror::Error; -use tracing::{error, info}; +use tracing::{error, info, instrument}; use crate::{db_config::DatabaseType, vrf_key_config::VrfKeyConfig}; @@ -46,6 +46,7 @@ pub struct VrfKeyDatabase { } impl VrfKeyDatabase { + #[instrument(skip(db, config), level = "info", name = "vrf_key_database_new")] pub async fn new( db: DatabaseType, config: VrfKeyConfig, @@ -178,6 +179,7 @@ impl From for i16 { pub(crate) struct VrfKey(pub Vec); impl VrfKeyTableData { + #[instrument(skip_all, level = "info", name = "vrf_key_table_data_new")] pub async fn new(config: &VrfKeyConfig) -> Result<(Self, VrfKey), VrfKeyCreationError> { info!("Generating new VRF key and table data"); // handle constant key case separately to avoid unnecessary key generation / parsing diff --git a/akd/crates/publisher/src/lib.rs b/akd/crates/publisher/src/lib.rs index c4ab711523..24972a0f25 100644 --- a/akd/crates/publisher/src/lib.rs +++ b/akd/crates/publisher/src/lib.rs @@ -65,33 +65,7 @@ async fn start_publisher( let mut next_epoch = tokio::time::Instant::now() + std::time::Duration::from_millis(config.publisher.epoch_duration_ms as u64); loop { - trace!("Processing publish queue for epoch"); - - // Pull items from publish queue - let (ids, items) = publish_queue - .peek(config.publisher.epoch_update_limit) - .await? - .into_iter() - .fold((vec![], vec![]), |mut acc, i| { - acc.0.push(i.0); - acc.1.push(i.1); - acc - }); - - let result: Result<()> = { - // Apply items to directory - directory - .publish(items) - .await - .context("AKD publish failed")?; - - // Remove processed items from publish queue - publish_queue - .remove(ids) - .await - .context("Failed to remove processed publish queue items")?; - Ok(()) - }; + let result = check_publish_queue(&directory, &publish_queue, config).await; if let Err(e) = result { error!(%e, "Error processing publish queue items"); @@ -122,6 +96,46 @@ async fn start_publisher( Ok(()) } +#[instrument(skip_all, level = "info")] +async fn check_publish_queue( + directory: &BitAkdDirectory, + publish_queue: &PublishQueueType, + config: &ApplicationConfig, +) -> Result<()> { + trace!("Processing publish queue for epoch"); + + // Pull items from publish queue + let items = publish_queue + .peek(config.publisher.epoch_update_limit) + .await + .context("Failed to peek publish queue")?; + + if items.is_empty() { + info!("No items in publish queue to process"); + return Ok(()); + } + + trace!(num_items = items.len(), "Publishing items to AKD"); + let (ids, items) = items.into_iter().fold((vec![], vec![]), |mut acc, i| { + acc.0.push(i.0); + acc.1.push(i.1); + acc + }); + + // Apply items to directory + directory + .publish(items) + .await + .context("AKD publish failed")?; + + // Remove processed items from publish queue + publish_queue + .remove(ids) + .await + .context("Failed to remove processed publish queue items")?; + Ok(()) +} + #[instrument(skip_all)] async fn start_web( publish_queue: PublishQueueType,