diff --git a/akd/Cargo.lock b/akd/Cargo.lock index 4003cac612..8c2ebd99b1 100644 --- a/akd/Cargo.lock +++ b/akd/Cargo.lock @@ -2253,7 +2253,9 @@ dependencies = [ "akd", "akd_storage", "anyhow", + "axum", "bitwarden-akd-configuration", + "chrono", "common", "config", "serde", diff --git a/akd/crates/akd_storage/src/akd_storage_config.rs b/akd/crates/akd_storage/src/akd_storage_config.rs index 2ad19564d8..e00ea1e22a 100644 --- a/akd/crates/akd_storage/src/akd_storage_config.rs +++ b/akd/crates/akd_storage/src/akd_storage_config.rs @@ -7,7 +7,7 @@ use tracing::{error, instrument}; use crate::{ db_config::DbConfig, publish_queue_config::PublishQueueConfig, vrf_key_config::VrfKeyConfig, - AkdDatabase, PublishQueueType, VrfKeyDatabase, + AkdDatabase, PublishQueueType, ReadOnlyPublishQueueType, VrfKeyDatabase, }; #[derive(Debug, Clone, Deserialize)] @@ -78,7 +78,7 @@ impl AkdStorageConfig { ( ReadOnlyDirectory, AkdDatabase, - PublishQueueType, + ReadOnlyPublishQueueType, ), AkdStorageInitializationError, > { @@ -89,7 +89,7 @@ impl AkdStorageConfig { AkdStorageInitializationError })?; - let publish_queue = PublishQueueType::new(&self.publish_queue_config, &db); + let publish_queue = ReadOnlyPublishQueueType::new(&self.publish_queue_config, &db); let directory = ReadOnlyDirectory::new(storage_manager, vrf_storage) .await diff --git a/akd/crates/akd_storage/src/publish_queue.rs b/akd/crates/akd_storage/src/publish_queue.rs index 89f2849a2a..352c1871f4 100644 --- a/akd/crates/akd_storage/src/publish_queue.rs +++ b/akd/crates/akd_storage/src/publish_queue.rs @@ -80,11 +80,32 @@ impl PublishQueue for PublishQueueType { } } -#[async_trait] -impl ReadOnlyPublishQueue for PublishQueueType { - async fn label_pending_publish(&self, label: &AkdLabel) -> Result { - match self { - PublishQueueType::MsSql(ms_sql) => ms_sql.label_pending_publish(label).await, +#[derive(Debug, Clone)] +pub enum ReadOnlyPublishQueueType { + MsSql(MsSql), +} + +impl ReadOnlyPublishQueueType { + pub fn new(config: &PublishQueueConfig, db: &AkdDatabase) -> ReadOnlyPublishQueueType { + match config { + PublishQueueConfig::DbBacked => db.into(), + } + } +} + +impl From<&AkdDatabase> for ReadOnlyPublishQueueType { + fn from(db: &AkdDatabase) -> Self { + match db.db() { + DatabaseType::MsSql(ms_sql) => ReadOnlyPublishQueueType::MsSql(ms_sql.clone()), + } + } +} + +#[async_trait] +impl ReadOnlyPublishQueue for ReadOnlyPublishQueueType { + async fn label_pending_publish(&self, label: &AkdLabel) -> Result { + match self { + ReadOnlyPublishQueueType::MsSql(ms_sql) => ms_sql.label_pending_publish(label).await, } } } diff --git a/akd/crates/reader/Cargo.toml b/akd/crates/reader/Cargo.toml index effff77c3a..55708ff662 100644 --- a/akd/crates/reader/Cargo.toml +++ b/akd/crates/reader/Cargo.toml @@ -9,11 +9,17 @@ keywords.workspace = true [dependencies] tokio = { workspace = true } tracing = { workspace = true } -tracing-subscriber = { workspace = true } +tracing-subscriber = { workspace = true, features = ["env-filter"] } akd = { workspace = true } -akd_storage = { workspace = true} -bitwarden-akd-configuration = { workspace = true} -common = { workspace = true} +akd_storage = { workspace = true } +anyhow = { workspace = true } +bitwarden-akd-configuration = { workspace = true } +common = { workspace = true } +serde = { workspace = true } +config = { workspace = true } +uuid = { workspace = true } +axum = { workspace = true } +chrono = { workspace = true } [lints] workspace = true diff --git a/akd/crates/reader/src/config.rs b/akd/crates/reader/src/config.rs new file mode 100644 index 0000000000..d5652fc3f5 --- /dev/null +++ b/akd/crates/reader/src/config.rs @@ -0,0 +1,70 @@ +use akd_storage::akd_storage_config::AkdStorageConfig; +use config::{Config, ConfigError, Environment, File}; +use serde::Deserialize; +use uuid::Uuid; + +#[derive(Clone, Debug, Deserialize)] +pub struct ApplicationConfig { + pub storage: AkdStorageConfig, + /// The address the web server will bind to. Defaults to "127.0.0.1:3001". + #[serde(default = "default_web_server_bind_address")] + web_server_bind_address: String, + /// The unique Bitwarden installation ID using this AKD reader instance. + /// This value is used to namespace AKD data to a given installation. + pub installation_id: Uuid, +} + +fn default_web_server_bind_address() -> String { + "127.0.0.1:3001".to_string() +} + +impl ApplicationConfig { + /// Load configuration from multiple sources in order of priority: + /// 1. Environment variables (prefixed with AKD_READER) - always applied with highest priority + /// 2. Configuration file from AKD_READER_CONFIG_PATH environment variable (if set) + /// 3. OR default configuration file (config.toml, config.yaml, config.json) in working directory + /// + /// Environment variable naming: + /// - Uses double underscore (__) as separator + /// - For field `epoch_duration_ms`, use `AKD_READER__EPOCH_DURATION_MS` + /// - For nested fields like `storage.cache_clean_ms`, use `AKD_READER__STORAGE__CACHE_CLEAN_MS` + /// + /// Note: Only one config file source is used - either custom path OR default location + pub fn load() -> Result { + let mut builder = Config::builder(); + + // Check for custom config path via environment variable + if let Ok(config_path) = std::env::var("AKD_READER_CONFIG_PATH") { + builder = builder.add_source(File::with_name(&config_path).required(true)); + } else { + // Fall back to default config file locations + builder = builder.add_source(File::with_name("config").required(false)); + } + + let config = builder + // Add environment variables with prefix "AKD_READER_" + .add_source(Environment::with_prefix("AKD_READER").separator("__")) + .build()?; + + let reader_config: Self = config.try_deserialize()?; + + reader_config.validate()?; + + Ok(reader_config) + } + + pub fn validate(&self) -> Result<(), ConfigError> { + self.storage + .validate() + .map_err(|e| ConfigError::Message(format!("{e}")))?; + Ok(()) + } + + /// Get the web server bind address as a SocketAddr + /// Panics if the address is invalid + pub fn socket_address(&self) -> std::net::SocketAddr { + self.web_server_bind_address + .parse() + .expect("Invalid web server bind address") + } +} diff --git a/akd/crates/reader/src/lib.rs b/akd/crates/reader/src/lib.rs index d86ee1532b..f079c29329 100644 --- a/akd/crates/reader/src/lib.rs +++ b/akd/crates/reader/src/lib.rs @@ -1,20 +1,63 @@ -use akd::{directory::ReadOnlyDirectory, storage::StorageManager}; -use akd_storage::{AkdDatabase, VrfKeyDatabase}; +use akd::directory::ReadOnlyDirectory; +use akd_storage::{AkdDatabase, ReadOnlyPublishQueueType, VrfKeyDatabase}; +use anyhow::{Context, Result}; +use axum::Router; use bitwarden_akd_configuration::BitwardenV1Configuration; -use tracing::instrument; +use tokio::{net::TcpListener, sync::broadcast::Receiver}; +use tracing::{info, instrument}; +mod config; +mod routes; + +pub use crate::config::ApplicationConfig; + +#[derive(Clone)] struct AppState { // Add any shared state here, e.g., database connections - _directory: ReadOnlyDirectory, + directory: ReadOnlyDirectory, + publish_queue: ReadOnlyPublishQueueType, } #[instrument(skip_all, name = "reader_start")] -pub async fn start(db: AkdDatabase, vrf: VrfKeyDatabase) { - let storage_manager = StorageManager::new_no_cache(db); - let _app = AppState { - _directory: ReadOnlyDirectory::new(storage_manager, vrf) +pub async fn start( + config: ApplicationConfig, + shutdown_rx: &Receiver<()>, +) -> Result>> { + let (directory, _, publish_queue) = config + .storage + .initialize_readonly_directory::() + .await + .context("Failed to initialize ReadOnlyDirectory")?; + + let mut shutdown_rx = shutdown_rx.resubscribe(); + + let handle = tokio::spawn(async move { + let app_state = AppState { + directory: directory, + publish_queue: publish_queue, + }; + + let app = Router::new() + .merge(crate::routes::api_routes()) + .with_state(app_state); + + let listener = TcpListener::bind((&config.socket_address())) .await - .expect("Failed to create ReadOnlyDirectory"), - }; - println!("Reader started"); + .context("Socket bind failed")?; + info!( + socket_address = %config.socket_address(), + "Reader web server listening" + ); + + axum::serve(listener, app.into_make_service()) + .with_graceful_shutdown(async move { + shutdown_rx.recv().await.ok(); + }) + .await + .context("Web server failed")?; + + Ok(()) + }); + + Ok(handle) } diff --git a/akd/crates/reader/src/main.rs b/akd/crates/reader/src/main.rs index 29a7f24b0b..1f730c6cf0 100644 --- a/akd/crates/reader/src/main.rs +++ b/akd/crates/reader/src/main.rs @@ -1,39 +1,51 @@ //! The Reader crate is responsible for handling read requests to the AKD. It requires only read permissions to the //! underlying data stores, and can be horizontally scaled as needed. -use akd_storage::db_config::DbConfig; -use common::VrfStorageType; -use reader::start; +use anyhow::Context; +use anyhow::Result; +use tracing::error; use tracing::info; +use tracing::level_filters::LevelFilter; +use tracing_subscriber::EnvFilter; + +use reader::start; +use reader::ApplicationConfig; #[tokio::main] -async fn main() { - tracing_subscriber::fmt() - .with_max_level(tracing::Level::TRACE) - .init(); +async fn main() -> Result<()> { + let env_filter = EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(); - // Read connection string from env var - let connection_string = std::env::var("AKD_MSSQL_CONNECTION_STRING") - .expect("AKD_MSSQL_CONNECTION_STRING must be set."); - let db_config = DbConfig::MsSql { - connection_string, - pool_size: 10, - }; + tracing_subscriber::fmt().with_env_filter(env_filter).init(); - let db = db_config - .connect() + // Load configuration + let config = ApplicationConfig::load().context("Failed to load configuration")?; + + // Initialize Bitwarden AKD configuration + bitwarden_akd_configuration::BitwardenV1Configuration::init(config.installation_id); + + let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1); + + let mut web_handle = start(config, &shutdown_rx) .await - .expect("Failed to connect to database"); - let vrf = VrfStorageType::HardCodedAkdVRF; + .context("Failed to start reader")?; - let web_server_handle = tokio::spawn(async move { - start(db, vrf).await; - }); - - // Wait for both services to complete + // wait for shutdown signal tokio::select! { - _ = web_server_handle => { - info!("Web service completed"); + _ = tokio::signal::ctrl_c() => { + info!("Received Ctrl+C, shutting down"); + shutdown_tx.send(()).ok(); + } + _ = &mut web_handle => { + error!("Web service completed unexpectedly"); } } + + web_handle + .await + .expect("Failed to join web service task") + .expect("Web service task failed"); + + Ok(()) } diff --git a/akd/crates/reader/src/routes/health.rs b/akd/crates/reader/src/routes/health.rs new file mode 100644 index 0000000000..a299442bc7 --- /dev/null +++ b/akd/crates/reader/src/routes/health.rs @@ -0,0 +1,17 @@ +use axum::Json; +use serde::{Deserialize, Serialize}; +use tracing::{info, instrument}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct ServerHealth { + time: String, +} + +#[instrument(skip_all)] +pub async fn health_handler() -> Json { + info!("Handling server health request"); + + let time = chrono::Utc::now().to_rfc3339(); + + Json(ServerHealth { time }) +} diff --git a/akd/crates/reader/src/routes/mod.rs b/akd/crates/reader/src/routes/mod.rs new file mode 100644 index 0000000000..a1ef4e49e1 --- /dev/null +++ b/akd/crates/reader/src/routes/mod.rs @@ -0,0 +1,9 @@ +use axum::routing::get; + +mod health; + +use crate::AppState; + +pub fn api_routes() -> axum::Router { + axum::Router::new().route("/health", get(health::health_handler)) +}