diff --git a/CHANGELOG.md b/CHANGELOG.md index eb52f0697..bdfbd61a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- **`RollingP95` adaptive feature normalizer** (`v2/crates/wifi-densepose-sensing-server`) — + Streaming P95 estimator (600-sample / ~30 s sliding window) that self-calibrates + feature normalization to whatever distribution the deployment produces. Replaces + fixed-scale denominators (`variance/300`, `motion/250`, `spectral/500`) which saturated + when live ESP32 values exceeded those limits, collapsing dynamic range to zero. + Cold-start (<60 samples) falls back to the legacy denominators so day-0 behaviour + is preserved. Deployment-neutral: no hardcoded values. (ADR-044 §5.2) + +- **`dedup_factor` runtime configuration API** (`v2/crates/wifi-densepose-sensing-server`) — + Exposes the multi-node person-count deduplication divisor at runtime via REST: + - `GET /api/v1/config/dedup-factor` — read current value. + - `POST /api/v1/config/dedup-factor` — set value (clamped 1.0–10.0, persisted). + - `POST /api/v1/config/ground-truth` — auto-tunes `dedup_factor` from a known + person count (`{"count": N}`); derives optimal divisor from current node-sum. + Config is persisted to `data/config.json` and reloaded on restart. (ADR-044 §5.3) + - **`nvsim` crate — deterministic NV-diamond magnetometer pipeline simulator** (ADR-089) — New standalone leaf crate at `v2/crates/nvsim` modeling a forward-only magnetic sensing path: scene → source synthesis (Biot–Savart, dipole, diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs index a8b207e47..328a54b6b 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/main.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs @@ -540,6 +540,93 @@ fn build_node_features( Some(entries) } +// ── ADR-044 §5.2: Rolling P95 adaptive feature normalizer ──────────────────── + +/// Streaming P95 estimator over a fixed-size sliding window. +/// +/// Self-calibrates feature normalization to whatever distribution the deployment +/// produces — no hardcoded scale values that can saturate in large rooms or +/// degrade in high-interference environments. +/// +/// O(n log n) per query via sorted copy — acceptable at 20 Hz with window=600. +/// Cold-start (len < min_samples) returns `None` so the caller uses the legacy +/// fixed denominator, preserving day-0 behaviour. +pub struct RollingP95 { + buf: std::collections::VecDeque, + window: usize, + min_samples: usize, +} + +impl RollingP95 { + pub fn new(window: usize, min_samples: usize) -> Self { + Self { + buf: std::collections::VecDeque::with_capacity(window), + window, + min_samples, + } + } + + pub fn push(&mut self, v: f64) { + if self.buf.len() == self.window { + self.buf.pop_front(); + } + self.buf.push_back(v); + } + + /// Returns `Some(p95)` once enough samples have accumulated, else `None`. + pub fn current(&self) -> Option { + if self.buf.len() < self.min_samples { + return None; + } + let mut sorted: Vec = self.buf.iter().copied().collect(); + sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); + let idx = ((sorted.len() as f64) * 0.95).ceil() as usize; + Some(sorted[idx.saturating_sub(1).min(sorted.len() - 1)]) + } + + #[allow(dead_code)] + pub fn len(&self) -> usize { + self.buf.len() + } +} + +// ── ADR-044 §5.3: Runtime config persistence ───────────────────────────────── + +/// Runtime configuration that persists across server restarts via `data/config.json`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub(crate) struct RuntimeConfig { + /// Divisor for multi-node person-count deduplication (sum / factor). + pub dedup_factor: f64, +} + +impl Default for RuntimeConfig { + fn default() -> Self { + Self { dedup_factor: 3.0 } + } +} + +/// Load persisted runtime config from `/config.json`. +/// Falls back to [`RuntimeConfig::default`] if the file is absent or malformed. +pub(crate) fn load_runtime_config(data_dir: &std::path::Path) -> RuntimeConfig { + let path = data_dir.join("config.json"); + match std::fs::read_to_string(&path) { + Ok(json) => serde_json::from_str(&json).unwrap_or_default(), + Err(_) => RuntimeConfig::default(), + } +} + +/// Persist runtime config to `/config.json`. +pub(crate) fn save_runtime_config(data_dir: &std::path::Path, config: &RuntimeConfig) { + let path = data_dir.join("config.json"); + if let Ok(json) = serde_json::to_string_pretty(config) { + if let Err(e) = std::fs::write(&path, json) { + warn!("Failed to save runtime config to {}: {e}", path.display()); + } else { + info!("Runtime config saved to {}", path.display()); + } + } +} + /// Shared application state struct AppStateInner { latest_update: Option, @@ -642,6 +729,21 @@ struct AppStateInner { multistatic_fuser: MultistaticFuser, /// SVD-based room field model for eigenvalue person counting (None until calibration). field_model: Option, + // ── ADR-044 §5.2: adaptive rolling-p95 normalization ───────────────────── + /// Rolling P95 of `FeatureInfo.variance` over the last ~30 s (600 frames @ 20 Hz). + pub(crate) p95_variance: RollingP95, + /// Rolling P95 of `FeatureInfo.motion_band_power` over the last ~30 s. + pub(crate) p95_motion_band_power: RollingP95, + /// Rolling P95 of `FeatureInfo.spectral_power` over the last ~30 s. + pub(crate) p95_spectral_power: RollingP95, + // ── ADR-044 §5.3: runtime-configurable dedup factor ─────────────────────── + /// Divisor for multi-node person-count deduplication (sum / factor). + /// Default 3.0 (one body visible to ~3 nodes on average). + /// Configurable at runtime via `POST /api/v1/config/dedup-factor` and + /// `POST /api/v1/config/ground-truth`. Persisted across restarts. + pub(crate) dedup_factor: f64, + /// Data directory for persisting runtime config (parent of `firmware_dir`). + pub(crate) data_dir: std::path::PathBuf, } /// If no ESP32 frame arrives within this duration, source reverts to offline. @@ -1728,8 +1830,13 @@ async fn windows_wifi_task(state: SharedState, tick_ms: u64) { let feat_variance = features.variance; + // ADR-044 §5.2: feed raw features into rolling-P95 estimators before scoring. + s.p95_variance.push(features.variance); + s.p95_motion_band_power.push(features.motion_band_power); + s.p95_spectral_power.push(features.spectral_power); + // Multi-person estimation with temporal smoothing (EMA α=0.10). - let raw_score = compute_person_score(&features); + let raw_score = compute_person_score(&*s, &features); s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10; let est_persons = if classification.presence { let count = s.person_count(); @@ -1867,8 +1974,13 @@ async fn windows_wifi_fallback_tick(state: &SharedState, seq: u32) { let feat_variance = features.variance; + // ADR-044 §5.2: feed raw features into rolling-P95 estimators before scoring. + s.p95_variance.push(features.variance); + s.p95_motion_band_power.push(features.motion_band_power); + s.p95_spectral_power.push(features.spectral_power); + // Multi-person estimation with temporal smoothing (EMA α=0.10). - let raw_score = compute_person_score(&features); + let raw_score = compute_person_score(&*s, &features); s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10; let est_persons = if classification.presence { let count = s.person_count(); @@ -2256,13 +2368,19 @@ fn fuse_multi_node_features( /// /// Returns a raw score (0.0..1.0) that the caller converts to person count /// after temporal smoothing. -fn compute_person_score(feat: &FeatureInfo) -> f64 { - // Normalize each feature to [0, 1] using ranges calibrated from real - // ESP32 hardware (COM6/COM9 on ruv.net, March 2026). - let var_norm = (feat.variance / 300.0).clamp(0.0, 1.0); +fn compute_person_score(state: &AppStateInner, feat: &FeatureInfo) -> f64 { + // ADR-044 §5.2: adaptive rolling-P95 normalization. + // Legacy fixed denominators (variance/300, motion/250, spectral/500) saturate + // when live ESP32 values exceed those limits — zero dynamic range results. + // Use the P95 of the last ~30 s of history instead, falling back to the legacy + // denominators during cold-start (<60 samples) to preserve day-0 behaviour. + let var_denom = state.p95_variance.current().map(|p| p.max(50.0)).unwrap_or(300.0); + let motion_denom = state.p95_motion_band_power.current().map(|p| p.max(50.0)).unwrap_or(250.0); + let sp_denom = state.p95_spectral_power.current().map(|p| p.max(100.0)).unwrap_or(500.0); + let var_norm = (feat.variance / var_denom).clamp(0.0, 1.0); let cp_norm = (feat.change_points as f64 / 30.0).clamp(0.0, 1.0); - let motion_norm = (feat.motion_band_power / 250.0).clamp(0.0, 1.0); - let sp_norm = (feat.spectral_power / 500.0).clamp(0.0, 1.0); + let motion_norm = (feat.motion_band_power / motion_denom).clamp(0.0, 1.0); + let sp_norm = (feat.spectral_power / sp_denom).clamp(0.0, 1.0); var_norm * 0.40 + cp_norm * 0.20 + motion_norm * 0.25 + sp_norm * 0.15 } @@ -3711,8 +3829,9 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { // Aggregate person count: gate on presence first (matching WiFi path). let now = std::time::Instant::now(); let total_persons = if vitals.presence { + let dedup = s.dedup_factor; let (fused, fallback_count) = multistatic_bridge::fuse_or_fallback( - &s.multistatic_fuser, &s.node_states, + &s.multistatic_fuser, &s.node_states, dedup, ); match fused { Some(ref f) => { @@ -3973,8 +4092,9 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { // Aggregate person count: gate on presence first (matching WiFi path). let now = std::time::Instant::now(); let total_persons = if classification.presence { + let dedup = s.dedup_factor; let (fused, fallback_count) = multistatic_bridge::fuse_or_fallback( - &s.multistatic_fuser, &s.node_states, + &s.multistatic_fuser, &s.node_states, dedup, ); match fused { Some(ref f) => { @@ -4126,8 +4246,13 @@ async fn simulated_data_task(state: SharedState, tick_ms: u64) { let frame_amplitudes = frame.amplitudes.clone(); let frame_n_sub = frame.n_subcarriers; + // ADR-044 §5.2: feed raw features into rolling-P95 estimators before scoring. + s.p95_variance.push(features.variance); + s.p95_motion_band_power.push(features.motion_band_power); + s.p95_spectral_power.push(features.spectral_power); + // Multi-person estimation with temporal smoothing (EMA α=0.10). - let raw_score = compute_person_score(&features); + let raw_score = compute_person_score(&*s, &features); s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10; let est_persons = if classification.presence { let count = s.person_count(); @@ -4766,6 +4891,11 @@ async fn main() { let initial_recordings = scan_recording_files(); info!("Discovered {} model files, {} recording files", initial_models.len(), initial_recordings.len()); + // ADR-044 §5.3: load persisted runtime config from the data directory. + let data_dir = std::path::PathBuf::from("data"); + let runtime_config = load_runtime_config(&data_dir); + info!("Loaded runtime config: dedup_factor={:.2}", runtime_config.dedup_factor); + let (tx, _) = broadcast::channel::(256); let state: SharedState = Arc::new(RwLock::new(AppStateInner { latest_update: None, @@ -4841,6 +4971,13 @@ async fn main() { } else { None }, + // ADR-044 §5.2: rolling-P95 over ~30 s at 20 Hz; warm-up after 60 samples. + p95_variance: RollingP95::new(600, 60), + p95_motion_band_power: RollingP95::new(600, 60), + p95_spectral_power: RollingP95::new(600, 60), + // ADR-044 §5.3: runtime-configurable dedup factor (persisted). + dedup_factor: runtime_config.dedup_factor, + data_dir: data_dir.clone(), })); // Start background tasks based on source @@ -4941,6 +5078,9 @@ async fn main() { .route("/api/v1/calibration/start", post(calibration_start)) .route("/api/v1/calibration/stop", post(calibration_stop)) .route("/api/v1/calibration/status", get(calibration_status)) + // ADR-044 §5.3: runtime-configurable dedup factor + .route("/api/v1/config/dedup-factor", get(config_get_dedup_factor).post(config_set_dedup_factor)) + .route("/api/v1/config/ground-truth", post(config_set_ground_truth)) // Static UI files .nest_service("/ui", ServeDir::new(&ui_path)) .layer(SetResponseHeaderLayer::overriding( @@ -5050,3 +5190,131 @@ mod novelty_tests { assert!(ns.last_novelty_score.is_some()); } } + +// ── ADR-044 §5.3: dedup_factor runtime configuration endpoints ──────────────── + +/// `GET /api/v1/config/dedup-factor` — read the current dedup factor. +async fn config_get_dedup_factor( + State(state): State, +) -> Json { + let s = state.read().await; + Json(serde_json::json!({ + "dedup_factor": s.dedup_factor, + "description": "Divisor for multi-node person count deduplication (sum / factor). Range: 1.0–10.0." + })) +} + +/// `POST /api/v1/config/dedup-factor` — set the dedup factor (clamped 1.0–10.0). +/// +/// Body: `{ "value": }` +async fn config_set_dedup_factor( + State(state): State, + Json(body): Json, +) -> Json { + let value = body.get("value").and_then(|v| v.as_f64()).unwrap_or(3.0); + let clamped = value.clamp(1.0, 10.0); + let mut s = state.write().await; + s.dedup_factor = clamped; + let data_dir = s.data_dir.clone(); + drop(s); + save_runtime_config(&data_dir, &RuntimeConfig { dedup_factor: clamped }); + Json(serde_json::json!({ + "status": "ok", + "dedup_factor": clamped, + })) +} + +/// `POST /api/v1/config/ground-truth` — auto-tune dedup factor from a known person count. +/// +/// Derives `dedup_factor = raw_node_sum / ground_truth_count` from the current +/// per-node person counts, clamped to [1.0, 10.0]. Persisted immediately. +/// +/// Body: `{ "count": }` +async fn config_set_ground_truth( + State(state): State, + Json(body): Json, +) -> Json { + let ground_truth = match body.get("count").and_then(|v| v.as_u64()) { + Some(n) if n > 0 => n as usize, + _ => return Json(serde_json::json!({"error": "count must be a positive integer"})), + }; + let mut s = state.write().await; + let raw_sum: usize = s.node_states.values() + .filter(|ns| ns.last_frame_time + .map(|t| t.elapsed() < std::time::Duration::from_secs(10)) + .unwrap_or(false)) + .map(|ns| ns.prev_person_count) + .sum(); + let optimal = if raw_sum > 0 { + (raw_sum as f64) / (ground_truth as f64) + } else { + 3.0 + }; + let clamped = optimal.clamp(1.0, 10.0); + s.dedup_factor = clamped; + let data_dir = s.data_dir.clone(); + drop(s); + save_runtime_config(&data_dir, &RuntimeConfig { dedup_factor: clamped }); + Json(serde_json::json!({ + "status": "ok", + "ground_truth": ground_truth, + "raw_sum": raw_sum, + "computed_dedup_factor": clamped, + })) +} + +// ── Unit tests: RollingP95 ───────────────────────────────────────────────────── + +#[cfg(test)] +mod rolling_p95_tests { + use super::RollingP95; + + #[test] + fn cold_start_returns_none() { + let p = RollingP95::new(100, 10); + assert!(p.current().is_none(), "empty buffer must return None"); + } + + #[test] + fn below_min_samples_returns_none() { + let mut p = RollingP95::new(100, 10); + for i in 1..=9 { + p.push(i as f64); + } + assert!(p.current().is_none(), "fewer than min_samples must return None"); + } + + #[test] + fn p95_of_ramp_is_near_95() { + let mut p = RollingP95::new(100, 10); + for i in 1..=100 { + p.push(i as f64); + } + let p95 = p.current().expect("should have value after 100 samples"); + assert!( + p95 >= 94.0 && p95 <= 96.0, + "P95 of 1..=100 should be ~95, got {p95}" + ); + } + + #[test] + fn window_slides_evicts_oldest() { + let mut p = RollingP95::new(5, 3); + // Push 1..=5, then 100 — oldest (1) is evicted. + for i in 1..=5 { + p.push(i as f64); + } + p.push(100.0); // evicts 1; buf = [2, 3, 4, 5, 100] + let p95 = p.current().expect("6 pushes, window=5 → 5 samples"); + // P95 of [2,3,4,5,100]: idx = ceil(5*0.95)=5 → sorted[4]=100 + assert_eq!(p95, 100.0, "largest value should dominate p95 after eviction"); + } + + #[test] + fn len_reports_buffer_size() { + let mut p = RollingP95::new(10, 5); + assert_eq!(p.len(), 0); + p.push(1.0); + assert_eq!(p.len(), 1); + } +} diff --git a/v2/crates/wifi-densepose-sensing-server/src/multistatic_bridge.rs b/v2/crates/wifi-densepose-sensing-server/src/multistatic_bridge.rs index 794b15bc7..0a841ef9e 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/multistatic_bridge.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/multistatic_bridge.rs @@ -97,6 +97,7 @@ pub fn node_frames_from_states(node_states: &HashMap) -> Vec, + dedup_factor: f64, ) -> (Option, Option) { let frames = node_frames_from_states(node_states); if frames.is_empty() { @@ -109,9 +110,11 @@ pub fn fuse_or_fallback( (Some(fused), None) } Err(e) => { - tracing::debug!("Multistatic fusion failed ({e}), using per-node max fallback"); - // Use max (not sum) to avoid double-counting when nodes have overlapping coverage. - let max_count: usize = node_states + tracing::debug!("Multistatic fusion failed ({e}), using per-node sum/dedup fallback"); + // Sum per-node counts then divide by dedup_factor (assumed average + // visibility per body across nodes). ADR-044 §5.1. + // dedup_factor is runtime-configurable; default 3.0. + let total: usize = node_states .values() .filter(|ns| { ns.last_frame_time @@ -119,9 +122,9 @@ pub fn fuse_or_fallback( .unwrap_or(false) }) .map(|ns| ns.prev_person_count) - .max() - .unwrap_or(0); - (None, Some(max_count)) + .sum(); + let estimated = ((total as f64) / dedup_factor).ceil() as usize; + (None, Some(estimated)) } } } @@ -257,7 +260,7 @@ mod tests { fn test_fuse_or_fallback_empty() { let fuser = MultistaticFuser::new(); let states: HashMap = HashMap::new(); - let (fused, count) = fuse_or_fallback(&fuser, &states); + let (fused, count) = fuse_or_fallback(&fuser, &states, 3.0); assert!(fused.is_none()); assert_eq!(count, Some(0)); }