diff --git a/akd/crates/reader/src/config.rs b/akd/crates/reader/src/config.rs index ee43275a1b..54e30095ee 100644 --- a/akd/crates/reader/src/config.rs +++ b/akd/crates/reader/src/config.rs @@ -15,6 +15,9 @@ pub struct ApplicationConfig { /// Maximum number of labels allowed in a single batch lookup request. Defaults to 10. #[serde(default = "default_max_batch_lookup_size")] pub max_batch_lookup_size: usize, + /// Optional polling interval for AZKS storage in milliseconds Should be significantly less than the epoch interval. Defaults to 100 ms. + #[serde(default = "default_azks_poll_interval_ms")] + pub azks_poll_interval_ms: u64, } fn default_web_server_bind_address() -> String { @@ -25,6 +28,10 @@ fn default_max_batch_lookup_size() -> usize { 10 } +fn default_azks_poll_interval_ms() -> u64 { + 100 +} + impl ApplicationConfig { /// Load configuration from multiple sources in order of priority: /// 1. Environment variables (prefixed with AKD_READER) - always applied with highest priority diff --git a/akd/crates/reader/src/lib.rs b/akd/crates/reader/src/lib.rs index bbd0dcf6ed..d414d962d2 100644 --- a/akd/crates/reader/src/lib.rs +++ b/akd/crates/reader/src/lib.rs @@ -37,9 +37,9 @@ pub async fn start( let mut shutdown_rx = shutdown_rx.resubscribe(); let max_batch_lookup_size = config.max_batch_lookup_size; - let handle = tokio::spawn(async move { + let axum_handle = tokio::spawn(async move { let app_state = AppState { - directory: directory, + directory: directory.clone(), // publish_queue: publish_queue, max_batch_lookup_size, }; @@ -56,6 +56,19 @@ pub async fn start( "Reader web server listening" ); + // polls azks storage for epoch changes. This is necessary to pick up newly published updates. + let _poll_handle = tokio::spawn(async move { + let result = directory + .poll_for_azks_changes( + tokio::time::Duration::from_millis(config.azks_poll_interval_ms), + None, + ) + .await; + if let Err(e) = result { + tracing::error!("Error polling for AZKS changes: {:?}", e); + } + }); + axum::serve(listener, app.into_make_service()) .with_graceful_shutdown(async move { shutdown_rx.recv().await.ok(); @@ -66,5 +79,5 @@ pub async fn start( Ok(()) }); - Ok(handle) + Ok(axum_handle) }