From bd29b3551cfef91adffb37e3cc9fce8c77a34232 Mon Sep 17 00:00:00 2001 From: Matt Gibson Date: Wed, 21 Jan 2026 14:16:43 -0800 Subject: [PATCH] build AIO application --- akd/Cargo.lock | 5 + akd/crates/aio/Cargo.toml | 11 +- akd/crates/aio/src/config.rs | 202 +++++++++++++++++++++++++++++ akd/crates/aio/src/main.rs | 68 ++++++++-- akd/crates/publisher/src/config.rs | 2 +- akd/crates/publisher/src/lib.rs | 2 +- akd/crates/reader/src/config.rs | 2 +- 7 files changed, 278 insertions(+), 14 deletions(-) create mode 100644 akd/crates/aio/src/config.rs diff --git a/akd/Cargo.lock b/akd/Cargo.lock index af85061eb3..110cb4d26e 100644 --- a/akd/Cargo.lock +++ b/akd/Cargo.lock @@ -41,12 +41,17 @@ name = "aio" version = "0.1.0" dependencies = [ "akd_storage", + "anyhow", + "bitwarden-akd-configuration", "common", + "config", "publisher", "reader", + "serde", "tokio", "tracing", "tracing-subscriber", + "uuid", ] [[package]] diff --git a/akd/crates/aio/Cargo.toml b/akd/crates/aio/Cargo.toml index 71575d16c9..1b34b1dd0d 100644 --- a/akd/crates/aio/Cargo.toml +++ b/akd/crates/aio/Cargo.toml @@ -8,12 +8,17 @@ keywords.workspace = true [dependencies] akd_storage = { workspace = true } +anyhow = { workspace = true } +bitwarden-akd-configuration = { workspace = true } common = { workspace = true } -tracing = { workspace = true } -tracing-subscriber = { workspace = true } -tokio = { workspace = true } +config = { workspace = true } publisher = { path = "../publisher" } reader = { path = "../reader" } +serde = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +uuid = { workspace = true } [lints] workspace = true diff --git a/akd/crates/aio/src/config.rs b/akd/crates/aio/src/config.rs new file mode 100644 index 0000000000..bf56e3af7f --- /dev/null +++ b/akd/crates/aio/src/config.rs @@ -0,0 +1,202 @@ +use akd_storage::akd_storage_config::AkdStorageConfig; +use config::{Config, ConfigError, Environment, File}; +use serde::Deserialize; +use uuid::Uuid; + +const DEFAULT_EPOCH_DURATION_MS: u64 = 30000; // 30 seconds +const DEFAULT_MAX_BATCH_LOOKUP_SIZE: usize = 10; +const DEFAULT_AZKS_POLL_INTERVAL_MS: u64 = 100; + +/// Application configuration for the AIO (All-in-One) AKD service +#[derive(Clone, Debug, Deserialize)] +pub struct ApplicationConfig { + pub storage: AkdStorageConfig, + /// The unique Bitwarden installation ID using this AKD instance. + /// This value is used to namespace AKD data to a given installation. + pub installation_id: Uuid, + #[serde(default)] + pub publisher: PublisherSettings, + #[serde(default)] + pub reader: ReaderSettings, +} + +/// Configuration for the Publisher service +#[derive(Clone, Debug, Deserialize)] +pub struct PublisherSettings { + /// The duration of each publishing epoch in milliseconds. Defaults to 30 seconds. + #[serde(default = "default_epoch_duration_ms")] + pub epoch_duration_ms: u64, + /// The limit to the number of AKD values to update in a single epoch. Defaults to no limit. + #[serde(default)] + pub epoch_update_limit: Option, + /// The address the publisher web server will bind to. Defaults to "127.0.0.1:3000". + #[serde(default = "default_publisher_web_server_bind_address")] + pub web_server_bind_address: String, + /// The API key required to access the publisher web server endpoints. + /// + /// NOTE: constant-time comparison is used, but mismatched string length cause immediate failure. + /// For this reason, timing attacks can be used to at least determine the valid key length and a + /// sufficiently long key should be used to mitigate this risk. + pub web_server_api_key: String, +} + +/// Configuration for the Reader service +#[derive(Clone, Debug, Deserialize)] +pub struct ReaderSettings { + /// The address the reader web server will bind to. Defaults to "127.0.0.1:3001". + #[serde(default = "default_reader_web_server_bind_address")] + pub web_server_bind_address: String, + /// Maximum number of labels allowed in a single batch lookup request. Defaults to 10. + #[serde(default = "default_max_batch_lookup_size")] + pub max_batch_lookup_size: usize, + /// Polling interval for AZKS storage in milliseconds. Should be significantly less than the epoch interval. Defaults to 100 ms. + #[serde(default = "default_azks_poll_interval_ms")] + pub azks_poll_interval_ms: u64, +} + +fn default_epoch_duration_ms() -> u64 { + DEFAULT_EPOCH_DURATION_MS +} + +fn default_publisher_web_server_bind_address() -> String { + "127.0.0.1:3000".to_string() +} + +fn default_reader_web_server_bind_address() -> String { + "127.0.0.1:3001".to_string() +} + +fn default_max_batch_lookup_size() -> usize { + DEFAULT_MAX_BATCH_LOOKUP_SIZE +} + +fn default_azks_poll_interval_ms() -> u64 { + DEFAULT_AZKS_POLL_INTERVAL_MS +} + +impl Default for PublisherSettings { + fn default() -> Self { + PublisherSettings { + epoch_duration_ms: default_epoch_duration_ms(), + epoch_update_limit: None, + web_server_bind_address: default_publisher_web_server_bind_address(), + web_server_api_key: String::new(), + } + } +} + +impl Default for ReaderSettings { + fn default() -> Self { + ReaderSettings { + web_server_bind_address: default_reader_web_server_bind_address(), + max_batch_lookup_size: default_max_batch_lookup_size(), + azks_poll_interval_ms: default_azks_poll_interval_ms(), + } + } +} + +impl ApplicationConfig { + /// Load configuration from multiple sources in order of priority: + /// 1. Environment variables (prefixed with AKD_AIO) - always applied with highest priority + /// 2. Configuration file from AKD_AIO_CONFIG_PATH environment variable (if set) + /// 3. OR default configuration file (config.toml, config.yaml, config.json) in working directory + /// + /// Environment variable naming: + /// - Uses double underscore (__) as separator + /// - For field `installation_id`, use `AKD_AIO__INSTALLATION_ID` + /// - For nested fields like `storage.cache_clean_ms`, use `AKD_AIO__STORAGE__CACHE_CLEAN_MS` + /// - For publisher fields like `publisher.epoch_duration_ms`, use `AKD_AIO__PUBLISHER__EPOCH_DURATION_MS` + /// + /// Note: Only one config file source is used - either custom path OR default location + pub fn load() -> Result { + let mut builder = Config::builder(); + + // Check for custom config path via environment variable + if let Ok(config_path) = std::env::var("AKD_AIO_CONFIG_PATH") { + builder = builder.add_source(File::with_name(&config_path).required(true)); + } else { + // Fall back to default config file locations + builder = builder.add_source(File::with_name("config").required(false)); + } + + let config = builder + // Add environment variables with prefix "AKD_AIO_" + .add_source(Environment::with_prefix("AKD_AIO").separator("__")) + .build()?; + + let aio_config: Self = config.try_deserialize()?; + + aio_config.validate()?; + + Ok(aio_config) + } + + pub fn validate(&self) -> Result<(), ConfigError> { + self.storage + .validate() + .map_err(|e| ConfigError::Message(format!("{e}")))?; + self.publisher.validate()?; + self.reader.validate()?; + Ok(()) + } + +} + +impl PublisherSettings { + pub fn validate(&self) -> Result<(), ConfigError> { + if self.epoch_duration_ms == 0 { + return Err(ConfigError::Message( + "epoch_duration_ms must be greater than 0".to_string(), + )); + } + if self.web_server_api_key.is_empty() { + return Err(ConfigError::Message( + "web_server_api_key is required".to_string(), + )); + } + Ok(()) + } +} + +impl ReaderSettings { + pub fn validate(&self) -> Result<(), ConfigError> { + if self.max_batch_lookup_size == 0 { + return Err(ConfigError::Message( + "max_batch_lookup_size must be greater than 0".to_string(), + )); + } + if self.azks_poll_interval_ms == 0 { + return Err(ConfigError::Message( + "azks_poll_interval_ms must be greater than 0".to_string(), + )); + } + Ok(()) + } +} + +impl From<&ApplicationConfig> for publisher::ApplicationConfig { + fn from(config: &ApplicationConfig) -> Self { + publisher::ApplicationConfig { + storage: config.storage.clone(), + publisher: publisher::PublisherConfig { + epoch_duration_ms: config.publisher.epoch_duration_ms, + epoch_update_limit: config.publisher.epoch_update_limit, + }, + installation_id: config.installation_id, + web_server_bind_address: config.publisher.web_server_bind_address.clone(), + web_server_api_key: config.publisher.web_server_api_key.clone(), + } + } +} + +impl From<&ApplicationConfig> for reader::ApplicationConfig { + fn from(config: &ApplicationConfig) -> Self { + reader::ApplicationConfig { + storage: config.storage.clone(), + web_server_bind_address: config.reader.web_server_bind_address.clone(), + installation_id: config.installation_id, + max_batch_lookup_size: config.reader.max_batch_lookup_size, + azks_poll_interval_ms: config.reader.azks_poll_interval_ms, + } + } +} diff --git a/akd/crates/aio/src/main.rs b/akd/crates/aio/src/main.rs index a27284444b..b50dc4849c 100644 --- a/akd/crates/aio/src/main.rs +++ b/akd/crates/aio/src/main.rs @@ -2,23 +2,75 @@ //! Requires both read and write permissions to the underlying data stores. //! There should only be one instance of this running at a time for a given AKD. +use anyhow::{Context, Result}; +use tracing::{error, info}; use tracing_subscriber::EnvFilter; +mod config; + +use config::ApplicationConfig; + #[tokio::main] -#[allow(unreachable_code)] -async fn main() { +async fn main() -> Result<()> { let env_filter = EnvFilter::builder() .with_default_directive(tracing::level_filters::LevelFilter::INFO.into()) .from_env_lossy(); tracing_subscriber::fmt().with_env_filter(env_filter).init(); - // Load config and convert to publisher and reader configs - todo!(); + // Load configuration + let config = ApplicationConfig::load() + .map_err(|e| anyhow::anyhow!("Failed to load configuration: {e}"))?; - // Start publisher task - todo!(); + // Initialize Bitwarden AKD configuration (must happen before starting services) + bitwarden_akd_configuration::BitwardenV1Configuration::init(config.installation_id); - // Start reader task - todo!(); + // Create shutdown channel for coordinated shutdown + let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1); + + // Convert unified config to service-specific configs + let publisher_config = publisher::ApplicationConfig::from(&config); + let reader_config = reader::ApplicationConfig::from(&config); + + // Start publisher service + let mut publisher_handles = publisher::start(publisher_config, &shutdown_rx) + .await + .context("Failed to start publisher")?; + + // Start reader service + let mut reader_handle = reader::start(reader_config, &shutdown_rx) + .await + .context("Failed to start reader")?; + + // Wait for shutdown signal or service completion + tokio::select! { + _ = tokio::signal::ctrl_c() => { + info!("Received Ctrl+C, shutting down"); + shutdown_tx.send(()).ok(); + } + _ = &mut publisher_handles.write_handle => { + error!("Publisher write service completed unexpectedly"); + } + _ = &mut publisher_handles.web_handle => { + error!("Publisher web service completed unexpectedly"); + } + _ = &mut reader_handle => { + error!("Reader service completed unexpectedly"); + } + } + + // Wait for all services to complete + info!("Waiting for services to shut down..."); + publisher_handles.write_handle.await.ok(); + publisher_handles.web_handle.await.ok(); + + // Reader handle returns a Result, so we need to handle it properly + match reader_handle.await { + Ok(Ok(())) => info!("Reader service shut down successfully"), + Ok(Err(e)) => error!("Reader service failed: {e}"), + Err(e) => error!("Failed to join reader service task: {e}"), + } + + info!("All services shut down"); + Ok(()) } diff --git a/akd/crates/publisher/src/config.rs b/akd/crates/publisher/src/config.rs index 957daaa329..f7746f874c 100644 --- a/akd/crates/publisher/src/config.rs +++ b/akd/crates/publisher/src/config.rs @@ -17,7 +17,7 @@ pub struct ApplicationConfig { pub installation_id: Uuid, /// The address the web server will bind to. Defaults to "127.0.0.1:3000". #[serde(default = "default_web_server_bind_address")] - web_server_bind_address: String, + pub web_server_bind_address: String, /// The API key required to access the web server endpoints. /// /// NOTE: constant-time comparison is used, but mismatched string length cause immediate failure. diff --git a/akd/crates/publisher/src/lib.rs b/akd/crates/publisher/src/lib.rs index 24972a0f25..8363308c28 100644 --- a/akd/crates/publisher/src/lib.rs +++ b/akd/crates/publisher/src/lib.rs @@ -9,7 +9,7 @@ use tracing::{error, info, instrument, trace}; mod config; mod routes; -pub use crate::config::ApplicationConfig; +pub use crate::config::{ApplicationConfig, PublisherConfig}; use crate::routes::auth; pub struct AppHandles { diff --git a/akd/crates/reader/src/config.rs b/akd/crates/reader/src/config.rs index 54e30095ee..e8b4f5d6d3 100644 --- a/akd/crates/reader/src/config.rs +++ b/akd/crates/reader/src/config.rs @@ -8,7 +8,7 @@ pub struct ApplicationConfig { pub storage: AkdStorageConfig, /// The address the web server will bind to. Defaults to "127.0.0.1:3001". #[serde(default = "default_web_server_bind_address")] - web_server_bind_address: String, + pub web_server_bind_address: String, /// The unique Bitwarden installation ID using this AKD reader instance. /// This value is used to namespace AKD data to a given installation. pub installation_id: Uuid,