Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- **`RollingP95` adaptive feature normalizer** (`v2/crates/wifi-densepose-sensing-server`) —
Streaming P95 estimator (600-sample / ~30 s sliding window) that self-calibrates
feature normalization to whatever distribution the deployment produces. Replaces
fixed-scale denominators (`variance/300`, `motion/250`, `spectral/500`) which saturated
when live ESP32 values exceeded those limits, collapsing dynamic range to zero.
Cold-start (<60 samples) falls back to the legacy denominators so day-0 behaviour
is preserved. Deployment-neutral: no hardcoded values. (ADR-044 §5.2)

- **`dedup_factor` runtime configuration API** (`v2/crates/wifi-densepose-sensing-server`) —
Exposes the multi-node person-count deduplication divisor at runtime via REST:
- `GET /api/v1/config/dedup-factor` — read current value.
- `POST /api/v1/config/dedup-factor` — set value (clamped 1.0–10.0, persisted).
- `POST /api/v1/config/ground-truth` — auto-tunes `dedup_factor` from a known
person count (`{"count": N}`); derives optimal divisor from current node-sum.
Config is persisted to `data/config.json` and reloaded on restart. (ADR-044 §5.3)

- **`nvsim` crate — deterministic NV-diamond magnetometer pipeline simulator** (ADR-089) —
New standalone leaf crate at `v2/crates/nvsim` modeling a forward-only
magnetic sensing path: scene → source synthesis (Biot–Savart, dipole,
Expand Down
290 changes: 279 additions & 11 deletions v2/crates/wifi-densepose-sensing-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,93 @@ fn build_node_features(
Some(entries)
}

// ── ADR-044 §5.2: Rolling P95 adaptive feature normalizer ────────────────────

/// Streaming P95 estimator over a fixed-size sliding window.
///
/// Self-calibrates feature normalization to whatever distribution the deployment
/// produces — no hardcoded scale values that can saturate in large rooms or
/// degrade in high-interference environments.
///
/// O(n log n) per query via sorted copy — acceptable at 20 Hz with window=600.
/// Cold-start (len < min_samples) returns `None` so the caller uses the legacy
/// fixed denominator, preserving day-0 behaviour.
pub struct RollingP95 {
buf: std::collections::VecDeque<f64>,
window: usize,
min_samples: usize,
}

impl RollingP95 {
pub fn new(window: usize, min_samples: usize) -> Self {
Self {
buf: std::collections::VecDeque::with_capacity(window),
window,
min_samples,
}
}

pub fn push(&mut self, v: f64) {
if self.buf.len() == self.window {
self.buf.pop_front();
}
self.buf.push_back(v);
}

/// Returns `Some(p95)` once enough samples have accumulated, else `None`.
pub fn current(&self) -> Option<f64> {
if self.buf.len() < self.min_samples {
return None;
}
let mut sorted: Vec<f64> = self.buf.iter().copied().collect();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let idx = ((sorted.len() as f64) * 0.95).ceil() as usize;
Some(sorted[idx.saturating_sub(1).min(sorted.len() - 1)])
}

#[allow(dead_code)]
pub fn len(&self) -> usize {
self.buf.len()
}
}

// ── ADR-044 §5.3: Runtime config persistence ─────────────────────────────────

/// Runtime configuration that persists across server restarts via `data/config.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct RuntimeConfig {
/// Divisor for multi-node person-count deduplication (sum / factor).
pub dedup_factor: f64,
}

impl Default for RuntimeConfig {
fn default() -> Self {
Self { dedup_factor: 3.0 }
}
}

/// Load persisted runtime config from `<data_dir>/config.json`.
/// Falls back to [`RuntimeConfig::default`] if the file is absent or malformed.
pub(crate) fn load_runtime_config(data_dir: &std::path::Path) -> RuntimeConfig {
let path = data_dir.join("config.json");
match std::fs::read_to_string(&path) {
Ok(json) => serde_json::from_str(&json).unwrap_or_default(),
Err(_) => RuntimeConfig::default(),
}
}

/// Persist runtime config to `<data_dir>/config.json`.
pub(crate) fn save_runtime_config(data_dir: &std::path::Path, config: &RuntimeConfig) {
let path = data_dir.join("config.json");
if let Ok(json) = serde_json::to_string_pretty(config) {
if let Err(e) = std::fs::write(&path, json) {
warn!("Failed to save runtime config to {}: {e}", path.display());
} else {
info!("Runtime config saved to {}", path.display());
}
}
}

