mirror of
https://github.com/bitwarden/server
synced 2026-01-29 15:53:36 +00:00
create readonly initialization for directory storages
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use akd::{storage::StorageManager, Directory};
|
||||
use akd::{directory::ReadOnlyDirectory, storage::StorageManager, Directory};
|
||||
use serde::Deserialize;
|
||||
use thiserror::Error;
|
||||
use tracing::error;
|
||||
@@ -71,6 +71,35 @@ impl AkdStorageConfig {
|
||||
Ok((directory, db, publish_queue))
|
||||
}
|
||||
|
||||
pub async fn initialize_readonly_directory<TDirectoryConfig: akd::Configuration>(
|
||||
&self,
|
||||
) -> Result<
|
||||
(
|
||||
ReadOnlyDirectory<TDirectoryConfig, AkdDatabase, VrfKeyDatabase>,
|
||||
AkdDatabase,
|
||||
PublishQueueType,
|
||||
),
|
||||
AkdStorageInitializationError,
|
||||
> {
|
||||
let (storage_manager, db) = self.initialize_storage().await?;
|
||||
|
||||
let vrf_storage = db.vrf_key_database().await.map_err(|err| {
|
||||
error!(%err, "Failed to initialize VRF key database");
|
||||
AkdStorageInitializationError
|
||||
})?;
|
||||
|
||||
let publish_queue = PublishQueueType::new(&self.publish_queue_config, &db);
|
||||
|
||||
let directory = ReadOnlyDirectory::new(storage_manager, vrf_storage)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
error!(%err, "Failed to initialize ReadOnlyDirectory");
|
||||
AkdStorageInitializationError
|
||||
})?;
|
||||
|
||||
Ok((directory, db, publish_queue))
|
||||
}
|
||||
|
||||
async fn initialize_storage(
|
||||
&self,
|
||||
) -> Result<(StorageManager<AkdDatabase>, AkdDatabase), AkdStorageInitializationError> {
|
||||
|
||||
@@ -30,14 +30,15 @@ use uuid::Uuid;
|
||||
use crate::{
|
||||
ms_sql::tables::{
|
||||
publish_queue::{
|
||||
bulk_delete_rows, bulk_delete_statement, enqueue_statement, peek_no_limit_statement,
|
||||
peek_statement,
|
||||
bulk_delete_rows, bulk_delete_statement, enqueue_statement,
|
||||
label_pending_publish_statement, peek_no_limit_statement, peek_statement,
|
||||
},
|
||||
vrf_key,
|
||||
},
|
||||
publish_queue::{PublishQueue, PublishQueueError},
|
||||
vrf_key_config::VrfKeyConfig,
|
||||
vrf_key_database::{VrfKeyRetrievalError, VrfKeyStorageError, VrfKeyTableData},
|
||||
ReadOnlyPublishQueue,
|
||||
};
|
||||
|
||||
const DEFAULT_POOL_SIZE: u32 = 100;
|
||||
@@ -908,3 +909,44 @@ impl PublishQueue for MsSql {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ReadOnlyPublishQueue for MsSql {
|
||||
#[instrument(skip(self), level = "debug")]
|
||||
async fn label_pending_publish(&self, label: &AkdLabel) -> Result<bool, PublishQueueError> {
|
||||
debug!("Checking if label is pending publish");
|
||||
|
||||
let mut conn = self.get_connection().await.map_err(|_| {
|
||||
error!("Failed to get DB connection for label_pending_publish");
|
||||
PublishQueueError
|
||||
})?;
|
||||
|
||||
let statement = label_pending_publish_statement(label);
|
||||
trace!(sql = statement.sql(), "Query SQL");
|
||||
|
||||
let query_stream = conn
|
||||
.query(statement.sql(), &statement.params())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!(error = %e, "Failed to execute label pending publish query");
|
||||
PublishQueueError
|
||||
})?;
|
||||
|
||||
let row = query_stream.into_row().await.map_err(|e| {
|
||||
error!(error = %e, "Failed to fetch row for label pending publish");
|
||||
PublishQueueError
|
||||
})?;
|
||||
|
||||
if let Some(row) = row {
|
||||
let is_pending = statement.parse(&row).map_err(|e| {
|
||||
error!(error = %e, "Failed to parse label pending publish result");
|
||||
PublishQueueError
|
||||
})?;
|
||||
debug!(is_pending, "Label pending publish check complete");
|
||||
Ok(is_pending)
|
||||
} else {
|
||||
debug!("Label not found in publish queue");
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -110,23 +110,28 @@ pub fn bulk_delete_statement(temp_table_name: &str) -> Statement {
|
||||
Statement::new(sql, SqlParams::new())
|
||||
}
|
||||
|
||||
// pub fn delete_statement(ids: Vec<uuid::Uuid>) -> Statement {
|
||||
// debug!("Building delete_statement for publish queue");
|
||||
// let mut params = SqlParams::new();
|
||||
// let mut id_placeholders = Vec::new();
|
||||
pub fn label_pending_publish_statement(
|
||||
label: &AkdLabel,
|
||||
) -> QueryStatement<bool, PublishQueueError> {
|
||||
debug!("Building label_pending_publish_statement for publish queue");
|
||||
let mut params = SqlParams::new();
|
||||
params.add("raw_label", Box::new(label.0.clone()));
|
||||
|
||||
// for (i, id) in ids.iter().enumerate() {
|
||||
// let param_name = format!("id_{}", i);
|
||||
// params.add(¶m_name, Box::new(*id));
|
||||
// id_placeholders.push(params.key_for(¶m_name).expect("id was added to params"));
|
||||
// }
|
||||
|
||||
// let sql = format!(
|
||||
// r#"
|
||||
// DELETE FROM {}
|
||||
// WHERE id IN ({})"#,
|
||||
// TABLE_PUBLISH_QUEUE,
|
||||
// id_placeholders.join(", ")
|
||||
// );
|
||||
// Statement::new(sql, params)
|
||||
// }
|
||||
let sql = format!(
|
||||
r#"
|
||||
SELECT COUNT(1) AS label_count
|
||||
FROM {}
|
||||
WHERE raw_label = {}"#,
|
||||
TABLE_PUBLISH_QUEUE,
|
||||
params
|
||||
.key_for("raw_label")
|
||||
.expect("raw_label was added to the params list"),
|
||||
);
|
||||
QueryStatement::new(sql, params, |row: &ms_database::Row| {
|
||||
let count: i64 = row.get("label_count").ok_or_else(|| {
|
||||
error!("label_count is NULL or missing in publish queue row");
|
||||
PublishQueueError
|
||||
})?;
|
||||
Ok(count > 0)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -26,6 +26,11 @@ pub trait PublishQueue {
|
||||
async fn remove(&self, ids: Vec<uuid::Uuid>) -> Result<(), PublishQueueError>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait ReadOnlyPublishQueue {
|
||||
async fn label_pending_publish(&self, label: &AkdLabel) -> Result<bool, PublishQueueError>;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum PublishQueueType {
|
||||
MsSql(MsSql),
|
||||
@@ -74,3 +79,12 @@ impl PublishQueue for PublishQueueType {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ReadOnlyPublishQueue for PublishQueueType {
|
||||
async fn label_pending_publish(&self, label: &AkdLabel) -> Result<bool, PublishQueueError> {
|
||||
match self {
|
||||
PublishQueueType::MsSql(ms_sql) => ms_sql.label_pending_publish(label).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user