From a03f863cc671124d84f6269a24c57bc3b69b10d6 Mon Sep 17 00:00:00 2001 From: Matt Gibson Date: Wed, 21 Jan 2026 09:50:08 -0800 Subject: [PATCH] update to latest akd prerelease --- akd/Cargo.lock | 9 +-- akd/Cargo.toml | 5 +- akd/crates/akd_storage/Cargo.toml | 2 +- .../akd_storage/src/akd_storage_config.rs | 55 ++++++++++++++++--- akd/crates/akd_test_utility/src/main.rs | 2 + akd/crates/common/Cargo.toml | 2 +- akd/crates/publisher/src/config.rs | 10 ++++ akd/crates/reader/src/error.rs | 3 + akd/crates/reader/src/routes/key_history.rs | 3 - 9 files changed, 73 insertions(+), 18 deletions(-) diff --git a/akd/Cargo.lock b/akd/Cargo.lock index d1377915b6..3df4902844 100644 --- a/akd/Cargo.lock +++ b/akd/Cargo.lock @@ -51,9 +51,9 @@ dependencies = [ [[package]] name = "akd" -version = "0.11.0" +version = "0.12.0-pre.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "636d3630d66710dde433d97ccc9c868f8858ab31bdd8b0401f9bea59b02f53b6" +checksum = "6adb55bd1a2b7e662efe48791800d69c30ae4edc6ea8b29f69626c32cc24bd5c" dependencies = [ "akd_core", "async-recursion", @@ -64,13 +64,14 @@ dependencies = [ "protobuf", "serde", "tokio", + "tracing", ] [[package]] name = "akd_core" -version = "0.11.0" +version = "0.12.0-pre.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "713d42c0592799667f6712d1a61fbc0274b0d51c60ecb0407d343f01bc7279f6" +checksum = "5cb7880a7b8f195f8a253a4e6b26ff4b19cc2031e171170f02a7ddd6eb95c1de" dependencies = [ "async-trait", "blake3", diff --git a/akd/Cargo.toml b/akd/Cargo.toml index 8a6defdd84..69fc86ea15 100644 --- a/akd/Cargo.toml +++ b/akd/Cargo.toml @@ -15,7 +15,10 @@ unwrap_used = "deny" [workspace.dependencies] anyhow = "1.0.100" -akd = "0.11.0" +akd = { version = "0.12.0-pre.11", features = [ + "tracing", + "tracing_instrument", +] } async-trait = "0.1.89" akd_storage = { path = "crates/akd_storage" } axum = { version = "0.8.8", features = ["macros"] } diff --git a/akd/crates/akd_storage/Cargo.toml b/akd/crates/akd_storage/Cargo.toml index 8bf34d4a4c..ca26defb4c 100644 --- a/akd/crates/akd_storage/Cargo.toml +++ b/akd/crates/akd_storage/Cargo.toml @@ -7,7 +7,7 @@ license-file.workspace = true keywords.workspace = true [dependencies] -akd = "0.11.0" +akd.workspace = true async-trait.workspace = true base64 = "0.22.1" bitwarden-encoding = { path = "../bitwarden-encoding" } diff --git a/akd/crates/akd_storage/src/akd_storage_config.rs b/akd/crates/akd_storage/src/akd_storage_config.rs index e00ea1e22a..5052b00a24 100644 --- a/akd/crates/akd_storage/src/akd_storage_config.rs +++ b/akd/crates/akd_storage/src/akd_storage_config.rs @@ -1,6 +1,8 @@ use std::time::Duration; -use akd::{directory::ReadOnlyDirectory, storage::StorageManager, Directory}; +use akd::{ + directory::ReadOnlyDirectory, storage::StorageManager, AzksParallelismConfig, Directory, +}; use serde::Deserialize; use thiserror::Error; use tracing::{error, instrument}; @@ -24,6 +26,13 @@ pub struct AkdStorageConfig { pub cache_clean_ms: usize, pub vrf_key_config: VrfKeyConfig, pub publish_queue_config: PublishQueueConfig, + + /// Parallelization for node insertion when available parallelism cannot be determined. Defaults to 32 + #[serde(default = "default_insertion_parallelism")] + pub insertion_parallelism: u32, + /// Parallelization for preloading data when available parallelism cannot be determined. Defaults to 32 + #[serde(default = "default_preload_parallelism")] + pub preload_parallelism: u32, } #[derive(Debug, Error)] @@ -62,7 +71,7 @@ impl AkdStorageConfig { let publish_queue = PublishQueueType::new(&self.publish_queue_config, &db); - let directory = Directory::new(storage_manager, vrf_storage) + let directory = Directory::new(storage_manager, vrf_storage, self.parallelism_config()) .await .map_err(|err| { error!(%err, "Failed to initialize Directory"); @@ -91,12 +100,13 @@ impl AkdStorageConfig { let publish_queue = ReadOnlyPublishQueueType::new(&self.publish_queue_config, &db); - let directory = ReadOnlyDirectory::new(storage_manager, vrf_storage) - .await - .map_err(|err| { - error!(%err, "Failed to initialize ReadOnlyDirectory"); - AkdStorageInitializationError - })?; + let directory = + ReadOnlyDirectory::new(storage_manager, vrf_storage, self.parallelism_config()) + .await + .map_err(|err| { + error!(%err, "Failed to initialize ReadOnlyDirectory"); + AkdStorageInitializationError + })?; Ok((directory, db, publish_queue)) } @@ -134,6 +144,27 @@ impl AkdStorageConfig { db, )) } + + #[allow(dead_code)] + async fn initialize_no_cache_storage( + &self, + ) -> Result<(StorageManager, AkdDatabase), AkdStorageInitializationError> { + let db = self.db_config.connect().await.map_err(|err| { + error!(%err, "Failed to connect to database"); + AkdStorageInitializationError + })?; + + let db = AkdDatabase::new(db, self.vrf_key_config.clone()); + + Ok((StorageManager::new_no_cache(db.clone()), db)) + } + + fn parallelism_config(&self) -> AzksParallelismConfig { + AzksParallelismConfig { + insertion: akd::AzksParallelismOption::AvailableOr(self.insertion_parallelism), + preload: akd::AzksParallelismOption::AvailableOr(self.preload_parallelism), + } + } } fn default_cache_item_lifetime_ms() -> usize { @@ -143,3 +174,11 @@ fn default_cache_item_lifetime_ms() -> usize { fn default_cache_clean_ms() -> usize { 15_000 } + +fn default_insertion_parallelism() -> u32 { + 32 +} + +fn default_preload_parallelism() -> u32 { + 32 +} diff --git a/akd/crates/akd_test_utility/src/main.rs b/akd/crates/akd_test_utility/src/main.rs index 57a79a56b2..0a72c615da 100644 --- a/akd/crates/akd_test_utility/src/main.rs +++ b/akd/crates/akd_test_utility/src/main.rs @@ -187,6 +187,8 @@ async fn main() -> Result<()> { key: "4AD95tg8tfveioyS/E2jAQw06FDTUCu+VSEZxa41wuM=".to_string(), }, publish_queue_config: akd_storage::publish_queue_config::PublishQueueConfig::DbBacked, + insertion_parallelism: 32, + preload_parallelism: 32, }; let (mut directory, db, _) = config .initialize_directory::() diff --git a/akd/crates/common/Cargo.toml b/akd/crates/common/Cargo.toml index 5fdfd768ed..50670fbbf6 100644 --- a/akd/crates/common/Cargo.toml +++ b/akd/crates/common/Cargo.toml @@ -7,7 +7,7 @@ license-file.workspace = true keywords.workspace = true [dependencies] -akd = "0.11.0" +akd.workspace = true async-trait = { workspace = true } akd_storage = { workspace = true } bitwarden-akd-configuration = { workspace = true } diff --git a/akd/crates/publisher/src/config.rs b/akd/crates/publisher/src/config.rs index 6be2ccbabc..957daaa329 100644 --- a/akd/crates/publisher/src/config.rs +++ b/akd/crates/publisher/src/config.rs @@ -10,6 +10,7 @@ const DEFAULT_EPOCH_DURATION_MS: u64 = 30000; // 30 seconds #[derive(Clone, Debug, Deserialize)] pub struct ApplicationConfig { pub storage: AkdStorageConfig, + #[serde(default)] pub publisher: PublisherConfig, /// The unique Bitwarden installation ID using this AKD publisher instance. /// This value is used to namespace AKD data to a given installation. @@ -41,6 +42,15 @@ pub struct PublisherConfig { pub epoch_update_limit: Option, } +impl Default for PublisherConfig { + fn default() -> Self { + PublisherConfig { + epoch_duration_ms: default_epoch_duration_ms(), + epoch_update_limit: default_epoch_update_limit(), + } + } +} + fn default_epoch_duration_ms() -> u64 { DEFAULT_EPOCH_DURATION_MS } diff --git a/akd/crates/reader/src/error.rs b/akd/crates/reader/src/error.rs index 6522a5726c..70464d2af3 100644 --- a/akd/crates/reader/src/error.rs +++ b/akd/crates/reader/src/error.rs @@ -47,6 +47,7 @@ pub enum ErrorCode { AkdTreeNode, AkdVerification, AkdInvalidEpoch, + AkdInvalidVersion, AkdReadOnlyDirectory, AkdPublish, AkdAzks, @@ -81,6 +82,7 @@ impl ReaderError { }, AkdError::Directory(dir_err) => match dir_err { akd::errors::DirectoryError::InvalidEpoch(_) => StatusCode::BAD_REQUEST, + akd::errors::DirectoryError::InvalidVersion(_) => StatusCode::BAD_REQUEST, akd::errors::DirectoryError::Verification(_) => { StatusCode::UNPROCESSABLE_ENTITY } @@ -114,6 +116,7 @@ impl ReaderError { AkdError::Directory(dir_err) => match dir_err { akd::errors::DirectoryError::Verification(_) => ErrorCode::AkdVerification, akd::errors::DirectoryError::InvalidEpoch(_) => ErrorCode::AkdInvalidEpoch, + akd::errors::DirectoryError::InvalidVersion(_) => ErrorCode::AkdInvalidVersion, akd::errors::DirectoryError::ReadOnlyDirectory(_) => { ErrorCode::AkdReadOnlyDirectory } diff --git a/akd/crates/reader/src/routes/key_history.rs b/akd/crates/reader/src/routes/key_history.rs index 3d74113d47..e4d84e3813 100644 --- a/akd/crates/reader/src/routes/key_history.rs +++ b/akd/crates/reader/src/routes/key_history.rs @@ -24,8 +24,6 @@ pub enum HistoryParams { Complete, /// Returns up to the most recent N updates for a label MostRecent(usize), - /// Returns all updates since a specified epoch (inclusive) - SinceEpoch(u64), } impl From for akd::HistoryParams { @@ -33,7 +31,6 @@ impl From for akd::HistoryParams { match params { HistoryParams::Complete => akd::HistoryParams::Complete, HistoryParams::MostRecent(n) => akd::HistoryParams::MostRecent(n), - HistoryParams::SinceEpoch(epoch) => akd::HistoryParams::SinceEpoch(epoch), } } }