/// Shared application state
struct AppStateInner {
latest_update: Option<SensingUpdate>,
Expand Down Expand Up @@ -642,6 +729,21 @@ struct AppStateInner {
multistatic_fuser: MultistaticFuser,
/// SVD-based room field model for eigenvalue person counting (None until calibration).
field_model: Option<FieldModel>,
// ── ADR-044 §5.2: adaptive rolling-p95 normalization ─────────────────────
/// Rolling P95 of `FeatureInfo.variance` over the last ~30 s (600 frames @ 20 Hz).
pub(crate) p95_variance: RollingP95,
/// Rolling P95 of `FeatureInfo.motion_band_power` over the last ~30 s.
pub(crate) p95_motion_band_power: RollingP95,
/// Rolling P95 of `FeatureInfo.spectral_power` over the last ~30 s.
pub(crate) p95_spectral_power: RollingP95,
// ── ADR-044 §5.3: runtime-configurable dedup factor ───────────────────────
/// Divisor for multi-node person-count deduplication (sum / factor).
/// Default 3.0 (one body visible to ~3 nodes on average).
/// Configurable at runtime via `POST /api/v1/config/dedup-factor` and
/// `POST /api/v1/config/ground-truth`. Persisted across restarts.
pub(crate) dedup_factor: f64,
/// Data directory for persisting runtime config (parent of `firmware_dir`).
pub(crate) data_dir: std::path::PathBuf,
}

/// If no ESP32 frame arrives within this duration, source reverts to offline.
Expand Down Expand Up @@ -1728,8 +1830,13 @@ async fn windows_wifi_task(state: SharedState, tick_ms: u64) {

let feat_variance = features.variance;

// ADR-044 §5.2: feed raw features into rolling-P95 estimators before scoring.
s.p95_variance.push(features.variance);
s.p95_motion_band_power.push(features.motion_band_power);
s.p95_spectral_power.push(features.spectral_power);

// Multi-person estimation with temporal smoothing (EMA α=0.10).
let raw_score = compute_person_score(&features);
let raw_score = compute_person_score(&*s, &features);
s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10;
let est_persons = if classification.presence {
let count = s.person_count();
Expand Down Expand Up @@ -1867,8 +1974,13 @@ async fn windows_wifi_fallback_tick(state: &SharedState, seq: u32) {

let feat_variance = features.variance;

// ADR-044 §5.2: feed raw features into rolling-P95 estimators before scoring.
s.p95_variance.push(features.variance);
s.p95_motion_band_power.push(features.motion_band_power);
s.p95_spectral_power.push(features.spectral_power);

// Multi-person estimation with temporal smoothing (EMA α=0.10).
let raw_score = compute_person_score(&features);
let raw_score = compute_person_score(&*s, &features);
s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10;
let est_persons = if classification.presence {
let count = s.person_count();
Expand Down Expand Up @@ -2256,13 +2368,19 @@ fn fuse_multi_node_features(
///
/// Returns a raw score (0.0..1.0) that the caller converts to person count
/// after temporal smoothing.
fn compute_person_score(feat: &FeatureInfo) -> f64 {
// Normalize each feature to [0, 1] using ranges calibrated from real
// ESP32 hardware (COM6/COM9 on ruv.net, March 2026).
let var_norm = (feat.variance / 300.0).clamp(0.0, 1.0);
fn compute_person_score(state: &AppStateInner, feat: &FeatureInfo) -> f64 {
// ADR-044 §5.2: adaptive rolling-P95 normalization.
// Legacy fixed denominators (variance/300, motion/250, spectral/500) saturate
// when live ESP32 values exceed those limits — zero dynamic range results.
// Use the P95 of the last ~30 s of history instead, falling back to the legacy
// denominators during cold-start (<60 samples) to preserve day-0 behaviour.
let var_denom = state.p95_variance.current().map(|p| p.max(50.0)).unwrap_or(300.0);
let motion_denom = state.p95_motion_band_power.current().map(|p| p.max(50.0)).unwrap_or(250.0);
let sp_denom = state.p95_spectral_power.current().map(|p| p.max(100.0)).unwrap_or(500.0);
let var_norm = (feat.variance / var_denom).clamp(0.0, 1.0);
let cp_norm = (feat.change_points as f64 / 30.0).clamp(0.0, 1.0);
let motion_norm = (feat.motion_band_power / 250.0).clamp(0.0, 1.0);
let sp_norm = (feat.spectral_power / 500.0).clamp(0.0, 1.0);
let motion_norm = (feat.motion_band_power / motion_denom).clamp(0.0, 1.0);
let sp_norm = (feat.spectral_power / sp_denom).clamp(0.0, 1.0);
var_norm * 0.40 + cp_norm * 0.20 + motion_norm * 0.25 + sp_norm * 0.15
}

Expand Down Expand Up @@ -3711,8 +3829,9 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
// Aggregate person count: gate on presence first (matching WiFi path).
let now = std::time::Instant::now();
let total_persons = if vitals.presence {
let dedup = s.dedup_factor;
let (fused, fallback_count) = multistatic_bridge::fuse_or_fallback(
&s.multistatic_fuser, &s.node_states,
&s.multistatic_fuser, &s.node_states, dedup,
);
match fused {
Some(ref f) => {
Expand Down Expand Up @@ -3973,8 +4092,9 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
// Aggregate person count: gate on presence first (matching WiFi path).
let now = std::time::Instant::now();
let total_persons = if classification.presence {
let dedup = s.dedup_factor;
let (fused, fallback_count) = multistatic_bridge::fuse_or_fallback(
&s.multistatic_fuser, &s.node_states,
&s.multistatic_fuser, &s.node_states, dedup,
);
match fused {
Some(ref f) => {
Expand Down Expand Up @@ -4126,8 +4246,13 @@ async fn simulated_data_task(state: SharedState, tick_ms: u64) {
let frame_amplitudes = frame.amplitudes.clone();
let frame_n_sub = frame.n_subcarriers;

// ADR-044 §5.2: feed raw features into rolling-P95 estimators before scoring.
s.p95_variance.push(features.variance);
s.p95_motion_band_power.push(features.motion_band_power);
s.p95_spectral_power.push(features.spectral_power);

// Multi-person estimation with temporal smoothing (EMA α=0.10).
let raw_score = compute_person_score(&features);
let raw_score = compute_person_score(&*s, &features);
s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10;
let est_persons = if classification.presence {
let count = s.person_count();
Expand Down Expand Up @@ -4766,6 +4891,11 @@ async fn main() {
let initial_recordings = scan_recording_files();
info!("Discovered {} model files, {} recording files", initial_models.len(), initial_recordings.len());

// ADR-044 §5.3: load persisted runtime config from the data directory.
let data_dir = std::path::PathBuf::from("data");
let runtime_config = load_runtime_config(&data_dir);
info!("Loaded runtime config: dedup_factor={:.2}", runtime_config.dedup_factor);

let (tx, _) = broadcast::channel::<String>(256);
let state: SharedState = Arc::new(RwLock::new(AppStateInner {
latest_update: None,
Expand Down Expand Up @@ -4841,6 +4971,13 @@ async fn main() {
} else {
None
},
// ADR-044 §5.2: rolling-P95 over ~30 s at 20 Hz; warm-up after 60 samples.
p95_variance: RollingP95::new(600, 60),
p95_motion_band_power: RollingP95::new(600, 60),
p95_spectral_power: RollingP95::new(600, 60),
// ADR-044 §5.3: runtime-configurable dedup factor (persisted).
dedup_factor: runtime_config.dedup_factor,
data_dir: data_dir.clone(),
}));

// Start background tasks based on source
Expand Down Expand Up @@ -4941,6 +5078,9 @@ async fn main() {
.route("/api/v1/calibration/start", post(calibration_start))
.route("/api/v1/calibration/stop", post(calibration_stop))
.route("/api/v1/calibration/status", get(calibration_status))
// ADR-044 §5.3: runtime-configurable dedup factor
.route("/api/v1/config/dedup-factor", get(config_get_dedup_factor).post(config_set_dedup_factor))
.route("/api/v1/config/ground-truth", post(config_set_ground_truth))
// Static UI files
.nest_service("/ui", ServeDir::new(&ui_path))
.layer(SetResponseHeaderLayer::overriding(
Expand Down Expand Up @@ -5050,3 +5190,131 @@ mod novelty_tests {
assert!(ns.last_novelty_score.is_some());
}
}

// ── ADR-044 §5.3: dedup_factor runtime configuration endpoints ────────────────

/// `GET /api/v1/config/dedup-factor` — read the current dedup factor.
async fn config_get_dedup_factor(
State(state): State<SharedState>,
) -> Json<serde_json::Value> {
let s = state.read().await;
Json(serde_json::json!({
"dedup_factor": s.dedup_factor,
"description": "Divisor for multi-node person count deduplication (sum / factor). Range: 1.0–10.0."
}))
}

/// `POST /api/v1/config/dedup-factor` — set the dedup factor (clamped 1.0–10.0).
///
/// Body: `{ "value": <f64> }`
async fn config_set_dedup_factor(
State(state): State<SharedState>,
Json(body): Json<serde_json::Value>,
) -> Json<serde_json::Value> {
let value = body.get("value").and_then(|v| v.as_f64()).unwrap_or(3.0);
let clamped = value.clamp(1.0, 10.0);
let mut s = state.write().await;
s.dedup_factor = clamped;
let data_dir = s.data_dir.clone();
drop(s);
save_runtime_config(&data_dir, &RuntimeConfig { dedup_factor: clamped });
Json(serde_json::json!({
"status": "ok",
"dedup_factor": clamped,
}))
}

/// `POST /api/v1/config/ground-truth` — auto-tune dedup factor from a known person count.
///
/// Derives `dedup_factor = raw_node_sum / ground_truth_count` from the current
/// per-node person counts, clamped to [1.0, 10.0]. Persisted immediately.
///
/// Body: `{ "count": <u64> }`
async fn config_set_ground_truth(
State(state): State<SharedState>,
Json(body): Json<serde_json::Value>,
) -> Json<serde_json::Value> {
let ground_truth = match body.get("count").and_then(|v| v.as_u64()) {
Some(n) if n > 0 => n as usize,
_ => return Json(serde_json::json!({"error": "count must be a positive integer"})),
};
let mut s = state.write().await;
let raw_sum: usize = s.node_states.values()
.filter(|ns| ns.last_frame_time
.map(|t| t.elapsed() < std::time::Duration::from_secs(10))
.unwrap_or(false))
.map(|ns| ns.prev_person_count)
.sum();
let optimal = if raw_sum > 0 {
(raw_sum as f64) / (ground_truth as f64)
} else {
3.0
};
let clamped = optimal.clamp(1.0, 10.0);
s.dedup_factor = clamped;
let data_dir = s.data_dir.clone();
drop(s);
save_runtime_config(&data_dir, &RuntimeConfig { dedup_factor: clamped });
Json(serde_json::json!({
"status": "ok",
"ground_truth": ground_truth,
"raw_sum": raw_sum,
"computed_dedup_factor": clamped,
}))
}

// ── Unit tests: RollingP95 ─────────────────────────────────────────────────────

#[cfg(test)]
mod rolling_p95_tests {
use super::RollingP95;

#[test]
fn cold_start_returns_none() {
let p = RollingP95::new(100, 10);
assert!(p.current().is_none(), "empty buffer must return None");
}

#[test]
fn below_min_samples_returns_none() {
let mut p = RollingP95::new(100, 10);
for i in 1..=9 {
p.push(i as f64);
}
assert!(p.current().is_none(), "fewer than min_samples must return None");
}

#[test]
fn p95_of_ramp_is_near_95() {
let mut p = RollingP95::new(100, 10);
for i in 1..=100 {
p.push(i as f64);
}
let p95 = p.current().expect("should have value after 100 samples");
assert!(
p95 >= 94.0 && p95 <= 96.0,
"P95 of 1..=100 should be ~95, got {p95}"
);
}

#[test]
fn window_slides_evicts_oldest() {
let mut p = RollingP95::new(5, 3);
// Push 1..=5, then 100 — oldest (1) is evicted.
for i in 1..=5 {
p.push(i as f64);
}
p.push(100.0); // evicts 1; buf = [2, 3, 4, 5, 100]
let p95 = p.current().expect("6 pushes, window=5 → 5 samples");
// P95 of [2,3,4,5,100]: idx = ceil(5*0.95)=5 → sorted[4]=100
assert_eq!(p95, 100.0, "largest value should dominate p95 after eviction");
}

#[test]
fn len_reports_buffer_size() {
let mut p = RollingP95::new(10, 5);
assert_eq!(p.len(), 0);
p.push(1.0);
assert_eq!(p.len(), 1);
}
}
Loading
Loading