mirror of
https://github.com/bitwarden/server
synced 2026-01-31 00:33:17 +00:00
First working build of reader server
This commit is contained in:
2
akd/Cargo.lock
generated
2
akd/Cargo.lock
generated
@@ -2253,7 +2253,9 @@ dependencies = [
|
||||
"akd",
|
||||
"akd_storage",
|
||||
"anyhow",
|
||||
"axum",
|
||||
"bitwarden-akd-configuration",
|
||||
"chrono",
|
||||
"common",
|
||||
"config",
|
||||
"serde",
|
||||
|
||||
@@ -7,7 +7,7 @@ use tracing::{error, instrument};
|
||||
|
||||
use crate::{
|
||||
db_config::DbConfig, publish_queue_config::PublishQueueConfig, vrf_key_config::VrfKeyConfig,
|
||||
AkdDatabase, PublishQueueType, VrfKeyDatabase,
|
||||
AkdDatabase, PublishQueueType, ReadOnlyPublishQueueType, VrfKeyDatabase,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
@@ -78,7 +78,7 @@ impl AkdStorageConfig {
|
||||
(
|
||||
ReadOnlyDirectory<TDirectoryConfig, AkdDatabase, VrfKeyDatabase>,
|
||||
AkdDatabase,
|
||||
PublishQueueType,
|
||||
ReadOnlyPublishQueueType,
|
||||
),
|
||||
AkdStorageInitializationError,
|
||||
> {
|
||||
@@ -89,7 +89,7 @@ impl AkdStorageConfig {
|
||||
AkdStorageInitializationError
|
||||
})?;
|
||||
|
||||
let publish_queue = PublishQueueType::new(&self.publish_queue_config, &db);
|
||||
let publish_queue = ReadOnlyPublishQueueType::new(&self.publish_queue_config, &db);
|
||||
|
||||
let directory = ReadOnlyDirectory::new(storage_manager, vrf_storage)
|
||||
.await
|
||||
|
||||
@@ -80,11 +80,32 @@ impl PublishQueue for PublishQueueType {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ReadOnlyPublishQueue for PublishQueueType {
|
||||
async fn label_pending_publish(&self, label: &AkdLabel) -> Result<bool, PublishQueueError> {
|
||||
match self {
|
||||
PublishQueueType::MsSql(ms_sql) => ms_sql.label_pending_publish(label).await,
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ReadOnlyPublishQueueType {
|
||||
MsSql(MsSql),
|
||||
}
|
||||
|
||||
impl ReadOnlyPublishQueueType {
|
||||
pub fn new(config: &PublishQueueConfig, db: &AkdDatabase) -> ReadOnlyPublishQueueType {
|
||||
match config {
|
||||
PublishQueueConfig::DbBacked => db.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&AkdDatabase> for ReadOnlyPublishQueueType {
|
||||
fn from(db: &AkdDatabase) -> Self {
|
||||
match db.db() {
|
||||
DatabaseType::MsSql(ms_sql) => ReadOnlyPublishQueueType::MsSql(ms_sql.clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ReadOnlyPublishQueue for ReadOnlyPublishQueueType {
|
||||
async fn label_pending_publish(&self, label: &AkdLabel) -> Result<bool, PublishQueueError> {
|
||||
match self {
|
||||
ReadOnlyPublishQueueType::MsSql(ms_sql) => ms_sql.label_pending_publish(label).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,11 +9,17 @@ keywords.workspace = true
|
||||
[dependencies]
|
||||
tokio = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
tracing-subscriber = { workspace = true, features = ["env-filter"] }
|
||||
akd = { workspace = true }
|
||||
akd_storage = { workspace = true}
|
||||
bitwarden-akd-configuration = { workspace = true}
|
||||
common = { workspace = true}
|
||||
akd_storage = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
bitwarden-akd-configuration = { workspace = true }
|
||||
common = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
config = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
axum = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
70
akd/crates/reader/src/config.rs
Normal file
70
akd/crates/reader/src/config.rs
Normal file
@@ -0,0 +1,70 @@
|
||||
use akd_storage::akd_storage_config::AkdStorageConfig;
|
||||
use config::{Config, ConfigError, Environment, File};
|
||||
use serde::Deserialize;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct ApplicationConfig {
|
||||
pub storage: AkdStorageConfig,
|
||||
/// The address the web server will bind to. Defaults to "127.0.0.1:3001".
|
||||
#[serde(default = "default_web_server_bind_address")]
|
||||
web_server_bind_address: String,
|
||||
/// The unique Bitwarden installation ID using this AKD reader instance.
|
||||
/// This value is used to namespace AKD data to a given installation.
|
||||
pub installation_id: Uuid,
|
||||
}
|
||||
|
||||
fn default_web_server_bind_address() -> String {
|
||||
"127.0.0.1:3001".to_string()
|
||||
}
|
||||
|
||||
impl ApplicationConfig {
|
||||
/// Load configuration from multiple sources in order of priority:
|
||||
/// 1. Environment variables (prefixed with AKD_READER) - always applied with highest priority
|
||||
/// 2. Configuration file from AKD_READER_CONFIG_PATH environment variable (if set)
|
||||
/// 3. OR default configuration file (config.toml, config.yaml, config.json) in working directory
|
||||
///
|
||||
/// Environment variable naming:
|
||||
/// - Uses double underscore (__) as separator
|
||||
/// - For field `epoch_duration_ms`, use `AKD_READER__EPOCH_DURATION_MS`
|
||||
/// - For nested fields like `storage.cache_clean_ms`, use `AKD_READER__STORAGE__CACHE_CLEAN_MS`
|
||||
///
|
||||
/// Note: Only one config file source is used - either custom path OR default location
|
||||
pub fn load() -> Result<Self, ConfigError> {
|
||||
let mut builder = Config::builder();
|
||||
|
||||
// Check for custom config path via environment variable
|
||||
if let Ok(config_path) = std::env::var("AKD_READER_CONFIG_PATH") {
|
||||
builder = builder.add_source(File::with_name(&config_path).required(true));
|
||||
} else {
|
||||
// Fall back to default config file locations
|
||||
builder = builder.add_source(File::with_name("config").required(false));
|
||||
}
|
||||
|
||||
let config = builder
|
||||
// Add environment variables with prefix "AKD_READER_"
|
||||
.add_source(Environment::with_prefix("AKD_READER").separator("__"))
|
||||
.build()?;
|
||||
|
||||
let reader_config: Self = config.try_deserialize()?;
|
||||
|
||||
reader_config.validate()?;
|
||||
|
||||
Ok(reader_config)
|
||||
}
|
||||
|
||||
pub fn validate(&self) -> Result<(), ConfigError> {
|
||||
self.storage
|
||||
.validate()
|
||||
.map_err(|e| ConfigError::Message(format!("{e}")))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get the web server bind address as a SocketAddr
|
||||
/// Panics if the address is invalid
|
||||
pub fn socket_address(&self) -> std::net::SocketAddr {
|
||||
self.web_server_bind_address
|
||||
.parse()
|
||||
.expect("Invalid web server bind address")
|
||||
}
|
||||
}
|
||||
@@ -1,20 +1,63 @@
|
||||
use akd::{directory::ReadOnlyDirectory, storage::StorageManager};
|
||||
use akd_storage::{AkdDatabase, VrfKeyDatabase};
|
||||
use akd::directory::ReadOnlyDirectory;
|
||||
use akd_storage::{AkdDatabase, ReadOnlyPublishQueueType, VrfKeyDatabase};
|
||||
use anyhow::{Context, Result};
|
||||
use axum::Router;
|
||||
use bitwarden_akd_configuration::BitwardenV1Configuration;
|
||||
use tracing::instrument;
|
||||
use tokio::{net::TcpListener, sync::broadcast::Receiver};
|
||||
use tracing::{info, instrument};
|
||||
|
||||
mod config;
|
||||
mod routes;
|
||||
|
||||
pub use crate::config::ApplicationConfig;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct AppState {
|
||||
// Add any shared state here, e.g., database connections
|
||||
_directory: ReadOnlyDirectory<BitwardenV1Configuration, AkdDatabase, VrfKeyDatabase>,
|
||||
directory: ReadOnlyDirectory<BitwardenV1Configuration, AkdDatabase, VrfKeyDatabase>,
|
||||
publish_queue: ReadOnlyPublishQueueType,
|
||||
}
|
||||
|
||||
#[instrument(skip_all, name = "reader_start")]
|
||||
pub async fn start(db: AkdDatabase, vrf: VrfKeyDatabase) {
|
||||
let storage_manager = StorageManager::new_no_cache(db);
|
||||
let _app = AppState {
|
||||
_directory: ReadOnlyDirectory::new(storage_manager, vrf)
|
||||
pub async fn start(
|
||||
config: ApplicationConfig,
|
||||
shutdown_rx: &Receiver<()>,
|
||||
) -> Result<tokio::task::JoinHandle<Result<()>>> {
|
||||
let (directory, _, publish_queue) = config
|
||||
.storage
|
||||
.initialize_readonly_directory::<BitwardenV1Configuration>()
|
||||
.await
|
||||
.context("Failed to initialize ReadOnlyDirectory")?;
|
||||
|
||||
let mut shutdown_rx = shutdown_rx.resubscribe();
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
let app_state = AppState {
|
||||
directory: directory,
|
||||
publish_queue: publish_queue,
|
||||
};
|
||||
|
||||
let app = Router::new()
|
||||
.merge(crate::routes::api_routes())
|
||||
.with_state(app_state);
|
||||
|
||||
let listener = TcpListener::bind((&config.socket_address()))
|
||||
.await
|
||||
.expect("Failed to create ReadOnlyDirectory"),
|
||||
};
|
||||
println!("Reader started");
|
||||
.context("Socket bind failed")?;
|
||||
info!(
|
||||
socket_address = %config.socket_address(),
|
||||
"Reader web server listening"
|
||||
);
|
||||
|
||||
axum::serve(listener, app.into_make_service())
|
||||
.with_graceful_shutdown(async move {
|
||||
shutdown_rx.recv().await.ok();
|
||||
})
|
||||
.await
|
||||
.context("Web server failed")?;
|
||||
|
||||
Ok(())
|
||||
});
|
||||
|
||||
Ok(handle)
|
||||
}
|
||||
|
||||
@@ -1,39 +1,51 @@
|
||||
//! The Reader crate is responsible for handling read requests to the AKD. It requires only read permissions to the
|
||||
//! underlying data stores, and can be horizontally scaled as needed.
|
||||
|
||||
use akd_storage::db_config::DbConfig;
|
||||
use common::VrfStorageType;
|
||||
use reader::start;
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use tracing::error;
|
||||
use tracing::info;
|
||||
use tracing::level_filters::LevelFilter;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
use reader::start;
|
||||
use reader::ApplicationConfig;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(tracing::Level::TRACE)
|
||||
.init();
|
||||
async fn main() -> Result<()> {
|
||||
let env_filter = EnvFilter::builder()
|
||||
.with_default_directive(LevelFilter::INFO.into())
|
||||
.from_env_lossy();
|
||||
|
||||
// Read connection string from env var
|
||||
let connection_string = std::env::var("AKD_MSSQL_CONNECTION_STRING")
|
||||
.expect("AKD_MSSQL_CONNECTION_STRING must be set.");
|
||||
let db_config = DbConfig::MsSql {
|
||||
connection_string,
|
||||
pool_size: 10,
|
||||
};
|
||||
tracing_subscriber::fmt().with_env_filter(env_filter).init();
|
||||
|
||||
let db = db_config
|
||||
.connect()
|
||||
// Load configuration
|
||||
let config = ApplicationConfig::load().context("Failed to load configuration")?;
|
||||
|
||||
// Initialize Bitwarden AKD configuration
|
||||
bitwarden_akd_configuration::BitwardenV1Configuration::init(config.installation_id);
|
||||
|
||||
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
|
||||
|
||||
let mut web_handle = start(config, &shutdown_rx)
|
||||
.await
|
||||
.expect("Failed to connect to database");
|
||||
let vrf = VrfStorageType::HardCodedAkdVRF;
|
||||
.context("Failed to start reader")?;
|
||||
|
||||
let web_server_handle = tokio::spawn(async move {
|
||||
start(db, vrf).await;
|
||||
});
|
||||
|
||||
// Wait for both services to complete
|
||||
// wait for shutdown signal
|
||||
tokio::select! {
|
||||
_ = web_server_handle => {
|
||||
info!("Web service completed");
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
info!("Received Ctrl+C, shutting down");
|
||||
shutdown_tx.send(()).ok();
|
||||
}
|
||||
_ = &mut web_handle => {
|
||||
error!("Web service completed unexpectedly");
|
||||
}
|
||||
}
|
||||
|
||||
web_handle
|
||||
.await
|
||||
.expect("Failed to join web service task")
|
||||
.expect("Web service task failed");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
17
akd/crates/reader/src/routes/health.rs
Normal file
17
akd/crates/reader/src/routes/health.rs
Normal file
@@ -0,0 +1,17 @@
|
||||
use axum::Json;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::{info, instrument};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct ServerHealth {
|
||||
time: String,
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub async fn health_handler() -> Json<ServerHealth> {
|
||||
info!("Handling server health request");
|
||||
|
||||
let time = chrono::Utc::now().to_rfc3339();
|
||||
|
||||
Json(ServerHealth { time })
|
||||
}
|
||||
9
akd/crates/reader/src/routes/mod.rs
Normal file
9
akd/crates/reader/src/routes/mod.rs
Normal file
@@ -0,0 +1,9 @@
|
||||
use axum::routing::get;
|
||||
|
||||
mod health;
|
||||
|
||||
use crate::AppState;
|
||||
|
||||
pub fn api_routes() -> axum::Router<AppState> {
|
||||
axum::Router::new().route("/health", get(health::health_handler))
|
||||
}
|
||||
Reference in New Issue
Block a user