1
0
mirror of https://github.com/bitwarden/server synced 2026-02-06 03:33:43 +00:00

first stub of publisher job

This commit is contained in:
Matt Gibson
2026-01-12 12:06:38 -08:00
parent 3db09c8b5f
commit a4fca3dfe6
18 changed files with 622 additions and 68 deletions

314
akd/Cargo.lock generated
View File

@@ -121,6 +121,15 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "anstream"
version = "0.6.21"
@@ -378,6 +387,70 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
[[package]]
name = "axum"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8"
dependencies = [
"axum-core",
"axum-macros",
"bytes",
"form_urlencoded",
"futures-util",
"http",
"http-body",
"http-body-util",
"hyper",
"hyper-util",
"itoa",
"matchit",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"serde_core",
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "axum-core"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1"
dependencies = [
"bytes",
"futures-core",
"http",
"http-body",
"http-body-util",
"mime",
"pin-project-lite",
"sync_wrapper",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "axum-macros"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.106",
]
[[package]]
name = "backtrace"
version = "0.3.76"
@@ -558,7 +631,11 @@ version = "0.4.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2"
dependencies = [
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-link",
]
[[package]]
@@ -625,6 +702,7 @@ dependencies = [
"akd",
"akd_storage",
"async-trait",
"bitwarden-akd-configuration",
"config",
"hex",
"serde",
@@ -1226,6 +1304,112 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "http"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a"
dependencies = [
"bytes",
"itoa",
]
[[package]]
name = "http-body"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
dependencies = [
"bytes",
"http",
]
[[package]]
name = "http-body-util"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a"
dependencies = [
"bytes",
"futures-core",
"http",
"http-body",
"pin-project-lite",
]
[[package]]
name = "httparse"
version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87"
[[package]]
name = "httpdate"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "hyper"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11"
dependencies = [
"atomic-waker",
"bytes",
"futures-channel",
"futures-core",
"http",
"http-body",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"pin-utils",
"smallvec",
"tokio",
]
[[package]]
name = "hyper-util"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f"
dependencies = [
"bytes",
"futures-core",
"http",
"http-body",
"hyper",
"pin-project-lite",
"tokio",
"tower-service",
]
[[package]]
name = "iana-time-zone"
version = "0.1.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33e57f83510bb73707521ebaffa789ec8caf86f9657cad665b092b581d40e9fb"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"log",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "icu_collections"
version = "2.1.1"
@@ -1478,6 +1662,12 @@ dependencies = [
"regex-automata",
]
[[package]]
name = "matchit"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
[[package]]
name = "md5"
version = "0.6.1"
@@ -1490,6 +1680,12 @@ version = "2.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"
[[package]]
name = "mime"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "miniz_oxide"
version = "0.8.9"
@@ -1941,14 +2137,22 @@ version = "0.1.0"
dependencies = [
"akd",
"akd_storage",
"anyhow",
"async-std",
"axum",
"bitwarden-akd-configuration",
"bitwarden-encoding",
"chrono",
"common",
"config",
"serde",
"thiserror 2.0.17",
"tiberius",
"tokio",
"tokio-util",
"tracing",
"tracing-subscriber",
"uuid",
]
[[package]]
@@ -2349,6 +2553,17 @@ dependencies = [
"serde_core",
]
[[package]]
name = "serde_path_to_error"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457"
dependencies = [
"itoa",
"serde",
"serde_core",
]
[[package]]
name = "serde_spanned"
version = "1.0.2"
@@ -2358,6 +2573,18 @@ dependencies = [
"serde_core",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
dependencies = [
"form_urlencoded",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "sha2"
version = "0.10.9"
@@ -2481,6 +2708,12 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "sync_wrapper"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263"
[[package]]
name = "synstructure"
version = "0.13.2"
@@ -2713,6 +2946,34 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d163a63c116ce562a22cda521fcc4d79152e7aba014456fb5eb442f6d6a10109"
[[package]]
name = "tower"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
dependencies = [
"futures-core",
"futures-util",
"pin-project-lite",
"sync_wrapper",
"tokio",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-layer"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
[[package]]
name = "tower-service"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
[[package]]
name = "tracing"
version = "0.1.41"
@@ -3063,12 +3324,65 @@ dependencies = [
"winapi",
]
[[package]]
name = "windows-core"
version = "0.62.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6844ee5416b285084d3d3fffd743b925a6c9385455f64f6d4fa3031c4c2749a9"
dependencies = [
"windows-implement",
"windows-interface",
"windows-link",
"windows-result",
"windows-strings",
]
[[package]]
name = "windows-implement"
version = "0.60.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.106",
]
[[package]]
name = "windows-interface"
version = "0.59.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.106",
]
[[package]]
name = "windows-link"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45e46c0661abb7180e7b9c281db115305d49ca1709ab8242adf09666d2173c65"
[[package]]
name = "windows-result"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7084dcc306f89883455a206237404d3eaf961e5bd7e0f312f7c91f57eb44167f"
dependencies = [
"windows-link",
]
[[package]]
name = "windows-strings"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7218c655a553b0bed4426cf54b20d7ba363ef543b52d515b3e48d7fd55318dda"
dependencies = [
"windows-link",
]
[[package]]
name = "windows-sys"
version = "0.52.0"

View File

@@ -14,11 +14,15 @@ unused_async = "deny"
unwrap_used = "deny"
[workspace.dependencies]
anyhow = "1.0.100"
akd = "0.11.0"
async-trait = "0.1.89"
akd_storage = { path = "crates/akd_storage" }
axum = { version = "0.8.8", features = ["macros"] }
bitwarden-akd-configuration = { path = "crates/bitwarden-akd-configuration" }
bitwarden-encoding = { path = "crates/bitwarden-encoding" }
blake3 = "1.8.2"
chrono = "0.4.42"
common = { path = "crates/common" }
config = "0.15.18"
hex = "0.4.3"
@@ -27,3 +31,4 @@ tokio = { version = "1.47.1", features = ["full"] }
tracing = { version = "0.1.41" }
tracing-subscriber = { version = "0.3.19" }
thiserror = "2.0.17"
uuid = { version = "1.18.1", features = ["serde"] }

View File

@@ -31,7 +31,7 @@ impl AkdDatabase {
AkdDatabase { db, vrf_key_config }
}
pub async fn vrf_key_database(&self) -> Result<VrfKeyDatabase, VrfKeyConfigError> {
pub(crate) async fn vrf_key_database(&self) -> Result<VrfKeyDatabase, VrfKeyConfigError> {
VrfKeyDatabase::new(self.db.clone(), self.vrf_key_config.clone()).await
}
}

View File

@@ -1,11 +1,11 @@
use std::time::Duration;
use akd::storage::StorageManager;
use akd::{storage::StorageManager, Directory};
use serde::Deserialize;
use thiserror::Error;
use tracing::error;
use crate::{db_config::DbConfig, vrf_key_config::VrfKeyConfig, AkdDatabase};
use crate::{db_config::DbConfig, vrf_key_config::VrfKeyConfig, AkdDatabase, VrfKeyDatabase};
#[derive(Debug, Clone, Deserialize)]
pub struct AkdStorageConfig {
@@ -22,12 +22,49 @@ pub struct AkdStorageConfig {
pub vrf_key_config: VrfKeyConfig,
}
#[derive(Debug, Error)]
#[error("Invalid AkdStorageConfig")]
pub struct AkdStorageConfigError;
#[derive(Debug, Error)]
#[error("Failed to initialize storage")]
pub struct AkdStorageInitializationError;
impl AkdStorageConfig {
pub async fn initialize_storage(
pub fn validate(&self) -> Result<(), AkdStorageConfigError> {
self.db_config
.validate()
.map_err(|_| AkdStorageConfigError)?;
Ok(())
}
pub async fn initialize_directory<TDirectoryConfig: akd::Configuration>(
&self,
) -> Result<
(
Directory<TDirectoryConfig, AkdDatabase, VrfKeyDatabase>,
AkdDatabase,
),
AkdStorageInitializationError,
> {
let (storage_manager, db) = self.initialize_storage().await?;
let vrf_storage = db.vrf_key_database().await.map_err(|err| {
error!(%err, "Failed to initialize VRF key database");
AkdStorageInitializationError
})?;
let directory = Directory::new(storage_manager, vrf_storage)
.await
.map_err(|err| {
error!(%err, "Failed to initialize Directory");
AkdStorageInitializationError
})?;
Ok((directory, db))
}
async fn initialize_storage(
&self,
) -> Result<(StorageManager<AkdDatabase>, AkdDatabase), AkdStorageInitializationError> {
let db = self.db_config.connect().await.map_err(|err| {
@@ -35,7 +72,7 @@ impl AkdStorageConfig {
AkdStorageInitializationError
})?;
let state = AkdDatabase::new(db, self.vrf_key_config.clone());
let db = AkdDatabase::new(db, self.vrf_key_config.clone());
let cache_item_lifetime = Some(Duration::from_millis(
self.cache_item_lifetime_ms.try_into().map_err(|err| {
@@ -52,12 +89,12 @@ impl AkdStorageConfig {
Ok((
StorageManager::new(
state.clone(),
db.clone(),
cache_item_lifetime,
self.cache_limit_bytes,
cache_clean_frequency,
),
state,
db,
))
}
}

View File

@@ -1,5 +1,7 @@
use akd::errors::StorageError;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::error;
use crate::{ms_sql::MsSql, vrf_key_database::VrfKeyStorageError};
@@ -19,7 +21,24 @@ pub enum DatabaseType {
MsSql(MsSql),
}
#[derive(Debug, Error)]
#[error("Database configuration error")]
pub struct DbConfigError;
impl DbConfig {
pub fn validate(&self) -> Result<(), DbConfigError> {
match self {
DbConfig::MsSql {
connection_string, ..
} => {
if connection_string.is_empty() {
error!("Connection string cannot be empty");
return Err(DbConfigError);
}
}
}
Ok(())
}
pub async fn connect(&self) -> Result<DatabaseType, StorageError> {
let db = match self {
DbConfig::MsSql {

View File

@@ -6,3 +6,4 @@ pub mod vrf_key_config;
pub mod vrf_key_database;
pub use akd_database::*;
pub use vrf_key_database::*;

View File

@@ -135,7 +135,7 @@ impl VRFKeyStorage for VrfKeyDatabase {
}
}
pub struct VrfKeyTableData {
pub(crate) struct VrfKeyTableData {
pub root_key_hash: Vec<u8>,
pub root_key_type: VrfRootKeyType,
pub enc_sym_key: Option<Vec<u8>>,
@@ -144,7 +144,7 @@ pub struct VrfKeyTableData {
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum VrfRootKeyType {
pub(crate) enum VrfRootKeyType {
#[cfg(test)]
None = 0,
SymmetricKey = 1,
@@ -175,7 +175,7 @@ impl From<VrfRootKeyType> for i16 {
}
#[derive(Clone)]
pub struct VrfKey(pub Vec<u8>);
pub(crate) struct VrfKey(pub Vec<u8>);
impl VrfKeyTableData {
pub async fn new(config: &VrfKeyConfig) -> Result<(Self, VrfKey), VrfKeyCreationError> {

View File

@@ -170,33 +170,28 @@ async fn main() -> Result<()> {
key: "4AD95tg8tfveioyS/E2jAQw06FDTUCu+VSEZxa41wuM=".to_string(),
},
};
let (storage_manager, state) = config
.initialize_storage()
let (mut directory, db) = config
.initialize_directory::<TC>()
.await
.context("Failed to initialize storage")?;
.context("Failed to initialize AKD directory")?;
// Handle pre-processing modes
if let Some(()) = pre_process_mode(&args, &state.db()).await? {
if let Some(()) = pre_process_mode(&args, &db).await? {
return Ok(());
}
let vrf_key_database = state.vrf_key_database().await?;
let mut directory = Directory::<TC, _, _>::new(storage_manager, vrf_key_database)
.await
.context("Failed to create AKD directory")?;
let (tx, mut rx) = channel(2);
tokio::spawn(async move { directory_host::init_host(&mut rx, &mut directory).await });
process_mode(&args, &tx, &state).await?;
process_mode(&args, &tx, &db).await?;
Ok(())
}
// Process modes that run before creating the directory
async fn pre_process_mode(args: &CliArgs, db: &DatabaseType) -> Result<Option<()>> {
async fn pre_process_mode(args: &CliArgs, db: &AkdDatabase) -> Result<Option<()>> {
let db = db.db();
match (db, &args.mode) {
(DatabaseType::MsSql(db), Some(Mode::Drop)) => {
info!("Dropping database tables");

View File

@@ -10,6 +10,7 @@ keywords.workspace = true
akd = "0.11.0"
async-trait = { workspace = true }
akd_storage = { workspace = true }
bitwarden-akd-configuration = { workspace = true }
config = { workspace = true }
serde = { workspace = true }
thiserror.workspace = true

View File

@@ -1 +1,5 @@
use akd::Directory;
use akd_storage::{AkdDatabase, VrfKeyDatabase};
use bitwarden_akd_configuration::BitwardenV1Configuration;
pub type BitAkdDirectory = Directory<BitwardenV1Configuration, AkdDatabase, VrfKeyDatabase>;

View File

@@ -7,14 +7,22 @@ license-file.workspace = true
keywords.workspace = true
[dependencies]
anyhow = { workspace = true }
akd = { workspace = true }
akd_storage = { workspace = true }
axum = { workspace = true }
bitwarden-akd-configuration = { workspace = true }
bitwarden-encoding = { workspace = true }
chrono = { workspace = true }
common = { workspace = true }
config = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
tokio-util = { version = "0.7.16", features = ["compat"] }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
uuid = { workspace = true }
async-std = { version = "1.13.2", features = ["attributes"] }
[target.'cfg(target_os = "macos")'.dependencies]

View File

@@ -0,0 +1,79 @@
use akd_storage::akd_storage_config::AkdStorageConfig;
use config::{Config, ConfigError, Environment, File};
use serde::Deserialize;
use uuid::Uuid;
const DEFAULT_EPOCH_DURATION_MS: u64 = 30000; // 30 seconds
#[derive(Clone, Debug, Deserialize)]
pub struct ApplicationConfig {
pub storage: AkdStorageConfig,
pub publisher: PublisherConfig,
pub installation_id: Uuid,
// web_server: WebServerConfig,
}
#[derive(Clone, Debug, Deserialize)]
pub struct PublisherConfig {
#[serde(default = "default_epoch_duration_ms")]
epoch_duration_ms: u64,
}
fn default_epoch_duration_ms() -> u64 {
DEFAULT_EPOCH_DURATION_MS
}
impl ApplicationConfig {
/// Load configuration from multiple sources in order of priority:
/// 1. Environment variables (prefixed with AKD_PUBLISHER) - always applied with highest priority
/// 2. Configuration file from AKD_PUBLISHER_CONFIG_PATH environment variable (if set)
/// 3. OR default configuration file (config.toml, config.yaml, config.json) in working directory
///
/// Environment variable naming:
/// - Uses double underscore (__) as separator
/// - For field `epoch_duration_ms`, use `AKD_PUBLISHER__EPOCH_DURATION_MS`
/// - For nested fields like `storage.cache_clean_ms`, use `AKD_PUBLISHER__STORAGE__CACHE_CLEAN_MS`
///
/// Note: Only one config file source is used - either custom path OR default location
pub fn load() -> Result<Self, ConfigError> {
let mut builder = Config::builder();
// Check for custom config path via environment variable
if let Ok(config_path) = std::env::var("AKD_PUBLISHER_CONFIG_PATH") {
builder = builder.add_source(File::with_name(&config_path).required(true));
} else {
// Fall back to default config file locations
builder = builder.add_source(File::with_name("config").required(false));
}
let config = builder
// Add environment variables with prefix "AKD_PUBLISHER_"
.add_source(Environment::with_prefix("AKD_PUBLISHER").separator("__"))
.build()?;
let publisher_config: Self = config.try_deserialize()?;
publisher_config.validate()?;
Ok(publisher_config)
}
pub fn validate(&self) -> Result<(), ConfigError> {
self.storage
.validate()
.map_err(|e| ConfigError::Message(format!("{e}")))?;
self.publisher.validate()?;
Ok(())
}
}
impl PublisherConfig {
pub fn validate(&self) -> Result<(), ConfigError> {
if self.epoch_duration_ms <= 0 {
return Err(ConfigError::Message(
"epoch_duration_ms must be greater than 0".to_string(),
));
}
Ok(())
}
}

View File

@@ -1,21 +1,47 @@
use akd::{directory::Directory, storage::StorageManager};
use akd_storage::DatabaseType;
use anyhow::Result;
use bitwarden_akd_configuration::BitwardenV1Configuration;
use tracing::instrument;
use tokio::sync::broadcast::Receiver;
use tracing::{info, instrument};
struct AppState {
_directory: Directory<BitwardenV1Configuration, DatabaseType, VrfStorageType>,
mod config;
mod routes;
pub use crate::config::ApplicationConfig;
pub struct AppHandles {
pub write_handle: tokio::task::JoinHandle<()>,
pub web_handle: tokio::task::JoinHandle<()>,
}
#[instrument(skip_all, name = "publisher_start")]
pub async fn start_write_job(_db: DatabaseType, vrf: VrfStorageType) {
let storage_manager = StorageManager::new_no_cache(_db);
let _app_state = AppState {
_directory: Directory::new(storage_manager, vrf).await.unwrap(),
};
println!("Publisher started");
}
pub async fn start(config: ApplicationConfig, shutdown_rx: &Receiver<()>) -> Result<AppHandles> {
let (directory, db) = config
.storage
.initialize_directory::<BitwardenV1Configuration>()
.await?;
pub async fn start_web_server(_db: DatabaseType) {
println!("Web server started");
// Initialize write job
let write_handle = {
let mut shutdown_rx = shutdown_rx.resubscribe();
tokio::spawn(async move {
// wait until shutdown signal is received
shutdown_rx.recv().await.ok();
info!("Shutting down publisher write job");
})
};
// Initialize web server
let web_handle = {
let mut shutdown_rx = shutdown_rx.resubscribe();
tokio::spawn(async move {
// wait forever until shutdown signal is received
shutdown_rx.recv().await.ok();
info!("Shutting down publisher web server");
})
};
Ok(AppHandles {
write_handle,
web_handle,
})
}

View File

@@ -2,47 +2,47 @@
//! Write permissions are needed to the underlying data stores.
//! There should only be one instance of this running at a time for a given AKD.
use akd_storage::db_config::DbConfig;
use publisher::{start_web_server, start_write_job};
use publisher::start;
use tracing::info;
use publisher::ApplicationConfig;
#[tokio::main]
async fn main() {
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::TRACE)
.init();
// Read connection string from env var
let connection_string = std::env::var("AKD_MSSQL_CONNECTION_STRING")
.expect("AKD_MSSQL_CONNECTION_STRING must be set.");
let db_config = DbConfig::MsSql {
connection_string,
pool_size: 10,
};
// Load configuration
let config = ApplicationConfig::load()
.map_err(|e| anyhow::anyhow!("Failed to load configuration: {e}"))?;
let db = db_config
.connect()
// Initialize Bitwarden AKD configuration
bitwarden_akd_configuration::BitwardenV1Configuration::init(config.installation_id);
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
let mut handles = start(config, &shutdown_rx)
.await
.expect("Failed to connect to database");
let vrf = common::VrfStorageType::HardCodedAkdVRF;
.map_err(|e| anyhow::anyhow!("Failed to start publisher: {e}"))?;
let write_job_handle = {
let db = db.clone();
tokio::spawn(async move {
start_write_job(db, vrf).await;
})
};
let web_server_handle = tokio::spawn(async move {
start_web_server(db).await;
});
// Wait for both services to complete
// Wait for shutdown signal
tokio::select! {
_ = write_job_handle => {
info!("Write job completed");
_ = tokio::signal::ctrl_c() => {
info!("Received Ctrl+C, shutting down");
shutdown_tx.send(()).ok();
}
_ = web_server_handle => {
info!("Web service completed");
_ = &mut handles.write_handle => {
info!("Publisher service completed unexpectedly");
}
_ = &mut handles.web_handle => {
info!("Web service completed unexpectedly");
}
}
// Wait for both services to complete
handles.write_handle.await.ok();
handles.web_handle.await.ok();
Ok(())
}

View File

@@ -0,0 +1,17 @@
use axum::Json;
use serde::{Deserialize, Serialize};
use tracing::{error, info, instrument};
#[derive(Debug, Serialize, Deserialize)]
pub struct ServerHealth {
time: String,
}
#[instrument(skip_all)]
pub async fn health_handler() -> Json<ServerHealth> {
info!("Handling server health request");
let time = chrono::Utc::now().to_rfc3339();
Json(ServerHealth { time })
}

View File

@@ -0,0 +1,18 @@
use akd_storage::AkdDatabase;
use axum::routing::{get, post};
use common::BitAkdDirectory;
mod health;
mod publish;
#[derive(Clone)]
pub struct AppState {
pub directory: BitAkdDirectory,
pub db: AkdDatabase,
}
pub fn api_routes() -> axum::Router<AppState> {
axum::Router::new()
.route("/health", get(health::health_handler))
.route("/publish", post(publish::publish_handler))
}

View File

@@ -0,0 +1,30 @@
use super::AppState;
use axum::{extract::State, http::StatusCode, response::IntoResponse, Json};
use serde::{Deserialize, Serialize};
use tracing::{error, info, instrument};
#[derive(Debug, Serialize, Deserialize)]
pub struct PublishRequest {
pub akd_label_b64: bitwarden_encoding::B64,
pub akd_value_b64: bitwarden_encoding::B64,
}
#[derive(Debug, Serialize)]
pub struct PublishResponse {
pub success: bool,
}
#[instrument(skip_all)]
pub async fn publish_handler(
State(AppState { directory, .. }): State<AppState>,
Json(request): Json<PublishRequest>,
) -> impl IntoResponse {
info!("Handling publish request");
let akd_label: Vec<u8> = request.akd_label_b64.into_bytes();
let akd_value: Vec<u8> = request.akd_value_b64.into_bytes();
//TODO: enqueue publish operation to to_publish queue
Json(PublishResponse { success: true })
}

View File

@@ -1,15 +1,15 @@
use akd::{directory::ReadOnlyDirectory, storage::StorageManager};
use akd_storage::DatabaseType;
use akd_storage::{AkdDatabase, VrfKeyDatabase};
use bitwarden_akd_configuration::BitwardenV1Configuration;
use tracing::instrument;
struct AppState {
// Add any shared state here, e.g., database connections
_directory: ReadOnlyDirectory<BitwardenV1Configuration, DatabaseType, VrfStorageType>,
_directory: ReadOnlyDirectory<BitwardenV1Configuration, AkdDatabase, VrfKeyDatabase>,
}
#[instrument(skip_all, name = "reader_start")]
pub async fn start(db: DatabaseType) {
pub async fn start(db: AkdDatabase, vrf: VrfKeyDatabase) {
let storage_manager = StorageManager::new_no_cache(db);
let _app = AppState {
_directory: ReadOnlyDirectory::new(storage_manager, vrf)