1
0
mirror of https://github.com/bitwarden/server synced 2026-01-29 07:43:22 +00:00

create a publish queue as part of config initialization

This commit is contained in:
Matt Gibson
2026-01-13 17:05:09 -08:00
parent 2ad61ff10a
commit 2c907f14ab
7 changed files with 60 additions and 6 deletions

View File

@@ -5,7 +5,10 @@ use serde::Deserialize;
use thiserror::Error;
use tracing::error;
use crate::{db_config::DbConfig, vrf_key_config::VrfKeyConfig, AkdDatabase, VrfKeyDatabase};
use crate::{
db_config::DbConfig, publish_queue_config::PublishQueueConfig, vrf_key_config::VrfKeyConfig,
AkdDatabase, PublishQueueType, VrfKeyDatabase,
};
#[derive(Debug, Clone, Deserialize)]
pub struct AkdStorageConfig {
@@ -20,6 +23,7 @@ pub struct AkdStorageConfig {
#[serde(default = "default_cache_clean_ms")]
pub cache_clean_ms: usize,
pub vrf_key_config: VrfKeyConfig,
pub publish_queue_config: PublishQueueConfig,
}
#[derive(Debug, Error)]
@@ -44,6 +48,7 @@ impl AkdStorageConfig {
(
Directory<TDirectoryConfig, AkdDatabase, VrfKeyDatabase>,
AkdDatabase,
PublishQueueType,
),
AkdStorageInitializationError,
> {
@@ -54,6 +59,8 @@ impl AkdStorageConfig {
AkdStorageInitializationError
})?;
let publish_queue = PublishQueueType::new(&self.publish_queue_config, &db);
let directory = Directory::new(storage_manager, vrf_storage)
.await
.map_err(|err| {
@@ -61,7 +68,7 @@ impl AkdStorageConfig {
AkdStorageInitializationError
})?;
Ok((directory, db))
Ok((directory, db, publish_queue))
}
async fn initialize_storage(

View File

@@ -3,6 +3,7 @@ pub mod akd_storage_config;
pub mod db_config;
pub mod ms_sql;
mod publish_queue;
pub mod publish_queue_config;
pub mod vrf_key_config;
pub mod vrf_key_database;

View File

@@ -1,7 +1,12 @@
use async_trait::async_trait;
use thiserror::Error;
use crate::ms_sql::MsSql;
use crate::{
db_config::DatabaseType,
ms_sql::MsSql,
publish_queue_config::{PublishQueueConfig, PublishQueueProvider},
AkdDatabase,
};
pub(crate) struct PublishQueueItem {
pub id: uuid::Uuid,
@@ -29,6 +34,22 @@ pub enum PublishQueueType {
MsSql(MsSql),
}
impl PublishQueueType {
pub fn new(config: &PublishQueueConfig, db: &AkdDatabase) -> PublishQueueType {
match &config.provider {
PublishQueueProvider::DbBacked => db.into(),
}
}
}
impl From<&AkdDatabase> for PublishQueueType {
fn from(db: &AkdDatabase) -> Self {
match db.db() {
DatabaseType::MsSql(ms_sql) => PublishQueueType::MsSql(ms_sql.clone()),
}
}
}
#[async_trait]
impl PublishQueue for PublishQueueType {
async fn enqueue(

View File

@@ -0,0 +1,21 @@
use serde::Deserialize;
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type")]
pub struct PublishQueueConfig {
pub provider: PublishQueueProvider,
#[serde(default = "default_publish_limit")]
pub epoch_update_limit: Option<isize>,
}
fn default_publish_limit() -> Option<isize> {
None
}
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type")]
pub enum PublishQueueProvider {
DbBacked,
}
impl PublishQueueConfig {}

View File

@@ -169,8 +169,12 @@ async fn main() -> Result<()> {
vrf_key_config: akd_storage::vrf_key_config::VrfKeyConfig::B64EncodedSymmetricKey {
key: "4AD95tg8tfveioyS/E2jAQw06FDTUCu+VSEZxa41wuM=".to_string(),
},
publish_queue_config: akd_storage::publish_queue_config::PublishQueueConfig {
provider: akd_storage::publish_queue_config::PublishQueueProvider::DbBacked,
epoch_update_limit: None,
},
};
let (mut directory, db) = config
let (mut directory, db, _) = config
.initialize_directory::<TC>()
.await
.context("Failed to initialize AKD directory")?;

View File

@@ -15,7 +15,7 @@ pub struct AppHandles {
#[instrument(skip_all, name = "publisher_start")]
pub async fn start(config: ApplicationConfig, shutdown_rx: &Receiver<()>) -> Result<AppHandles> {
let (directory, db) = config
let (directory, db, publish_queue) = config
.storage
.initialize_directory::<BitwardenV1Configuration>()
.await?;

View File

@@ -6,7 +6,7 @@ mod health;
mod publish;
#[derive(Clone)]
pub struct AppState {
pub(crate) struct AppState {
pub directory: BitAkdDirectory,
pub db: AkdDatabase,
pub publish_queue: PublishQueueType,