From 82776f78a6f9f10c47fd3e57c14fbcd7bcb919a3 Mon Sep 17 00:00:00 2001 From: Matt Gibson Date: Thu, 22 Jan 2026 16:08:52 -0800 Subject: [PATCH] Provide next epoch hints on reader health endpoint --- akd/crates/aio/src/config.rs | 2 +- akd/crates/reader/src/config.rs | 11 +++ akd/crates/reader/src/epoch_tracker.rs | 126 +++++++++++++++++++++++++ akd/crates/reader/src/lib.rs | 55 ++++++++++- akd/crates/reader/src/routes/health.rs | 30 +++++- 5 files changed, 216 insertions(+), 8 deletions(-) create mode 100644 akd/crates/reader/src/epoch_tracker.rs diff --git a/akd/crates/aio/src/config.rs b/akd/crates/aio/src/config.rs index bf56e3af7f..63c0f55619 100644 --- a/akd/crates/aio/src/config.rs +++ b/akd/crates/aio/src/config.rs @@ -139,7 +139,6 @@ impl ApplicationConfig { self.reader.validate()?; Ok(()) } - } impl PublisherSettings { @@ -197,6 +196,7 @@ impl From<&ApplicationConfig> for reader::ApplicationConfig { installation_id: config.installation_id, max_batch_lookup_size: config.reader.max_batch_lookup_size, azks_poll_interval_ms: config.reader.azks_poll_interval_ms, + expected_epoch_duration_ms: config.publisher.epoch_duration_ms, } } } diff --git a/akd/crates/reader/src/config.rs b/akd/crates/reader/src/config.rs index bf13c640a2..9ee7e23b99 100644 --- a/akd/crates/reader/src/config.rs +++ b/akd/crates/reader/src/config.rs @@ -18,6 +18,10 @@ pub struct ApplicationConfig { /// Optional polling interval for AZKS storage in milliseconds Should be significantly less than the epoch interval. Defaults to 100 ms. #[serde(default = "default_azks_poll_interval_ms")] pub azks_poll_interval_ms: u64, + /// Expected duration between epoch publishes in milliseconds. + /// This value should match the publisher's epoch_duration_ms configuration. + /// Used to predict when the next epoch will be published. + pub expected_epoch_duration_ms: u64, } fn default_web_server_bind_address() -> String { @@ -71,6 +75,13 @@ impl ApplicationConfig { self.storage .validate() .map_err(|e| ConfigError::Message(format!("{e}")))?; + + if self.expected_epoch_duration_ms == 0 { + return Err(ConfigError::Message( + "expected_epoch_duration_ms must be greater than 0".to_string(), + )); + } + Ok(()) } diff --git a/akd/crates/reader/src/epoch_tracker.rs b/akd/crates/reader/src/epoch_tracker.rs new file mode 100644 index 0000000000..83eac1f1f7 --- /dev/null +++ b/akd/crates/reader/src/epoch_tracker.rs @@ -0,0 +1,126 @@ +use chrono::{DateTime, Utc}; +use std::sync::Arc; +use tokio::sync::RwLock; + +/// Tracks epoch publishes and provides prediction based on expected duration +#[derive(Debug, Clone)] +pub(crate) struct EpochTracker { + inner: Arc>, +} + +#[derive(Debug)] +struct EpochTrackerInner { + last_publish_time: Option>, + expected_epoch_duration_ms: u64, +} + +impl EpochTracker { + pub(crate) fn new(expected_epoch_duration_ms: u64) -> Self { + Self { + inner: Arc::new(RwLock::new(EpochTrackerInner { + last_publish_time: None, + expected_epoch_duration_ms, + })), + } + } + + /// Record a new epoch publish + pub(crate) async fn record_publish(&self, published_at: DateTime) { + let mut inner = self.inner.write().await; + inner.last_publish_time = Some(published_at); + } + + /// Predict the next epoch publish time using modulus calculation + /// Returns (seconds_until_next, next_epoch_datetime) or None if no publish has been recorded yet + pub(crate) async fn predict_next_epoch( + &self, + now: DateTime, + ) -> Option<(f64, DateTime)> { + let inner = self.inner.read().await; + let last_publish = inner.last_publish_time?; + + // Calculate time since last publish + let duration_since_publish = now - last_publish; + let ms_since_publish = duration_since_publish.num_milliseconds(); + + // Use modulus to find time until next epoch + let ms_until_next = inner.expected_epoch_duration_ms as i64 + - (ms_since_publish % inner.expected_epoch_duration_ms as i64); + + // Calculate predicted next epoch time + let next_epoch_time = now + chrono::Duration::milliseconds(ms_until_next); + + // Convert to seconds with tenths precision + let seconds_until = ms_until_next as f64 / 1000.0; + let rounded = (seconds_until * 10.0).ceil() / 10.0; + + Some((rounded, next_epoch_time)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_tracker_no_prediction_before_publish() { + let tracker = EpochTracker::new(30000); + let now = Utc::now(); + assert!(tracker.predict_next_epoch(now).await.is_none()); + } + + #[tokio::test] + async fn test_tracker_predicts_after_publish() { + let tracker = EpochTracker::new(30000); // 30 second epochs + let now = Utc::now(); + let publish_time = now - chrono::Duration::seconds(10); // 10 seconds ago + + tracker.record_publish(publish_time).await; + + let prediction = tracker.predict_next_epoch(now).await; + assert!(prediction.is_some()); + + let (seconds_until, next_time) = prediction.unwrap(); + + // Should predict ~20 seconds until next (30 - 10) + assert!((seconds_until - 20.0).abs() < 0.2); + + // Next epoch should be approximately 20 seconds from now + let expected = now + chrono::Duration::seconds(20); + let diff = (next_time - expected).num_seconds().abs(); + assert!(diff < 1); + } + + #[tokio::test] + async fn test_tracker_handles_epoch_skip() { + let tracker = EpochTracker::new(30000); // 30 second epochs + let now = Utc::now(); + let publish_time = now - chrono::Duration::seconds(75); // 75 seconds ago (2.5 epochs) + + tracker.record_publish(publish_time).await; + + let prediction = tracker.predict_next_epoch(now).await; + assert!(prediction.is_some()); + + let (seconds_until, _) = prediction.unwrap(); + + // Should predict ~15 seconds until next (75 % 30 = 15, 30 - 15 = 15) + assert!((seconds_until - 15.0).abs() < 0.2); + } + + #[tokio::test] + async fn test_tracker_updates_publish() { + let tracker = EpochTracker::new(30000); + let now = Utc::now(); + let t1 = now - chrono::Duration::seconds(60); + let t2 = now - chrono::Duration::seconds(10); + + tracker.record_publish(t1).await; + tracker.record_publish(t2).await; + + let (seconds_until, _) = tracker.predict_next_epoch(now).await.unwrap(); + + // Should use the newer publish time (t2) + assert!((seconds_until - 20.0).abs() < 0.2); + } +} diff --git a/akd/crates/reader/src/lib.rs b/akd/crates/reader/src/lib.rs index 5b314f8088..b1f5ca37c3 100644 --- a/akd/crates/reader/src/lib.rs +++ b/akd/crates/reader/src/lib.rs @@ -7,9 +7,12 @@ use tokio::{net::TcpListener, sync::broadcast::Receiver}; use tracing::{info, instrument}; mod config; +mod epoch_tracker; pub mod error; mod routes; +use epoch_tracker::EpochTracker; + pub use crate::config::ApplicationConfig; pub use error::{ErrorCode, ErrorResponse, ReaderError}; pub use routes::response_types; @@ -21,6 +24,7 @@ struct AppState { // TODO: use this to allow for unique failures for lookup and key history requests that have pending updates // publish_queue: ReadOnlyPublishQueueType, max_batch_lookup_size: usize, + epoch_tracker: EpochTracker, } #[instrument(skip_all, name = "reader_start")] @@ -37,11 +41,13 @@ pub async fn start( let mut shutdown_rx = shutdown_rx.resubscribe(); let max_batch_lookup_size = config.max_batch_lookup_size; + let epoch_tracker = EpochTracker::new(config.expected_epoch_duration_ms); let axum_handle = tokio::spawn(async move { let app_state = AppState { directory: directory.clone(), // publish_queue: publish_queue, max_batch_lookup_size, + epoch_tracker: epoch_tracker.clone(), }; let app = Router::new() @@ -58,16 +64,61 @@ pub async fn start( ); // polls azks storage for epoch changes. This is necessary to pick up newly published updates. + let epoch_tracker_for_poll = epoch_tracker.clone(); + let directory_for_poll = directory.clone(); + let poll_interval = config.azks_poll_interval_ms; + let _poll_handle = tokio::spawn(async move { + let (change_tx, mut change_rx) = tokio::sync::mpsc::channel::<()>(100); + + // Detector task: listens for changes and records to tracker + let detector_handle = tokio::spawn(async move { + let mut last_epoch = match directory_for_poll.get_epoch_hash().await { + Ok(epoch_hash) => { + tracing::info!(epoch = epoch_hash.0, "Initial epoch detected"); + epoch_hash.0 + } + Err(e) => { + tracing::error!("Failed to get initial epoch: {:?}", e); + 0 + } + }; + + while change_rx.recv().await.is_some() { + match directory_for_poll.get_epoch_hash().await { + Ok(epoch_hash) => { + let current_epoch = epoch_hash.0; + if current_epoch != last_epoch { + let published_at = chrono::Utc::now(); + tracing::info!( + previous_epoch = last_epoch, + new_epoch = current_epoch, + published_at = %published_at.to_rfc3339(), + "Epoch publish detected" + ); + epoch_tracker_for_poll.record_publish(published_at).await; + last_epoch = current_epoch; + } + } + Err(e) => { + tracing::error!("Failed to get epoch hash: {:?}", e); + } + } + } + }); + let result = directory .poll_for_azks_changes( - tokio::time::Duration::from_millis(config.azks_poll_interval_ms), - None, + tokio::time::Duration::from_millis(poll_interval), + Some(change_tx), ) .await; + if let Err(e) = result { tracing::error!("Error polling for AZKS changes: {:?}", e); } + + detector_handle.abort(); }); axum::serve(listener, app.into_make_service()) diff --git a/akd/crates/reader/src/routes/health.rs b/akd/crates/reader/src/routes/health.rs index b6c9686e63..7a097223af 100644 --- a/akd/crates/reader/src/routes/health.rs +++ b/akd/crates/reader/src/routes/health.rs @@ -1,19 +1,39 @@ -use axum::{http::StatusCode, Json}; +use axum::{extract::State, http::StatusCode, Json}; use serde::{Deserialize, Serialize}; use tracing::{info, instrument}; -use crate::routes::Response; +use crate::{routes::Response, AppState}; #[derive(Debug, Serialize, Deserialize)] pub struct HealthData { time: String, + #[serde(skip_serializing_if = "Option::is_none")] + predicted_next_epoch_datetime: Option, + #[serde(skip_serializing_if = "Option::is_none")] + predicted_seconds_until_next_epoch: Option, } #[instrument(skip_all)] -pub async fn health_handler() -> (StatusCode, Json>) { +pub async fn health_handler( + State(AppState { epoch_tracker, .. }): State, +) -> (StatusCode, Json>) { info!("Handling server health request"); - let time = chrono::Utc::now().to_rfc3339(); + let now = chrono::Utc::now(); + let time = now.to_rfc3339(); - (StatusCode::OK, Json(Response::success(HealthData { time }))) + let (predicted_seconds_until_next_epoch, predicted_next_epoch_datetime) = epoch_tracker + .predict_next_epoch(now) + .await + .map(|(seconds, datetime)| (Some(seconds), Some(datetime.to_rfc3339()))) + .unwrap_or((None, None)); + + ( + StatusCode::OK, + Json(Response::success(HealthData { + time, + predicted_next_epoch_datetime, + predicted_seconds_until_next_epoch, + })), + ) }