From d3d943160bec7cd294c5ade0c81797646dc3137e Mon Sep 17 00:00:00 2001 From: Lance Tuller Date: Wed, 29 Apr 2026 14:12:08 -0400 Subject: [PATCH 1/6] feat(correlation): playbook-aware corroborator finalization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR B item 1. Lets a late corroborator finalize a signal group on its own path by re-resolving the playbook-specific override at recompute time. Eliminates the v0.16.0 dependency on 'another primary event must fire within the window' for late corroborators to take effect. Schema: - migrations/012_signal_groups_playbook.sql adds nullable signal_groups.playbook_name. No SQL backfill (match_json doesn't carry a playbook reference); the daemon backfills on the next primary event for each group via a COALESCE update. Code: - SignalGroup gains pub playbook_name: Option; serde-default None for forward/backward compatibility. - handle_ban writes playbook_name = matching_playbook.map(|p| p.name) on group create AND backfills it on existing groups when the column is NULL. Update path uses COALESCE to never wipe a previously-set value. - recompute_group_aggregates re-resolves the playbook by name from the live state.playbooks snapshot, applies the override via check_corroboration_with_primary, and is now allowed to flip corroboration_met=true. Logs a structured 'awaiting next primary' info event when the flip happens. - Conservative fallback preserved: NULL playbook_name OR resolved playbook missing from the live config => keep previous corroboration_met (no flip from corroborator path). Matches v0.16.0 behavior on stale state. - Mitigation actuation deliberately stays single-sourced through handle_ban; corroborator-side recompute updates state but does not fire the mitigation. The next primary event picks up the flag and triggers normally. Repository changes: - All 7 SignalGroup-returning queries (insert, get, find, list, find_open, find_expired, find_open_by_dimensions) updated to project playbook_name. - update_signal_group uses COALESCE($7, playbook_name). - SignalGroupRow + From impl carry the column. Tests: - test_late_corroborator_finalizes_with_playbook_override: primary event below override threshold + corroborator → corroboration_met flips true. - test_late_corroborator_skips_when_playbook_name_is_stale: stored playbook removed from config → conservative no-flip behavior. cargo fmt + cargo clippy --all-targets -- -D warnings clean. 222 unit + 123 integration + 16 postgres tests green. --- migrations/012_signal_groups_playbook.sql | 46 +++++ src/api/handlers.rs | 87 ++++++++- src/correlation/engine.rs | 13 ++ src/db/mod.rs | 5 + src/db/repository.rs | 29 +-- tests/integration.rs | 204 ++++++++++++++++++++++ 6 files changed, 364 insertions(+), 20 deletions(-) create mode 100644 migrations/012_signal_groups_playbook.sql diff --git a/migrations/012_signal_groups_playbook.sql b/migrations/012_signal_groups_playbook.sql new file mode 100644 index 0000000..3149b18 --- /dev/null +++ b/migrations/012_signal_groups_playbook.sql @@ -0,0 +1,46 @@ +-- Migration 012: Remember the resolved playbook on signal_groups so the +-- corroborator path can re-resolve playbook-specific correlation +-- overrides (min_sources / confidence_threshold) without needing the +-- full primary-event context. +-- +-- Background: PR A (ADR 021) deliberately left `recompute_group_aggregates` +-- conservative: a corroborator-only recompute would never flip +-- `corroboration_met` from false -> true, because it didn't know which +-- playbook governed the group and thus couldn't resolve the override. +-- This migration adds a nullable `playbook_name`. Primary-event ingest +-- writes it; the corroborator path looks it up and resolves the override +-- against the live playbook config. +-- +-- Backfill: best-effort, copy the playbook name from any mitigation that +-- was triggered by this group. If no mitigation exists yet (e.g. the +-- group is below threshold and only has a single primary event), the +-- column stays NULL and the corroborator recompute falls back to the +-- v0.16.0 conservative behavior. The next primary event for the same +-- group will fill it in. + +ALTER TABLE signal_groups + ADD COLUMN IF NOT EXISTS playbook_name TEXT; + +WITH agg AS ( + SELECT m.signal_group_id AS group_id, + -- All mitigations from one signal group should share a playbook + -- (groups are keyed by vector and playbooks fan out by vector), + -- so MIN() is safe and deterministic for the rare race where a + -- vector matched two playbooks. + MIN(m.match_json) AS sample_match_json + FROM mitigations m + WHERE m.signal_group_id IS NOT NULL + GROUP BY m.signal_group_id +) +UPDATE signal_groups sg +SET playbook_name = agg.group_id::text -- placeholder, replaced below +FROM agg +WHERE 1 = 0; +-- The actual backfill happens at runtime: the daemon will resolve and +-- populate `playbook_name` on the next primary event for any group that +-- still has it NULL. Doing this in SQL is brittle because match_json +-- doesn't carry a playbook reference; we'd be re-running the matcher. + +CREATE INDEX IF NOT EXISTS idx_signal_groups_playbook + ON signal_groups (playbook_name) + WHERE playbook_name IS NOT NULL; diff --git a/src/api/handlers.rs b/src/api/handlers.rs index 447c01a..2a021d5 100644 --- a/src/api/handlers.rs +++ b/src/api/handlers.rs @@ -715,6 +715,9 @@ async fn handle_ban( .add_interface(interface.clone()); } } + // Remember the resolved playbook on the group so the corroborator + // path (PR B) can re-resolve the override at recompute time. + new_group.playbook_name = matching_playbook.map(|p| p.name.clone()); let group = state .repo .insert_signal_group(&new_group) @@ -740,7 +743,14 @@ async fn handle_ban( group.primary_dimensions.add_pop(state.settings.pop.clone()); dims_changed = before != group.primary_dimensions; } - if dims_changed { + // Backfill playbook_name on existing groups created before PR B (or + // before any primary event resolved a playbook). COALESCE in + // update_signal_group keeps an existing non-NULL value stable. + let playbook_changed = group.playbook_name.is_none() && matching_playbook.is_some(); + if playbook_changed { + group.playbook_name = matching_playbook.map(|p| p.name.clone()); + } + if dims_changed || playbook_changed { state .repo .update_signal_group(&group) @@ -5424,6 +5434,14 @@ pub async fn get_corroborator_activity( /// Recompute a signal group's derived_confidence, source_count and /// corroboration_met flag from its events (including corroborators). +/// +/// PR B: this path now re-resolves the playbook-specific correlation +/// override using the stored `playbook_name` on the group, and is +/// allowed to flip `corroboration_met` from false→true even when the +/// triggering ingest was a corroborator. We deliberately do NOT +/// actuate a mitigation from here — the next primary-path event will +/// pick up the flipped flag and trigger normally. This keeps mitigation +/// actuation single-sourced through `handle_ban`. async fn recompute_group_aggregates(state: &Arc, group_id: Uuid) -> Result<(), AppError> { use crate::correlation::CorrelationEngine; @@ -5453,17 +5471,68 @@ async fn recompute_group_aggregates(state: &Arc, group_id: Uuid) -> Re let has_primary = events.iter().any(|e| !e.is_corroborating); - let mut updated = group; - updated.derived_confidence = derived; - updated.source_count = count; - // Only primary ingest has enough context to evaluate playbook-specific - // overrides safely. Corroborator-only recomputes update aggregates but do - // not promote a group from false->true on their own. - updated.corroboration_met = if has_primary { - updated.corroboration_met + // Resolve the playbook override using the group's stored playbook_name. + // If the group was created before PR B (playbook_name is NULL) or the + // playbook has since been removed, fall back to the conservative + // pre-PR-B behavior: aggregates update, but we don't flip + // corroboration_met → true on the corroborator path. + let was_met = group.corroboration_met; + let mut newly_met = false; + let new_met = if has_primary { + let correlation_config = state.correlation_config.read().await.clone(); + let playbooks = state.playbooks.read().await.clone(); + let resolved_playbook = group + .playbook_name + .as_deref() + .and_then(|name| playbooks.playbooks.iter().find(|p| p.name == name)); + let override_ = resolved_playbook.and_then(|p| p.correlation.as_ref()); + match (resolved_playbook, group.playbook_name.as_deref()) { + (Some(_), _) => { + let met = CorrelationEngine::check_corroboration_with_primary( + count, + derived, + has_primary, + &correlation_config, + override_, + ); + if met && !was_met { + newly_met = true; + } + met + } + (None, Some(missing)) => { + tracing::debug!( + group_id = %group_id, + playbook = %missing, + "stored playbook_name no longer resolves; keeping previous corroboration_met" + ); + was_met + } + (None, None) => { + // Pre-PR-B group with no resolved playbook yet — preserve + // previous behavior. + was_met + } + } } else { + // No primary event yet → invariant: corroboration cannot be met. false }; + + if newly_met { + tracing::info!( + group_id = %group_id, + derived_confidence = derived, + source_count = count, + playbook = ?group.playbook_name, + "signal group reached corroboration threshold via corroborator path; awaiting next primary event to actuate mitigation" + ); + } + + let mut updated = group; + updated.derived_confidence = derived; + updated.source_count = count; + updated.corroboration_met = new_met; state .repo .update_signal_group(&updated) diff --git a/src/correlation/engine.rs b/src/correlation/engine.rs index 93e0f4b..a893b5f 100644 --- a/src/correlation/engine.rs +++ b/src/correlation/engine.rs @@ -21,6 +21,15 @@ pub struct SignalGroup { /// Used by the corroborator matching flow (ADR 021). #[serde(default)] pub primary_dimensions: PrimaryDimensions, + /// Name of the playbook that matched the primary event(s) for this + /// group. Used by the corroborator path (PR B) to re-resolve the + /// playbook-specific correlation override (`min_sources`, + /// `confidence_threshold`) without needing the full primary-event + /// context. NULL means: no primary event resolved a playbook yet + /// (or we're upgrading from a pre-PR-B build); corroborator-only + /// recompute falls back to the conservative no-flip behavior. + #[serde(default)] + pub playbook_name: Option, } /// Serializable form of aggregated primary-event dimensions. Stored in the @@ -271,6 +280,7 @@ impl CorrelationEngine { status: SignalGroupStatus::Open, corroboration_met: false, primary_dimensions: PrimaryDimensions::default(), + playbook_name: None, } } @@ -700,6 +710,7 @@ mod tests { status: SignalGroupStatus::Open, corroboration_met: true, primary_dimensions: PrimaryDimensions::default(), + playbook_name: None, }; let contributions = vec![ @@ -741,6 +752,7 @@ mod tests { status: SignalGroupStatus::Open, corroboration_met: false, primary_dimensions: PrimaryDimensions::default(), + playbook_name: None, }; let contributions = vec![SourceContribution { @@ -803,6 +815,7 @@ mod tests { status: SignalGroupStatus::Open, corroboration_met: false, primary_dimensions: PrimaryDimensions::default(), + playbook_name: None, } } diff --git a/src/db/mod.rs b/src/db/mod.rs index 67bc489..ee23994 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -81,6 +81,11 @@ async fn run_migrations(pool: &PgPool) -> Result<()> { "backfill_primary_dimensions", include_str!("../../migrations/011_backfill_primary_dimensions.sql"), ), + ( + 12, + "signal_groups_playbook", + include_str!("../../migrations/012_signal_groups_playbook.sql"), + ), ]; // Bootstrap: run all migrations first (they use IF NOT EXISTS) diff --git a/src/db/repository.rs b/src/db/repository.rs index b4439f5..ae13403 100644 --- a/src/db/repository.rs +++ b/src/db/repository.rs @@ -911,18 +911,20 @@ impl RepositoryTrait for Repository { WITH existing AS ( SELECT group_id, victim_ip, vector, created_at, window_expires_at, derived_confidence, source_count, status, corroboration_met, - primary_dimensions + primary_dimensions, playbook_name FROM signal_groups WHERE victim_ip = $2 AND vector = $3 AND status = 'open' AND window_expires_at > NOW() LIMIT 1 ), inserted AS ( INSERT INTO signal_groups (group_id, victim_ip, vector, created_at, window_expires_at, - derived_confidence, source_count, status, corroboration_met, primary_dimensions) - SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9, $10 + derived_confidence, source_count, status, corroboration_met, primary_dimensions, + playbook_name) + SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 WHERE NOT EXISTS (SELECT 1 FROM existing) RETURNING group_id, victim_ip, vector, created_at, window_expires_at, - derived_confidence, source_count, status, corroboration_met, primary_dimensions + derived_confidence, source_count, status, corroboration_met, primary_dimensions, + playbook_name ) SELECT * FROM existing UNION ALL @@ -940,6 +942,7 @@ impl RepositoryTrait for Repository { .bind(group.status.as_str()) .bind(group.corroboration_met) .bind(serde_json::to_value(&group.primary_dimensions).unwrap_or(serde_json::json!({}))) + .bind(group.playbook_name.as_deref()) .fetch_one(&self.pool) .await; @@ -957,7 +960,7 @@ impl RepositoryTrait for Repository { r#" SELECT group_id, victim_ip, vector, created_at, window_expires_at, derived_confidence, source_count, status, corroboration_met, - primary_dimensions + primary_dimensions, playbook_name FROM signal_groups WHERE victim_ip = $1 AND vector = $2 AND status = 'open' AND window_expires_at > NOW() @@ -982,7 +985,8 @@ impl RepositoryTrait for Repository { source_count = $3, status = $4, corroboration_met = $5, - primary_dimensions = $6 + primary_dimensions = $6, + playbook_name = COALESCE($7, playbook_name) WHERE group_id = $1 "#, ) @@ -992,6 +996,7 @@ impl RepositoryTrait for Repository { .bind(group.status.as_str()) .bind(group.corroboration_met) .bind(serde_json::to_value(&group.primary_dimensions).unwrap_or(serde_json::json!({}))) + .bind(group.playbook_name.as_deref()) .execute(&self.pool) .await?; Ok(()) @@ -1002,7 +1007,7 @@ impl RepositoryTrait for Repository { r#" SELECT group_id, victim_ip, vector, created_at, window_expires_at, derived_confidence, source_count, status, corroboration_met, - primary_dimensions + primary_dimensions, playbook_name FROM signal_groups WHERE group_id = $1 "#, ) @@ -1017,7 +1022,7 @@ impl RepositoryTrait for Repository { r#" SELECT group_id, victim_ip, vector, created_at, window_expires_at, derived_confidence, source_count, status, corroboration_met, - primary_dimensions + primary_dimensions, playbook_name FROM signal_groups WHERE victim_ip = $1 AND vector = $2 AND status = 'open' AND window_expires_at > NOW() @@ -1097,7 +1102,7 @@ impl RepositoryTrait for Repository { r#" SELECT group_id, victim_ip, vector, created_at, window_expires_at, derived_confidence, source_count, status, corroboration_met, - primary_dimensions + primary_dimensions, playbook_name FROM signal_groups WHERE ($1::text IS NULL OR status = $1) AND ($2::text IS NULL OR vector = $2) @@ -1132,7 +1137,7 @@ impl RepositoryTrait for Repository { r#" SELECT group_id, victim_ip, vector, created_at, window_expires_at, derived_confidence, source_count, status, corroboration_met, - primary_dimensions + primary_dimensions, playbook_name FROM signal_groups WHERE status = 'open' AND window_expires_at <= NOW() "#, @@ -1161,7 +1166,7 @@ impl RepositoryTrait for Repository { r#" SELECT group_id, victim_ip, vector, created_at, window_expires_at, derived_confidence, source_count, status, corroboration_met, - primary_dimensions + primary_dimensions, playbook_name FROM signal_groups WHERE status = 'open' AND window_expires_at > $1 @@ -1502,6 +1507,7 @@ struct SignalGroupRow { status: String, corroboration_met: bool, primary_dimensions: serde_json::Value, + playbook_name: Option, } impl From for SignalGroup { @@ -1517,6 +1523,7 @@ impl From for SignalGroup { status: row.status.parse().unwrap_or(SignalGroupStatus::Open), corroboration_met: row.corroboration_met, primary_dimensions: serde_json::from_value(row.primary_dimensions).unwrap_or_default(), + playbook_name: row.playbook_name, } } } diff --git a/tests/integration.rs b/tests/integration.rs index 7641a02..211f181 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -5567,3 +5567,207 @@ async fn test_corroborator_source_activity_merges_cache_rows() { assert!(cpu.last_seen.is_some()); assert_eq!(by_source.remove("pop-utilization").unwrap().count, 1); } + +#[tokio::test] +async fn test_late_corroborator_finalizes_with_playbook_override() { + // PR B: a late corroborator can flip corroboration_met=true on its + // own path, using the override resolved from the group's stored + // playbook_name. We set up a group whose primary event lands below + // the global threshold but above the playbook override threshold, + // and confirm a corroborator promotes the flag. + use prefixd::correlation::{ + MatchDimension, PlaybookCorrelationOverride, SourceConfig, SourceMode, + }; + use prefixd::domain::AttackVector; + + let repo: Arc = Arc::new(MockRepository::new()); + let announcer = Arc::new(MockAnnouncer::new()); + + // Global config: min_sources=3, threshold=0.9 — primary alone is far + // from meeting it. The playbook override drops both to 2 / 0.5, so a + // single corroborator (count → 2) should finalize. + let mut settings = test_settings_with_correlation(true, 3, 0.9); + settings.correlation.sources.insert( + "router-cpu".to_string(), + SourceConfig { + weight: 0.6, + r#type: "telemetry".to_string(), + confidence_mapping: std::collections::HashMap::new(), + mode: SourceMode::Corroborating, + match_dimensions: vec![MatchDimension::Pop], + }, + ); + + let playbooks = Playbooks { + playbooks: vec![Playbook { + name: "udp_flood_test".to_string(), + match_criteria: PlaybookMatch { + vector: AttackVector::UdpFlood, + require_top_ports: false, + }, + correlation: Some(PlaybookCorrelationOverride { + min_sources: Some(2), + confidence_threshold: Some(0.5), + }), + steps: vec![PlaybookStep { + action: PlaybookAction::Police, + rate_bps: Some(5_000_000), + ttl_seconds: 120, + require_confidence_at_least: None, + require_persistence_seconds: None, + }], + }], + }; + + let state = AppState::new( + settings, + test_inventory(), + playbooks, + repo.clone(), + announcer, + std::path::PathBuf::from("."), + ) + .expect("state"); + let app = create_test_router(state); + + // Step 1: primary event — group exists, playbook_name is set, + // corroboration_met=false (single source against override min=2). + let event_body = serde_json::json!({ + "source": "fastnetmon", + "vector": "udp_flood", + "victim_ip": "203.0.113.10", + "timestamp": chrono::Utc::now().to_rfc3339(), + "confidence": 0.7, + "action": "ban" + }); + let resp = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/v1/events") + .header("content-type", "application/json") + .body(Body::from(event_body.to_string())) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::ACCEPTED); + + let groups_before = repo + .list_signal_groups( + &prefixd::correlation::engine::SignalGroupFilter::default(), + &prefixd::db::ListParams { + limit: 10, + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(groups_before.len(), 1); + let group = &groups_before[0]; + assert_eq!(group.playbook_name.as_deref(), Some("udp_flood_test")); + assert!( + !group.corroboration_met, + "single primary event should not yet meet override threshold" + ); + + // Step 2: corroborator with matching pop → group now has 2 distinct + // sources, derived_confidence above 0.5, override allows promotion. + let sig_body = serde_json::json!({ + "source": "router-cpu", + "pop": "test1", + "vector": "udp_flood", + "confidence": 0.6 + }); + let resp2 = app + .oneshot( + Request::builder() + .method("POST") + .uri("/v1/signals/corroborator") + .header("content-type", "application/json") + .body(Body::from(sig_body.to_string())) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp2.status(), StatusCode::OK); + + let groups_after = repo + .list_signal_groups( + &prefixd::correlation::engine::SignalGroupFilter::default(), + &prefixd::db::ListParams { + limit: 10, + ..Default::default() + }, + ) + .await + .unwrap(); + let group_after = &groups_after[0]; + assert!( + group_after.corroboration_met, + "late corroborator should finalize via playbook override" + ); + assert_eq!(group_after.source_count, 2); +} + +#[tokio::test] +async fn test_late_corroborator_skips_when_playbook_name_is_stale() { + // If a group's stored playbook_name no longer resolves (admin removed + // it), the corroborator path falls back to conservative behavior: + // aggregates update but corroboration_met is preserved (not flipped). + use prefixd::correlation::CorroboratingSignal; + use prefixd::correlation::engine::{PrimaryDimensions, SignalGroup, SignalGroupStatus}; + use uuid::Uuid; + + let repo = MockRepository::new(); + let now = chrono::Utc::now(); + let group_id = Uuid::new_v4(); + + repo.insert_signal_group(&SignalGroup { + group_id, + victim_ip: "203.0.113.10".to_string(), + vector: "udp_flood".to_string(), + created_at: now, + window_expires_at: now + chrono::Duration::seconds(300), + derived_confidence: 0.0, + source_count: 0, + status: SignalGroupStatus::Open, + corroboration_met: false, + primary_dimensions: { + let mut d = PrimaryDimensions::default(); + d.add_pop("test1".to_string()); + d + }, + playbook_name: Some("does_not_exist".to_string()), + }) + .await + .unwrap(); + // Seed a primary event link so has_primary=true. + repo.add_event_to_group(group_id, Uuid::new_v4(), 1.0) + .await + .unwrap(); + repo.insert_corroborating_signal(&CorroboratingSignal { + signal_id: Uuid::new_v4(), + source: "router-cpu".to_string(), + vector: Some("udp_flood".to_string()), + customer_id: None, + pop: Some("test1".to_string()), + service_id: None, + interface: None, + confidence: Some(0.9), + weight: 1.0, + ingested_at: now, + expires_at: now + chrono::Duration::seconds(300), + raw_details: None, + attached_group_ids: vec![], + }) + .await + .unwrap(); + + let group = repo.get_signal_group(group_id).await.unwrap().unwrap(); + assert!( + !group.corroboration_met, + "stale playbook_name should not allow promotion" + ); +} From dc5cf6556fb15c8bc43375146b2b4c5457788b48 Mon Sep 17 00:00:00 2001 From: Lance Tuller Date: Wed, 29 Apr 2026 14:16:38 -0400 Subject: [PATCH 2/6] refactor(metrics): per-source corroborator expiry + cache_size gauge MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR B items 2 + 5. Restores attribution on the expired counter and adds a new gauge for runaway-cache alerting. Repository: - CorroboratorSweepStats.unattached_expired is now HashMap (per-source) instead of a flat u64. Helper CorroboratorSweepStats::unattached_total() preserves the old summing use case for tests/log lines. - Postgres delete_expired_corroborating_signals uses a single DELETE-RETURNING with GROUP BY source, so attribution is collected in the same query that performs the delete (no read-then-delete race window). - Mock implementation aggregates by source while retain()-ing. - New trait method count_cached_corroborators_by_source(now) returns Vec<(source, count)> for the gauge refresh; Postgres GROUPs in SQL, Mock buckets in-memory. Metrics: - CORROBORATOR_EXPIRED_TOTAL regains its &['source'] label set. Operator note: this is a label change in CHANGELOG; existing PromQL queries against the v0.16.0 unlabelled counter must add a sum() if they want the previous shape. - New gauge prefixd_corroborator_cache_size{source} registered and Lazy::force-d in init_metrics. Scheduler: - ReconciliationLoop carries last_cache_sources (tokio Mutex) to zero-out gauges for sources that drained between ticks; otherwise Prometheus would keep stale non-zero values forever. - After each sweep, sets per-source CORROBORATOR_CACHE_SIZE from count_cached_corroborators_by_source(now) and explicitly clears sources that fell out of the live set. Tests: - test_expired_sweep_attributes_per_source: 2 expired-unattached from router-cpu, 1 from pop-utilization → unattached_expired map keyed correctly. - test_count_cached_corroborators_by_source: covers the four exclusion cases (attached, expired, unattached-unexpired, two sources) and asserts the live counts. - Existing test_expired_sweep_splits_attached_vs_unattached updated to use the HashMap shape via .unattached_total() / .get(). cargo fmt + cargo clippy --all-targets clean. 222 unit + 125 integration + 16 postgres tests green. --- src/db/mock.rs | 20 ++++++- src/db/repository.rs | 44 +++++++++++---- src/db/traits.rs | 31 ++++++++--- src/observability/metrics.rs | 16 +++++- src/scheduler/reconcile.rs | 50 ++++++++++++----- tests/integration.rs | 101 ++++++++++++++++++++++++++++++++++- 6 files changed, 228 insertions(+), 34 deletions(-) diff --git a/src/db/mock.rs b/src/db/mock.rs index e236eea..25fecc4 100644 --- a/src/db/mock.rs +++ b/src/db/mock.rs @@ -791,14 +791,15 @@ impl RepositoryTrait for MockRepository { now: chrono::DateTime, ) -> Result { let mut cache = self.corroborating_signals.lock().unwrap(); - let mut unattached = 0u64; + let mut unattached: std::collections::HashMap = + std::collections::HashMap::new(); let mut attached = 0u64; cache.retain(|s| { if s.expires_at > now { return true; } if s.attached_group_ids.is_empty() { - unattached += 1; + *unattached.entry(s.source.clone()).or_insert(0) += 1; } else { attached += 1; } @@ -832,6 +833,21 @@ impl RepositoryTrait for MockRepository { .collect()) } + async fn count_cached_corroborators_by_source( + &self, + now: chrono::DateTime, + ) -> Result> { + use std::collections::HashMap; + let cache = self.corroborating_signals.lock().unwrap(); + let mut counts: HashMap = HashMap::new(); + for s in cache.iter() { + if s.expires_at > now && s.attached_group_ids.is_empty() { + *counts.entry(s.source.clone()).or_insert(0) += 1; + } + } + Ok(counts.into_iter().collect()) + } + async fn corroborator_source_activity( &self, since: chrono::DateTime, diff --git a/src/db/repository.rs b/src/db/repository.rs index ae13403..47c37c7 100644 --- a/src/db/repository.rs +++ b/src/db/repository.rs @@ -1337,24 +1337,22 @@ impl RepositoryTrait for Repository { &self, now: chrono::DateTime, ) -> Result { - // Two statements so the scheduler can attribute the expired metric - // to truly-unattached signals (cache misses) while still cleaning - // attached audit rows. Both run inside the same request round-trip - // order but we don't need a transaction: the counters are - // monotonic and the delete predicate is narrow. - let unattached: (i64,) = sqlx::query_as( + // Two statements: one DELETE-RETURNING grouped by source for + // per-source attribution on the expired metric; another for + // attached rows (no source attribution — they aren't cache misses). + let unattached_rows: Vec<(String, i64)> = sqlx::query_as( r#" WITH deleted AS ( DELETE FROM corroborating_signals WHERE expires_at <= $1 AND cardinality(attached_group_ids) = 0 - RETURNING signal_id + RETURNING source ) - SELECT COUNT(*) FROM deleted + SELECT source, COUNT(*)::BIGINT AS n FROM deleted GROUP BY source "#, ) .bind(now) - .fetch_one(&self.pool) + .fetch_all(&self.pool) .await?; let attached: (i64,) = sqlx::query_as( r#" @@ -1370,8 +1368,12 @@ impl RepositoryTrait for Repository { .bind(now) .fetch_one(&self.pool) .await?; + let unattached_expired = unattached_rows + .into_iter() + .map(|(source, n)| (source, n.max(0) as u64)) + .collect(); Ok(crate::db::traits::CorroboratorSweepStats { - unattached_expired: unattached.0.max(0) as u64, + unattached_expired, attached_expired: attached.0.max(0) as u64, }) } @@ -1409,6 +1411,28 @@ impl RepositoryTrait for Repository { Ok(rows.into_iter().map(Into::into).collect()) } + async fn count_cached_corroborators_by_source( + &self, + now: DateTime, + ) -> Result> { + let rows: Vec<(String, i64)> = sqlx::query_as( + r#" + SELECT source, COUNT(*)::BIGINT AS n + FROM corroborating_signals + WHERE expires_at > $1 + AND cardinality(attached_group_ids) = 0 + GROUP BY source + "#, + ) + .bind(now) + .fetch_all(&self.pool) + .await?; + Ok(rows + .into_iter() + .map(|(source, n)| (source, n.max(0) as u64)) + .collect()) + } + async fn corroborator_source_activity( &self, since: DateTime, diff --git a/src/db/traits.rs b/src/db/traits.rs index 4b4ab79..22f5e45 100644 --- a/src/db/traits.rs +++ b/src/db/traits.rs @@ -12,17 +12,25 @@ use super::{GlobalStats, PopInfo, SafelistEntry, TimeseriesBucket}; /// Return value for `delete_expired_corroborating_signals`. /// -/// `unattached_expired` is the count of signals the scheduler should -/// increment `CORROBORATOR_EXPIRED_TOTAL` by: signals that were cached -/// because no primary group matched at ingest and then timed out without -/// ever attaching. `attached_expired` rows are the audit copies retained -/// for late fan-out; their deletion is bookkeeping, not a cache miss. -#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +/// `unattached_expired` is per-source counts of signals the scheduler +/// should increment `CORROBORATOR_EXPIRED_TOTAL{source}` by: signals +/// that were cached because no primary group matched at ingest and +/// then timed out without ever attaching. `attached_expired` rows are +/// the audit copies retained for late fan-out; their deletion is +/// bookkeeping, not a cache miss, so we don't attribute them by +/// source. +#[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct CorroboratorSweepStats { - pub unattached_expired: u64, + pub unattached_expired: std::collections::HashMap, pub attached_expired: u64, } +impl CorroboratorSweepStats { + pub fn unattached_total(&self) -> u64 { + self.unattached_expired.values().sum() + } +} + /// Per-source activity summary for the Signals dashboard, covering both /// primary-event sources (via the `events` table) and corroborator-only /// sources (via `corroborating_signals` + `signal_group_events`). @@ -261,4 +269,13 @@ pub trait RepositoryTrait: Send + Sync { &self, since: DateTime, ) -> Result>; + /// Per-source counts of currently-cached unattached, unexpired + /// corroborating signals. Used by the reconcile loop to set the + /// `prefixd_corroborator_cache_size{source}` gauge on each tick so + /// operators can alert on caches that grow without bound (e.g. + /// when a source posts but no matching primary event ever lands). + async fn count_cached_corroborators_by_source( + &self, + now: DateTime, + ) -> Result>; } diff --git a/src/observability/metrics.rs b/src/observability/metrics.rs index 4f7dd23..44c24e1 100644 --- a/src/observability/metrics.rs +++ b/src/observability/metrics.rs @@ -269,7 +269,20 @@ pub static CORROBORATOR_EXPIRED_TOTAL: Lazy = Lazy::new(|| { register_counter_vec!( "prefixd_corroborator_expired_total", "Corroborating signals removed by cache sweep without ever attaching", - &[] + &["source"] + ) + .unwrap() +}); + +/// Currently-cached, unattached, unexpired corroborating signals by source. +/// Updated by the reconcile loop every tick after the sweep. Operators can +/// alert on this gauge growing without bound (e.g. a source posting heavily +/// while no matching primary event ever lands). +pub static CORROBORATOR_CACHE_SIZE: Lazy = Lazy::new(|| { + register_gauge_vec!( + "prefixd_corroborator_cache_size", + "Currently-cached unattached corroborating signals, by source", + &["source"] ) .unwrap() }); @@ -315,6 +328,7 @@ pub fn init_metrics() { Lazy::force(&CORROBORATOR_INGESTED_TOTAL); Lazy::force(&CORROBORATOR_ATTACHED_TOTAL); Lazy::force(&CORROBORATOR_EXPIRED_TOTAL); + Lazy::force(&CORROBORATOR_CACHE_SIZE); } /// Update database pool metrics from sqlx pool stats diff --git a/src/scheduler/reconcile.rs b/src/scheduler/reconcile.rs index a9496c1..6609070 100644 --- a/src/scheduler/reconcile.rs +++ b/src/scheduler/reconcile.rs @@ -17,6 +17,11 @@ pub struct ReconciliationLoop { dry_run: bool, ws_broadcast: Option>, alerting: Option>>>, + /// Set of source labels we last set on + /// `CORROBORATOR_CACHE_SIZE`. Used to zero-out gauges when a + /// source's cache drains to empty between ticks (Prometheus would + /// otherwise keep the last non-zero value forever). + last_cache_sources: tokio::sync::Mutex>, } impl ReconciliationLoop { @@ -33,6 +38,7 @@ impl ReconciliationLoop { dry_run, ws_broadcast: None, alerting: None, + last_cache_sources: tokio::sync::Mutex::new(std::collections::HashSet::new()), } } @@ -203,30 +209,48 @@ impl ReconciliationLoop { async fn sweep_corroborator_cache(&self) -> anyhow::Result<()> { // Clean expired rows from the corroborating_signals cache. The - // repository splits the delete into unattached vs attached so we - // only charge `CORROBORATOR_EXPIRED_TOTAL` for true cache misses - // (ingested, never matched any group, timed out). Attached rows - // still get GC'd but don't inflate the expired counter — their - // audit trail already lives on signal_group_events. + // repository splits the delete into unattached (per-source) vs + // attached: only true cache misses (ingested, never matched any + // group, timed out) charge `CORROBORATOR_EXPIRED_TOTAL{source}`. + // Attached rows are GC'd silently — their audit trail already + // lives on signal_group_events. // - // Per-source attribution for the expired counter is deferred to - // PR B (see ROADMAP -> Correlation Engine -> Corroborating - // signals v2). + // After the sweep we also refresh `CORROBORATOR_CACHE_SIZE{source}` + // so operators can alert on caches growing without bound. let now = chrono::Utc::now(); let stats = self.repo.delete_expired_corroborating_signals(now).await?; - let total = stats.unattached_expired + stats.attached_expired; + let unattached_total = stats.unattached_total(); + let total = unattached_total + stats.attached_expired; if total > 0 { tracing::info!( - unattached_expired = stats.unattached_expired, + unattached_expired = unattached_total, attached_expired = stats.attached_expired, "swept expired corroborating signals from cache" ); - if stats.unattached_expired > 0 { + for (source, count) in &stats.unattached_expired { crate::observability::metrics::CORROBORATOR_EXPIRED_TOTAL - .with_label_values(&[] as &[&str]) - .inc_by(stats.unattached_expired as f64); + .with_label_values(&[source.as_str()]) + .inc_by(*count as f64); } } + + // Refresh the cache_size gauge: zero out previous values for + // sources that no longer have rows, then set per-source counts. + let by_source = self.repo.count_cached_corroborators_by_source(now).await?; + let live_sources: std::collections::HashSet = + by_source.iter().map(|(s, _)| s.clone()).collect(); + let mut last = self.last_cache_sources.lock().await; + for stale in last.difference(&live_sources) { + crate::observability::metrics::CORROBORATOR_CACHE_SIZE + .with_label_values(&[stale.as_str()]) + .set(0.0); + } + for (source, count) in &by_source { + crate::observability::metrics::CORROBORATOR_CACHE_SIZE + .with_label_values(&[source.as_str()]) + .set(*count as f64); + } + *last = live_sources; Ok(()) } diff --git a/tests/integration.rs b/tests/integration.rs index 211f181..7bb1cf2 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -5515,11 +5515,110 @@ async fn test_expired_sweep_splits_attached_vs_unattached() { .delete_expired_corroborating_signals(now) .await .unwrap(); - assert_eq!(stats.unattached_expired, 1); + assert_eq!(stats.unattached_total(), 1); + assert_eq!(stats.unattached_expired.get("router-cpu"), Some(&1)); assert_eq!(stats.attached_expired, 1); assert_eq!(repo.count_cached_corroborators(now).await.unwrap(), 1); } +#[tokio::test] +async fn test_expired_sweep_attributes_per_source() { + use chrono::{Duration, Utc}; + use prefixd::correlation::CorroboratingSignal; + use uuid::Uuid; + + let repo = MockRepository::new(); + let now = Utc::now(); + + // 2 expired unattached from router-cpu, 1 from pop-utilization + for src in ["router-cpu", "router-cpu", "pop-utilization"] { + repo.insert_corroborating_signal(&CorroboratingSignal { + signal_id: Uuid::new_v4(), + source: src.to_string(), + vector: None, + customer_id: None, + pop: Some("iad1".to_string()), + service_id: None, + interface: None, + confidence: Some(0.5), + weight: 0.5, + ingested_at: now - Duration::seconds(600), + expires_at: now - Duration::seconds(60), + raw_details: None, + attached_group_ids: vec![], + }) + .await + .unwrap(); + } + + let stats = repo + .delete_expired_corroborating_signals(now) + .await + .unwrap(); + assert_eq!(stats.unattached_expired.len(), 2); + assert_eq!(stats.unattached_expired.get("router-cpu"), Some(&2)); + assert_eq!(stats.unattached_expired.get("pop-utilization"), Some(&1)); + assert_eq!(stats.attached_expired, 0); +} + +#[tokio::test] +async fn test_count_cached_corroborators_by_source() { + use chrono::{Duration, Utc}; + use prefixd::correlation::CorroboratingSignal; + use uuid::Uuid; + + let repo = MockRepository::new(); + let now = Utc::now(); + + // 3 unattached unexpired from a, 1 unattached unexpired from b, + // 1 attached unexpired from a (excluded), 1 expired unattached from a + // (excluded). + let cases = [ + ("a", false, false, 0i64), // unexpired, unattached + ("a", false, false, 0), // unexpired, unattached + ("a", false, false, 0), // unexpired, unattached + ("b", false, false, 0), // unexpired, unattached + ("a", true, false, 0), // attached -> excluded + ("a", false, true, -3600i64), // expired -> excluded (insert with past expires_at) + ]; + for (src, attached, expired, ingest_offset) in cases { + repo.insert_corroborating_signal(&CorroboratingSignal { + signal_id: Uuid::new_v4(), + source: src.to_string(), + vector: None, + customer_id: None, + pop: Some("iad1".to_string()), + service_id: None, + interface: None, + confidence: Some(0.5), + weight: 0.5, + ingested_at: now + Duration::seconds(ingest_offset), + expires_at: if expired { + now - Duration::seconds(60) + } else { + now + Duration::seconds(300) + }, + raw_details: None, + attached_group_ids: if attached { + vec![Uuid::new_v4()] + } else { + vec![] + }, + }) + .await + .unwrap(); + } + + let counts: std::collections::HashMap = repo + .count_cached_corroborators_by_source(now) + .await + .unwrap() + .into_iter() + .collect(); + assert_eq!(counts.get("a"), Some(&3)); + assert_eq!(counts.get("b"), Some(&1)); +} + #[tokio::test] async fn test_corroborator_source_activity_merges_cache_rows() { use chrono::{Duration, Utc}; From b1ac8734914675477ef1f0f96ac34cae0721b6f7 Mon Sep 17 00:00:00 2001 From: Lance Tuller Date: Wed, 29 Apr 2026 14:21:57 -0400 Subject: [PATCH 3/6] feat(correlation): cached corroborator listing endpoint + dashboard panel PR B items 3 + 4. Surfaces the corroborator cache so L1 ops can debug mode=corroborating sources that ingest but never attach. Backend: - list_cached_corroborators trait method gains an optional source filter (Postgres pushes it down with a NULL-aware predicate; Mock filters in-memory). Used to build a per-source drilldown without overfetching. - New admin-only handler GET /v1/signals/corroborator/cache returns { now, total, by_source[], signals[] }. Listing limits to unattached + unexpired and clamps the page size to [1, 1000]. Auth flows through require_role(Admin); both unauthenticated and non-admin paths surface as 401 to match adjacent admin endpoints. - OpenAPI schemas added: CachedCorroboratorsResponse, CachedCorroboratorBySource. Route registered in api_routes(). Frontend: - New CachedCorroboratorsResponse / CachedCorroboratorSignal types in lib/api.ts; getCachedCorroborators() helper. - useCachedCorroborators hook with 30s SWR refresh. - New CacheTab component on the Correlation page (added between Groups and Config). Renders a 'cache size by source' badge row plus a dense table of cached signals with relative ingested/expires timestamps and dimension chips. Tests: - test_cache_listing_endpoint_returns_unattached_signals: posts one unattached + one attached row, asserts the attached row is filtered out and by_source counts match. - All 87 frontend Vitest cases still green; 'bun run build' compiles the new tab cleanly. cargo fmt + cargo clippy --all-targets clean. 222 unit + 126 integration + 16 postgres tests green. --- frontend/app/(dashboard)/correlation/page.tsx | 11 +- .../dashboard/correlation/cache-tab.tsx | 148 ++++++++++++++++++ frontend/hooks/use-api.ts | 22 +++ frontend/lib/api.ts | 40 +++++ src/api/handlers.rs | 84 ++++++++++ src/api/openapi.rs | 3 + src/api/routes.rs | 4 + src/db/mock.rs | 2 + src/db/repository.rs | 3 + src/db/traits.rs | 5 +- tests/integration.rs | 90 +++++++++++ 11 files changed, 410 insertions(+), 2 deletions(-) create mode 100644 frontend/components/dashboard/correlation/cache-tab.tsx diff --git a/frontend/app/(dashboard)/correlation/page.tsx b/frontend/app/(dashboard)/correlation/page.tsx index 92f9dae..d566a0c 100644 --- a/frontend/app/(dashboard)/correlation/page.tsx +++ b/frontend/app/(dashboard)/correlation/page.tsx @@ -4,11 +4,12 @@ import { useState } from "react" import { DashboardLayout } from "@/components/dashboard/dashboard-layout" import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs" import { Badge } from "@/components/ui/badge" -import { Radio, Layers, Settings } from "lucide-react" +import { Radio, Layers, Settings, Database } from "lucide-react" import { useOpenSignalGroupCount } from "@/hooks/use-api" import { SignalsTab } from "@/components/dashboard/correlation/signals-tab" import { GroupsTab } from "@/components/dashboard/correlation/groups-tab" import { ConfigTab } from "@/components/dashboard/correlation/config-tab" +import { CacheTab } from "@/components/dashboard/correlation/cache-tab" export default function CorrelationPage() { const [activeTab, setActiveTab] = useState("signals") @@ -40,6 +41,10 @@ export default function CorrelationPage() { )} + + + Cache + Config @@ -54,6 +59,10 @@ export default function CorrelationPage() { + + + + diff --git a/frontend/components/dashboard/correlation/cache-tab.tsx b/frontend/components/dashboard/correlation/cache-tab.tsx new file mode 100644 index 0000000..4878579 --- /dev/null +++ b/frontend/components/dashboard/correlation/cache-tab.tsx @@ -0,0 +1,148 @@ +"use client" + +import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card" +import { Badge } from "@/components/ui/badge" +import { Skeleton } from "@/components/ui/skeleton" +import { Database, AlertCircle } from "lucide-react" +import { useCachedCorroborators } from "@/hooks/use-api" +import { formatDistanceToNow } from "date-fns" + +function dimsLabel(s: { + customer_id: string | null + pop: string | null + service_id: string | null + interface: string | null +}): string { + const parts: string[] = [] + if (s.customer_id) parts.push(`customer=${s.customer_id}`) + if (s.pop) parts.push(`pop=${s.pop}`) + if (s.service_id) parts.push(`service=${s.service_id}`) + if (s.interface) parts.push(`iface=${s.interface}`) + return parts.length > 0 ? parts.join(" ") : "—" +} + +export function CacheTab() { + const { data, error, isLoading } = useCachedCorroborators({ limit: 200 }) + + if (isLoading) { + return ( +
+ + +
+ ) + } + + if (error) { + return ( + + + + Failed to load corroborator cache + + + ) + } + + if (!data || data.total === 0) { + return ( + + + +

+ No corroborator signals are currently cached. +

+

+ Cached signals appear here when a corroborating-only source posts before + any matching primary event lands. +

+
+
+ ) + } + + return ( +
+ + + + + Cache size by source + + + +
+ {data.by_source.map((b) => ( + + {b.source}: {b.count} + + ))} +
+

+ Total cached, unattached, unexpired: {data.total}. Watch the + prefixd_corroborator_cache_size + gauge to alert on caches growing without bound. +

+
+
+ + + + Cached signals + + +
+ + + + + + + + + + + + + {data.signals.map((s) => ( + + + + + + + + + ))} + +
SourceVectorDimensionsConfidenceIngestedExpires
{s.source} + {s.vector ?? "—"} + + {dimsLabel(s)} + + {s.confidence != null ? s.confidence.toFixed(2) : "—"} + + {formatDistanceToNow(new Date(s.ingested_at), { + addSuffix: true, + })} + + {formatDistanceToNow(new Date(s.expires_at), { + addSuffix: true, + })} +
+
+
+
+
+ ) +} diff --git a/frontend/hooks/use-api.ts b/frontend/hooks/use-api.ts index ac5e568..7c960e5 100644 --- a/frontend/hooks/use-api.ts +++ b/frontend/hooks/use-api.ts @@ -363,3 +363,25 @@ export function useOpenSignalGroupCount() { const { data } = useSignalGroups({ status: "open", limit: 1 }) return data?.count ?? 0 } + +export function useCachedCorroborators(params: { + source?: string + limit?: number +} = {}) { + const key = `cached-corroborators:${params.source ?? ""}:${params.limit ?? 100}` + return useSWR( + key, + MOCK_MODE + ? async () => ({ + now: new Date().toISOString(), + total: 0, + by_source: [], + signals: [], + } as Awaited>) + : () => api.getCachedCorroborators(params), + { + refreshInterval: 30_000, + revalidateOnFocus: !MOCK_MODE, + }, + ) +} diff --git a/frontend/lib/api.ts b/frontend/lib/api.ts index 1a3caa3..a602e1a 100644 --- a/frontend/lib/api.ts +++ b/frontend/lib/api.ts @@ -853,6 +853,46 @@ interface CorroboratorActivityResponse { sources: CorroboratorActivityEntry[] } +export interface CachedCorroboratorSignal { + signal_id: string + source: string + vector: string | null + customer_id: string | null + pop: string | null + service_id: string | null + interface: string | null + confidence: number | null + weight: number + ingested_at: string + expires_at: string + raw_details: unknown + attached_group_ids: string[] +} + +export interface CachedCorroboratorBySource { + source: string + count: number +} + +export interface CachedCorroboratorsResponse { + now: string + total: number + by_source: CachedCorroboratorBySource[] + signals: CachedCorroboratorSignal[] +} + +export async function getCachedCorroborators( + params: { source?: string; limit?: number } = {}, +): Promise { + const qs = new URLSearchParams() + if (params.source) qs.set("source", params.source) + if (params.limit) qs.set("limit", String(params.limit)) + const suffix = qs.toString() ? `?${qs}` : "" + return fetchApi( + `/v1/signals/corroborator/cache${suffix}`, + ) +} + export async function getSignalSources(): Promise { // Signal source status is derived from correlation config + recent // events + corroborator activity. Corroborating-only sources never hit diff --git a/src/api/handlers.rs b/src/api/handlers.rs index 2a021d5..0826295 100644 --- a/src/api/handlers.rs +++ b/src/api/handlers.rs @@ -5432,6 +5432,90 @@ pub async fn get_corroborator_activity( })) } +/// Cached-corroborators listing endpoint (PR B). Admin-only. Lists +/// signals currently in the corroborator cache that are unattached and +/// unexpired — i.e. waiting for a matching primary event to drain. +/// Useful for L1 ops to spot a source that's posting heavily but never +/// landing on a real incident. +#[derive(Debug, serde::Deserialize)] +pub struct CachedCorroboratorsQuery { + /// Page size; clamped to [1, 1000]. + #[serde(default)] + pub limit: Option, + /// Filter by signal source. Optional. + #[serde(default)] + pub source: Option, +} + +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct CachedCorroboratorsResponse { + pub now: chrono::DateTime, + pub total: u64, + pub by_source: Vec, + pub signals: Vec, +} + +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct CachedCorroboratorBySource { + pub source: String, + pub count: u64, +} + +#[utoipa::path( + get, + path = "/v1/signals/corroborator/cache", + tag = "signals", + params( + ("limit" = Option, Query, description = "Page size, default 100, max 1000"), + ("source" = Option, Query, description = "Filter by signal source"), + ), + responses( + (status = 200, description = "Cached corroborator listing", body = CachedCorroboratorsResponse), + (status = 401, description = "Authentication required"), + (status = 403, description = "Admin role required"), + ) +)] +pub async fn list_cached_corroborators_handler( + State(state): State>, + auth_session: AuthSession, + headers: HeaderMap, + axum::extract::Query(query): axum::extract::Query, +) -> impl IntoResponse { + use crate::domain::OperatorRole; + let auth_header = headers.get(AUTHORIZATION).and_then(|h| h.to_str().ok()); + if let Err(status) = require_role(&state, &auth_session, auth_header, OperatorRole::Admin) { + let msg = if status == StatusCode::UNAUTHORIZED { + "authentication required" + } else { + "admin role required" + }; + return Err(AppError(PrefixdError::Unauthorized(msg.into()))); + } + let limit = query.limit.unwrap_or(100).clamp(1, 1000) as i64; + let now = chrono::Utc::now(); + let signals = state + .repo + .list_cached_corroborators(now, limit, query.source.as_deref()) + .await + .map_err(AppError)?; + let by_source_rows = state + .repo + .count_cached_corroborators_by_source(now) + .await + .map_err(AppError)?; + let total = by_source_rows.iter().map(|(_, n)| *n).sum(); + let by_source = by_source_rows + .into_iter() + .map(|(source, count)| CachedCorroboratorBySource { source, count }) + .collect(); + Ok(Json(CachedCorroboratorsResponse { + now, + total, + by_source, + signals, + })) +} + /// Recompute a signal group's derived_confidence, source_count and /// corroboration_met flag from its events (including corroborators). /// diff --git a/src/api/openapi.rs b/src/api/openapi.rs index 0da833b..29f9e05 100644 --- a/src/api/openapi.rs +++ b/src/api/openapi.rs @@ -61,6 +61,7 @@ use crate::db::{GlobalStats, NotificationPreferences, PopInfo, PopStats, Safelis super::handlers::ingest_webhook, super::handlers::ingest_corroborator, super::handlers::get_corroborator_activity, + super::handlers::list_cached_corroborators_handler, super::handlers::get_correlation_config, super::handlers::update_correlation_config, ), @@ -117,6 +118,8 @@ use crate::db::{GlobalStats, NotificationPreferences, PopInfo, PopStats, Safelis super::handlers::CorroboratorResponse, super::handlers::CorroboratorActivityResponse, super::handlers::CorroboratorActivityEntry, + super::handlers::CachedCorroboratorsResponse, + super::handlers::CachedCorroboratorBySource, crate::correlation::PrimaryDimensions, crate::correlation::MatchDimension, crate::correlation::SourceMode, diff --git a/src/api/routes.rs b/src/api/routes.rs index 4e63c88..6ca4336 100644 --- a/src/api/routes.rs +++ b/src/api/routes.rs @@ -126,6 +126,10 @@ fn api_routes() -> Router> { "/v1/signals/corroborator/activity", axum::routing::get(handlers::get_corroborator_activity), ) + .route( + "/v1/signals/corroborator/cache", + axum::routing::get(handlers::list_cached_corroborators_handler), + ) } /// Common layers applied to both production and test routers diff --git a/src/db/mock.rs b/src/db/mock.rs index 25fecc4..c721263 100644 --- a/src/db/mock.rs +++ b/src/db/mock.rs @@ -823,11 +823,13 @@ impl RepositoryTrait for MockRepository { &self, now: chrono::DateTime, limit: i64, + source: Option<&str>, ) -> Result> { let cache = self.corroborating_signals.lock().unwrap(); Ok(cache .iter() .filter(|s| s.expires_at > now && s.attached_group_ids.is_empty()) + .filter(|s| source.is_none_or(|src| s.source == src)) .take(limit.max(0) as usize) .cloned() .collect()) diff --git a/src/db/repository.rs b/src/db/repository.rs index 47c37c7..0051acc 100644 --- a/src/db/repository.rs +++ b/src/db/repository.rs @@ -1392,6 +1392,7 @@ impl RepositoryTrait for Repository { &self, now: chrono::DateTime, limit: i64, + source: Option<&str>, ) -> Result> { let rows: Vec = sqlx::query_as( r#" @@ -1400,12 +1401,14 @@ impl RepositoryTrait for Repository { FROM corroborating_signals WHERE expires_at > $1 AND cardinality(attached_group_ids) = 0 + AND ($3::text IS NULL OR source = $3) ORDER BY ingested_at DESC LIMIT $2 "#, ) .bind(now) .bind(limit.max(0)) + .bind(source) .fetch_all(&self.pool) .await?; Ok(rows.into_iter().map(Into::into).collect()) diff --git a/src/db/traits.rs b/src/db/traits.rs index 22f5e45..1b64a0e 100644 --- a/src/db/traits.rs +++ b/src/db/traits.rs @@ -254,11 +254,14 @@ pub trait RepositoryTrait: Send + Sync { /// Count currently-cached (unattached, unexpired) corroborating signals. /// For operator dashboards / metrics. async fn count_cached_corroborators(&self, now: chrono::DateTime) -> Result; - /// List currently-cached corroborating signals for admin UI / debugging. + /// List currently-cached corroborating signals for admin UI / + /// debugging. When `source` is `Some`, results are filtered to + /// only that source. async fn list_cached_corroborators( &self, now: chrono::DateTime, limit: i64, + source: Option<&str>, ) -> Result>; /// Aggregate corroborator activity per source since `since`, across /// both the live cache (`corroborating_signals`) and attached rows diff --git a/tests/integration.rs b/tests/integration.rs index 7bb1cf2..4bc182b 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -5870,3 +5870,93 @@ async fn test_late_corroborator_skips_when_playbook_name_is_stale() { "stale playbook_name should not allow promotion" ); } + +#[tokio::test] +async fn test_cache_listing_endpoint_returns_unattached_signals() { + use chrono::{Duration, Utc}; + use prefixd::correlation::{CorroboratingSignal, MatchDimension, SourceConfig, SourceMode}; + use uuid::Uuid; + + let repo: Arc = Arc::new(MockRepository::new()); + let announcer = Arc::new(MockAnnouncer::new()); + let mut settings = test_settings_with_correlation(true, 1, 0.5); + settings.correlation.sources.insert( + "router-cpu".to_string(), + SourceConfig { + weight: 0.5, + r#type: "telemetry".to_string(), + confidence_mapping: std::collections::HashMap::new(), + mode: SourceMode::Corroborating, + match_dimensions: vec![MatchDimension::Pop], + }, + ); + + let now = Utc::now(); + repo.insert_corroborating_signal(&CorroboratingSignal { + signal_id: Uuid::new_v4(), + source: "router-cpu".to_string(), + vector: Some("udp_flood".to_string()), + customer_id: None, + pop: Some("iad1".to_string()), + service_id: None, + interface: None, + confidence: Some(0.5), + weight: 0.5, + ingested_at: now, + expires_at: now + Duration::seconds(300), + raw_details: None, + attached_group_ids: vec![], + }) + .await + .unwrap(); + // Attached row that should NOT show up in the listing. + repo.insert_corroborating_signal(&CorroboratingSignal { + signal_id: Uuid::new_v4(), + source: "router-cpu".to_string(), + vector: Some("udp_flood".to_string()), + customer_id: None, + pop: Some("iad1".to_string()), + service_id: None, + interface: None, + confidence: Some(0.5), + weight: 0.5, + ingested_at: now, + expires_at: now + Duration::seconds(300), + raw_details: None, + attached_group_ids: vec![Uuid::new_v4()], + }) + .await + .unwrap(); + + let state = AppState::new( + settings, + test_inventory(), + test_playbooks(), + repo, + announcer, + std::path::PathBuf::from("."), + ) + .expect("state"); + let app = create_test_router(state); + + let resp = app + .oneshot( + Request::builder() + .method("GET") + .uri("/v1/signals/corroborator/cache?limit=10") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let bytes = axum::body::to_bytes(resp.into_body(), 1024 * 1024) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + let signals = json["signals"].as_array().unwrap(); + assert_eq!(signals.len(), 1, "attached signal should be excluded"); + assert_eq!(json["total"], 1); + assert_eq!(json["by_source"][0]["source"], "router-cpu"); + assert_eq!(json["by_source"][0]["count"], 1); +} From 294f1dd7fbdfafd75a887bb6582d6828707dc3cd Mon Sep 17 00:00:00 2001 From: Lance Tuller Date: Wed, 29 Apr 2026 14:26:01 -0400 Subject: [PATCH 4/6] refactor(api)!: drop CorroboratorResponse.cached + PR B docs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR B item 4 + documentation pass for the full PR B set. API breaking change (v0.17.0): - CorroboratorResponse no longer carries a 'cached: bool' field; the always-true v0.16.0 boolean added no information given that status ∈ {attached, cached} is the canonical discriminator. - prefixdctl's CorroboratorResponse mirror struct also drops 'cached'. - frontend/lib/api.ts CorroboratorResponse type drops 'cached'. - Doc comment on CorroboratorResponse.status now points at the removal explicitly. Docs: - CHANGELOG: new 'Unreleased' entry covering all five PR B items as a coordinated set, with operator-facing notes for each (label change on _expired_total, new cache_size gauge, breaking field removal). - ROADMAP: 'Corroborating signals v2 (PR B)' block flipped to [x] with per-item descriptions of what landed. - ADR 021: 'Known limits / deferred to PR B' replaced with a 'PR B addenda (v0.17.0)' section that documents the playbook-name resolution path, the conservative fallback for stale playbooks, the per-source attribution implementation, the gauge-zeroing trick the scheduler uses, the new cache endpoint, and the field removal. - docs/api.md: example response no longer shows 'cached'; new callout explaining the breaking change. New '/v1/signals/corroborator/cache' section documents query params, response shape, and pairing with the cache_size gauge. - docs/detectors/corroborating-signals.md: example response updated; 'Known limits' section reframed as residual operator notes since the PR B items are no longer pending. - AGENTS.md: endpoint catalog gains the cache listing line. Verification: cargo fmt + cargo clippy --all-targets -- -D warnings clean. 222 unit + 126 integration + 16 postgres + 87 frontend tests green. 'bun run build' compiles. --- AGENTS.md | 1 + CHANGELOG.md | 9 ++++ ROADMAP.md | 12 ++--- docs/adr/021-corroborating-signals.md | 64 +++++++++++++++++++---- docs/api.md | 67 ++++++++++++++++++++++--- docs/detectors/corroborating-signals.md | 47 ++++++++++------- frontend/lib/api.ts | 1 - src/api/handlers.rs | 7 ++- src/bin/prefixdctl.rs | 2 - 9 files changed, 164 insertions(+), 46 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index b3266cc..2de7c1b 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -167,6 +167,7 @@ See `docs/adr/` for all 19 Architecture Decision Records. - `POST /v1/signals/webhook/{name}` - Generic webhook adapter (configured in `correlation.yaml`; JSONPath field mapping; HMAC/bearer/none auth) - `POST /v1/signals/corroborator` - Corroborating signal adapter (ADR 021). Sources configured with `mode: corroborating` post dimension-tagged signals that strengthen open signal groups without ever triggering mitigations on their own. Declared `match_dimensions` are authoritative: only declared dimensions are consulted during matching. Rejected with 400 if the source is unknown, `mode: primary`, or no declared dimension is populated. Correlation engine must be enabled. - `GET /v1/signals/corroborator/activity?minutes=N` - Per-source corroborator activity summary aggregated across the live cache and attached signal-group rows. Used by the Signals dashboard so `mode: corroborating` sources surface realistic `last_seen`/`count` instead of always reading as "never seen". +- `GET /v1/signals/corroborator/cache?source=X&limit=N` - **Admin only.** Lists corroborating signals currently cached and unattached + unexpired (i.e. waiting for a matching primary event to arrive within `window_seconds`). Used by the Correlation dashboard's Cache tab; pair with the `prefixd_corroborator_cache_size{source}` gauge for alerting on runaway caches. - `GET /v1/config/correlation` - Correlation config (admin, secrets redacted) - `PUT /v1/config/correlation` - Update correlation config (admin only, writes YAML + hot-reload) diff --git a/CHANGELOG.md b/CHANGELOG.md index f65a116..fa7bccd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- **Corroborating signals v2 (PR B)** — Follow-up to ADR 021's initial ship that addresses the four review-deferred items as a coordinated set: + - **Playbook-aware late finalization.** Migration `012_signal_groups_playbook.sql` adds nullable `signal_groups.playbook_name`, populated by the daemon on the next primary event for each group via `COALESCE`. The corroborator-side aggregate recompute now re-resolves the playbook by name from live state and is allowed to flip `corroboration_met=true` using the override min_sources/threshold. Conservative fallback is preserved: a NULL or stale `playbook_name` still keeps the v0.16.0 no-flip behavior (the next primary event picks up the flag). + - **Per-source attribution on `prefixd_corroborator_expired_total`.** The counter regains its `{source}` label set, with `delete_expired_corroborating_signals` collecting attribution in the same `DELETE … RETURNING` query that performs the delete. **Operator note:** this is a label change. PromQL queries written against the v0.16.0 unlabelled counter must add a `sum()` to recover the previous shape. + - **New gauge `prefixd_corroborator_cache_size{source}`** updated by the reconcile loop after each sweep. Operators can alert on caches growing without bound (e.g. a source posting heavily while no matching primary event ever lands). Stale labels are explicitly zeroed when a source's cache drains between ticks. + - **Cached-corroborators admin endpoint and dashboard panel.** `GET /v1/signals/corroborator/cache` (admin-only) returns `{ now, total, by_source[], signals[] }` filtered to unattached + unexpired rows, with optional `?source=` and `?limit=` (clamped to 1..1000). New "Cache" tab on the Correlation page renders per-source counts plus a dense table of cached signals with relative ingested/expires timestamps and dimension chips. Backed by a new `useCachedCorroborators` SWR hook (30s refresh). + - **`CorroboratorResponse.cached` removed** in favor of the existing `status ∈ {attached, cached}` discriminator. The boolean was always `true` and added no information; status fully describes the outcome. Coordinated minor breaking change — bump to v0.17.0 — since the endpoint is new in this release line. + ## [0.16.0] - 2026-04-19 ### Added diff --git a/ROADMAP.md b/ROADMAP.md index 8300b6b..5817610 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -289,12 +289,12 @@ Example: FastNetMon says UDP flood at 0.6 confidence + router CPU spiking + host - [x] Corroboration requirements ("require 2+ sources") - [x] Correlation explainability (`why` details in API/UI for each mitigation decision) - [x] Corroborating-only signals v1 — coarse telemetry (router CPU, PoP interface, per-customer NetFlow) can strengthen groups without ever triggering mitigations on its own (ADR 021, PR #109) -- [ ] **Corroborating signals v2 (PR B)** — follow-ups from the ADR 021 review: - - [ ] Playbook-override-aware corroborator finalization: let a late corroborator promote `corroboration_met` → `true` on its own path, using the resolved override from the group's most recent primary event. Eliminates the current dependency on "another primary event fires within the window" for late corroborators to take effect. - - [ ] Per-source attribution on `prefixd_corroborator_expired_total`: select expiring rows grouped by source before delete so the metric can be labelled without a full rewrite of the sweep path. - - [ ] API cleanup: drop the redundant `cached: true` field from `CorroboratorResponse`; `status ∈ {attached, cached}` already fully describes the outcome. Coordinate with a minor API version bump since the endpoint is new in this release. - - [ ] Dashboard "cached corroborators" panel: small widget on the Correlation dashboard showing live count of unattached-but-unexpired corroborators per source, sourced from a new `/v1/signals/corroborator/cache` listing endpoint (admin-only). - - [ ] Gauge metric `prefixd_corroborator_cache_size{source}` updated by the reconcile loop for Prometheus alerting on runaway caches. +- [x] **Corroborating signals v2 (PR B)** — follow-ups from the ADR 021 review (shipped in v0.17.0): + - [x] Playbook-override-aware corroborator finalization: late corroborators now promote `corroboration_met` → `true` on their own path using the override resolved from `signal_groups.playbook_name` (migration 012). Conservative fallback preserved when the stored playbook is missing from live config. + - [x] Per-source attribution on `prefixd_corroborator_expired_total{source}`: counter regains `&["source"]` label set; sweep collects attribution via `DELETE … RETURNING` grouped by source. + - [x] API cleanup: dropped the redundant `cached: true` field from `CorroboratorResponse`; `status ∈ {attached, cached}` is the discriminator. + - [x] Dashboard "cached corroborators" panel: new Cache tab on the Correlation page backed by `GET /v1/signals/corroborator/cache` (admin-only) showing per-source counts and a dense signal table. + - [x] Gauge metric `prefixd_corroborator_cache_size{source}` updated by the reconcile loop after each sweep; stale labels zeroed on drain. - [ ] Router telemetry adapter (JTI, gNMI) as the first production consumer of corroborator mode (already listed under Signal Adapters) - [ ] Replay mode for tuning (simulate historical incidents without announcing FlowSpec rules) diff --git a/docs/adr/021-corroborating-signals.md b/docs/adr/021-corroborating-signals.md index 40dd6dd..90d1ee6 100644 --- a/docs/adr/021-corroborating-signals.md +++ b/docs/adr/021-corroborating-signals.md @@ -230,17 +230,59 @@ initial implementation. All were fixed before merge. Notable ones: ## Known limits / deferred to PR B -- A corroborator that lands late in the window and pushes a group over - its threshold does not immediately fire the mitigation; it waits for - the next primary-path event (if any) to re-evaluate. This is a - product choice, not a correctness gap — implementing playbook- - override-aware finalization on the corroborator path is scheduled for - PR B (see ROADMAP). -- `prefixd_corroborator_expired_total` has no source label. PR B will - restore per-source attribution by collecting rows before delete. -- `CorroboratorResponse.cached` is always `true`; the field is - redundant given `status ∈ {attached, cached}` and is flagged for - removal in PR B. +(All five PR B items shipped in v0.17.0 — see CHANGELOG and the +"PR B addenda" section below for design notes on each.) + +## PR B addenda (v0.17.0) + +- **Late corroborator finalization is now playbook-aware.** Migration + `012_signal_groups_playbook.sql` adds nullable + `signal_groups.playbook_name`. The daemon writes it on group create + and `COALESCE`-backfills it on the next primary event for any + pre-upgrade group. The corroborator-side aggregate recompute + re-resolves the playbook by name from live state; if the playbook + exists, it applies the override min_sources/threshold and is now + allowed to flip `corroboration_met` from false → true. If the stored + name is NULL or no longer resolves (admin removed the playbook), the + conservative v0.16.0 behavior is preserved: aggregates update but the + flag is not flipped — the next primary event picks it up. This + decouples late corroborator finalization from "another primary event + must fire within the window" without weakening the + primary-required-once invariant. + + Mitigation actuation deliberately stays single-sourced through + `handle_ban`; corroborator-path recompute updates state but does not + fire FlowSpec. The next primary event reads the now-true flag and + triggers normally. This keeps the actuation surface narrow and + preserves the existing test/audit surface for mitigations. + +- **Per-source attribution on the expired counter.** + `delete_expired_corroborating_signals` now uses a single + `DELETE … RETURNING source` with `GROUP BY source` so attribution is + collected in the same query that performs the delete (no + read-then-delete race). `CORROBORATOR_EXPIRED_TOTAL` regains its + `&["source"]` label set. Operator note: this is a label change. + PromQL queries written against the v0.16.0 unlabelled counter must + add `sum()` to recover the previous shape. + +- **`prefixd_corroborator_cache_size{source}` gauge.** Updated by the + reconcile loop after each sweep using the new + `count_cached_corroborators_by_source(now)` repository method. The + scheduler keeps an in-process `last_cache_sources` set so labels + whose source drained between ticks are explicitly zeroed (Prometheus + would otherwise keep stale non-zero values forever). + +- **`/v1/signals/corroborator/cache` admin endpoint.** Returns + `{ now, total, by_source[], signals[] }` filtered to + unattached + unexpired rows. Optional `?source=` and `?limit=` + (clamped to 1..1000). Dashboard-side, the new Cache tab on the + Correlation page renders per-source badges and a dense table of + cached signals. + +- **`CorroboratorResponse.cached` removed.** Always-true booleans add + no information. `status ∈ {attached, cached}` is the canonical + discriminator. v0.17.0 minor breaking change since the endpoint is + new in this release line. ## References diff --git a/docs/api.md b/docs/api.md index 26e2806..ccb63c1 100644 --- a/docs/api.md +++ b/docs/api.md @@ -946,8 +946,7 @@ Content-Type: application/json { "signal_id": "a4f1b2c3-…", "status": "attached", - "attached_group_ids": ["e2b9…-1f3c"], - "cached": true + "attached_group_ids": ["e2b9…-1f3c"] } ``` @@ -957,10 +956,11 @@ Content-Type: application/json primary event arrival. - `attached_group_ids` — UUIDs of signal groups this signal contributed to. -- `cached` — currently always `true` (signals are retained in the cache - after attach for late fan-out). This field is flagged for removal in - a follow-up release in favor of `status` alone; new integrations - should rely on `status` / `attached_group_ids` instead. + +> **v0.17.0 breaking change:** the always-true `cached` field on this +> response was dropped. `status ∈ {attached, cached}` is the canonical +> discriminator. Update integrations that read `cached` to read +> `status === "cached"` instead. **Error responses:** @@ -1008,6 +1008,61 @@ group and kept its live cache row for late fan-out; the intent is --- +### List Cached Corroborators (admin) + +```http +GET /v1/signals/corroborator/cache?source=router-cpu&limit=200 +``` + +**Admin-only.** Lists corroborating signals currently in the cache that +are unattached and unexpired — i.e., signals that posted before any +matching primary event landed and are still waiting inside the +`window_seconds` TTL. Use this to debug a `mode: corroborating` source +that ingests heavily but never seems to attach. + +**Query parameters:** + +- `limit` *(optional, default 100, range 1–1000)* — Page size. +- `source` *(optional)* — Filter by signal source name. + +**Response:** + +```json +{ + "now": "2026-04-29T18:23:00Z", + "total": 17, + "by_source": [ + {"source": "router-cpu", "count": 11}, + {"source": "pop-utilization", "count": 6} + ], + "signals": [ + { + "signal_id": "...", + "source": "router-cpu", + "vector": "udp_flood", + "customer_id": null, + "pop": "iad1", + "service_id": null, + "interface": null, + "confidence": 0.7, + "weight": 0.6, + "ingested_at": "2026-04-29T18:22:55Z", + "expires_at": "2026-04-29T18:27:55Z", + "raw_details": {...}, + "attached_group_ids": [] + } + ] +} +``` + +- `total` and `by_source[]` summarize **unattached, unexpired** rows + globally (not just the page). +- `signals[]` is paginated by `limit` and ordered by `ingested_at` desc. +- Pair this with the new `prefixd_corroborator_cache_size{source}` + gauge for alerting on caches growing without bound. + +--- + ## Safelist ### List Safelist diff --git a/docs/detectors/corroborating-signals.md b/docs/detectors/corroborating-signals.md index ea5ae83..78dc5b9 100644 --- a/docs/detectors/corroborating-signals.md +++ b/docs/detectors/corroborating-signals.md @@ -82,16 +82,18 @@ Response: { "signal_id": "…", "status": "attached", - "attached_group_ids": ["…"], - "cached": true + "attached_group_ids": ["…"] } ``` -- `attached` — at least one open signal group matched and was +- `status: attached` — at least one open signal group matched and was strengthened. -- `cached` — no matching group yet; the signal is held for up to - `correlation.window_seconds` and drained if a matching primary event - arrives in that window. +- `status: cached` — no matching group yet; the signal is held for up + to `correlation.window_seconds` and drained if a matching primary + event arrives in that window. + +> The v0.16.0 `cached: true` field on this response was always-true +> and was removed in v0.17.0. Branch on `status` instead. ## 3. From the CLI @@ -166,18 +168,27 @@ def post_cpu_alert(pop: str, utilization: float): ## 7. Known limits -See the ADR 021 "Known limits / deferred to PR B" section for the -authoritative list. Most operator-visible one right now: - -- A corroborator that arrives *after* a primary event and pushes - aggregates past the threshold does **not** immediately trigger the - mitigation. Finalization happens on the next primary-ingest path. In - practice this means: if two primary events fire inside the window, - the corroborator's contribution is picked up. If only one primary - fires and a corroborator arrives later, the mitigation waits on - another primary signal (or on the cache drain when the next primary - for the same dimensions lands). Tracked as PR B work item - *Playbook-override-aware corroborator finalization*. +All five PR B follow-ups from the ADR 021 review shipped in v0.17.0. +A few residual operator notes: + +- **Mitigation still actuates from the primary path.** A late + corroborator can flip `corroboration_met` to true on its own (using + the playbook override stored on the group), but the FlowSpec + announcement is still triggered by the next primary event for that + group. This keeps actuation single-sourced. In practice: the flag + promotion is observable on `GET /v1/signal-groups/{id}` and on the + dashboard immediately; the mitigation lands when a matching primary + ingest happens (which, given the corroborator just confirmed the + group, is usually within seconds). +- **Stale playbook names are conservative.** If an operator removed a + playbook between group creation and a late corroborator landing, + recompute will keep the v0.16.0 no-flip behavior. Re-add the + playbook (or wait for the next primary event with a fresh playbook + name) to recover. +- **Cache-size gauge is per-source.** Alert on + `prefixd_corroborator_cache_size{source="..."}` rather than a sum + to detect a single misconfigured source filling the cache without + affecting the global view. ## Troubleshooting diff --git a/frontend/lib/api.ts b/frontend/lib/api.ts index a602e1a..4accd96 100644 --- a/frontend/lib/api.ts +++ b/frontend/lib/api.ts @@ -760,7 +760,6 @@ export interface CorroboratorResponse { signal_id: string status: "attached" | "cached" attached_group_ids: string[] - cached: boolean } export async function sendCorroborator( diff --git a/src/api/handlers.rs b/src/api/handlers.rs index 0826295..a038105 100644 --- a/src/api/handlers.rs +++ b/src/api/handlers.rs @@ -5162,9 +5162,13 @@ pub struct CorroboratorInput { #[derive(Debug, Serialize, ToSchema)] pub struct CorroboratorResponse { pub signal_id: Uuid, + /// One of "attached" (one or more groups matched) or "cached" + /// (no matching group; held in the corroborator cache until a + /// matching primary event arrives or TTL expires). Use this + /// instead of the v0.16.0 `cached` boolean field, which is + /// removed in v0.17.0. pub status: String, pub attached_group_ids: Vec, - pub cached: bool, } /// Ingest a corroborating signal. @@ -5357,7 +5361,6 @@ async fn ingest_corroborator_inner( "attached".to_string() }, attached_group_ids, - cached: true, }), )) } diff --git a/src/bin/prefixdctl.rs b/src/bin/prefixdctl.rs index 4dcae07..61d231d 100644 --- a/src/bin/prefixdctl.rs +++ b/src/bin/prefixdctl.rs @@ -649,8 +649,6 @@ struct CorroboratorResponse { signal_id: String, status: String, attached_group_ids: Vec, - #[serde(default)] - cached: bool, } #[allow(clippy::too_many_arguments)] From 42242bed4a5da919ab71fde17fabe59ab2541590 Mon Sep 17 00:00:00 2001 From: Lance Tuller Date: Wed, 29 Apr 2026 14:32:36 -0400 Subject: [PATCH 5/6] chore(security): patch RUSTSEC advisories blocking CI CI's Security Audit job was failing on three new RustSec advisories that landed against our dependency tree since v0.16.0. All are fixable via lock or trivial dev-dependency bumps; no production consumer code changes required. Fixed advisories: - RUSTSEC-2026-0104 (rustls-webpki 0.103.12 reachable panic in CRL parsing): `cargo update -p rustls-webpki` -> 0.103.13. Lock-only. - RUSTSEC-2026-0066, RUSTSEC-2026-0112, RUSTSEC-2026-0113 (astral-tokio-tar 0.5.6 PAX header desync, validation gaps, unpack_in symlink chmod): bump testcontainers 0.26 -> 0.27 and testcontainers-modules 0.14 -> 0.15 (dev-deps only). Pulls in astral-tokio-tar 0.6.x via the new release. No test changes. Also rolled in safe patch/minor lock-only bumps for actively maintained primary deps: - axum 0.8.8 -> 0.8.9 (patch) - axum-macros 0.5.0 -> 0.5.1 (patch) - tokio 1.51.0 -> 1.52.1 (minor) - uuid 1.22.0 -> 1.23.1 (minor) - tokio-tungstenite/tungstenite 0.28 -> 0.29 (transitive) Skipped on this PR (saved for follow-up where they need real review): - password-hash 0.5 -> 0.6 (touches crypto code) - sha2 0.10 -> 0.11 (transitive break risk) - vite 7 -> 8 / vitejs-plugin-react 5 -> 6 (frontend major) - typescript 5 -> 6 (frontend major) - jsdom 28 -> 29 (frontend major) - lucide-react 0.575 -> 1.x (frontend major; icon API changes) CI ignore list pruned: removed RUSTSEC-2025-0111 (tokio-tar) and RUSTSEC-2026-0066 (astral-tokio-tar) since both are now resolved upstream. Kept RUSTSEC-2023-0071 (rsa via unused sqlx-mysql), RUSTSEC-2025-0134 (rustls-pemfile unmaintained, no fix yet), and RUSTSEC-2026-0097 (rand custom-logger unsoundness, no fix; we don't use a custom logger). Verification: cargo audit clean modulo the three documented ignores; cargo fmt + cargo clippy --all-targets -- -D warnings clean; 222 unit + 126 integration + 16 postgres tests green. --- .github/workflows/ci.yml | 4 +- Cargo.lock | 81 +++++++++++++++++++--------------------- Cargo.toml | 4 +- 3 files changed, 42 insertions(+), 47 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7fafc44..6a5d754 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -113,11 +113,9 @@ jobs: uses: actions-rust-lang/audit@v1 with: # RUSTSEC-2023-0071: rsa crate (from sqlx-mysql, not used - we use postgres only) - # RUSTSEC-2025-0111: tokio-tar (from testcontainers, dev dependency only) - # RUSTSEC-2026-0066: astral-tokio-tar (renamed tokio-tar, same issue, testcontainers dev dep) # RUSTSEC-2025-0134: rustls-pemfile unmaintained (transitive dep, awaiting upstream fix) # RUSTSEC-2026-0097: rand unsoundness with custom logger (no fix available; we don't use custom logger) - ignore: RUSTSEC-2023-0071,RUSTSEC-2025-0111,RUSTSEC-2026-0066,RUSTSEC-2025-0134,RUSTSEC-2026-0097 + ignore: RUSTSEC-2023-0071,RUSTSEC-2025-0134,RUSTSEC-2026-0097 denyWarnings: false createIssues: false diff --git a/Cargo.lock b/Cargo.lock index 975d2ee..199e399 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -77,7 +77,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -88,7 +88,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -120,9 +120,9 @@ dependencies = [ [[package]] name = "astral-tokio-tar" -version = "0.5.6" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec179a06c1769b1e42e1e2cbe74c7dcdb3d6383c838454d063eaac5bbb7ebbe5" +checksum = "4ce73b17c62717c4b6a9af10b43e87c578b0cac27e00666d48304d3b7d2c0693" dependencies = [ "filetime", "futures-core", @@ -212,9 +212,9 @@ dependencies = [ [[package]] name = "axum" -version = "0.8.8" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8" +checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90" dependencies = [ "axum-core", "axum-macros", @@ -287,9 +287,9 @@ dependencies = [ [[package]] name = "axum-macros" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c" +checksum = "7aa268c23bfbbd2c4363b9cd302a4f504fb2a9dfe7e3451d66f35dd392e20aca" dependencies = [ "proc-macro2", "quote", @@ -380,9 +380,9 @@ dependencies = [ [[package]] name = "bollard" -version = "0.19.4" +version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87a52479c9237eb04047ddb94788c41ca0d26eaff8b697ecfbb4c32f7fdc3b1b" +checksum = "ee04c4c84f1f811b017f2fbb7dd8815c976e7ca98593de9c1e2afad0f636bff4" dependencies = [ "async-stream", "base64 0.22.1", @@ -390,7 +390,6 @@ dependencies = [ "bollard-buildkit-proto", "bollard-stubs", "bytes", - "chrono", "futures-core", "futures-util", "hex", @@ -408,14 +407,13 @@ dependencies = [ "rand 0.9.2", "rustls", "rustls-native-certs", - "rustls-pemfile", "rustls-pki-types", "serde", "serde_derive", "serde_json", - "serde_repr", "serde_urlencoded", "thiserror 2.0.18", + "time", "tokio", "tokio-stream", "tokio-util", @@ -440,19 +438,18 @@ dependencies = [ [[package]] name = "bollard-stubs" -version = "1.49.1-rc.28.4.0" +version = "1.52.1-rc.29.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5731fe885755e92beff1950774068e0cae67ea6ec7587381536fca84f1779623" +checksum = "0f0a8ca8799131c1837d1282c3f81f31e76ceb0ce426e04a7fe1ccee3287c066" dependencies = [ "base64 0.22.1", "bollard-buildkit-proto", "bytes", - "chrono", "prost", "serde", "serde_json", "serde_repr", - "serde_with", + "time", ] [[package]] @@ -942,7 +939,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -985,12 +982,12 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "ferroid" -version = "0.8.9" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb330bbd4cb7a5b9f559427f06f98a4f853a137c8298f3bd3f8ca57663e21986" +checksum = "ee93edf3c501f0035bbeffeccfed0b79e14c311f12195ec0e661e114a0f60da4" dependencies = [ "portable-atomic", - "rand 0.9.2", + "rand 0.10.0", "web-time", ] @@ -1888,7 +1885,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -2790,7 +2787,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -2858,7 +2855,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -2869,9 +2866,9 @@ checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" [[package]] name = "rustls-webpki" -version = "0.103.12" +version = "0.103.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8279bb85272c9f10811ae6a6c547ff594d6a7f3c6c6b02ee9726d1d0dcfcdd06" +checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e" dependencies = [ "aws-lc-rs", "ring", @@ -3230,7 +3227,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -3566,14 +3563,14 @@ dependencies = [ "getrandom 0.4.1", "once_cell", "rustix", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] name = "testcontainers" -version = "0.26.3" +version = "0.27.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a81ec0158db5fbb9831e09d1813fe5ea9023a2b5e6e8e0a5fe67e2a820733629" +checksum = "bfd5785b5483672915ed5fe3cddf9f546802779fc1eceff0a6fb7321fac81c1e" dependencies = [ "astral-tokio-tar", "async-trait", @@ -3584,6 +3581,7 @@ dependencies = [ "etcetera 0.11.0", "ferroid", "futures", + "http", "itertools 0.14.0", "log", "memchr", @@ -3601,9 +3599,9 @@ dependencies = [ [[package]] name = "testcontainers-modules" -version = "0.14.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e75e78ff453128a2c7da9a5d5a3325ea34ea214d4bf51eab3417de23a4e5147" +checksum = "e5985fde5befe4ffa77a052e035e16c2da86e8bae301baa9f9904ad3c494d357" dependencies = [ "testcontainers", ] @@ -3725,9 +3723,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.51.0" +version = "1.52.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bd1c4c0fc4a7ab90fc15ef6daaa3ec3b893f004f915f2392557ed23237820cd" +checksum = "b67dee974fe86fd92cc45b7a95fdd2f99a36a6d7b0d431a231178d3d670bbcc6" dependencies = [ "bytes", "libc", @@ -3785,9 +3783,9 @@ dependencies = [ [[package]] name = "tokio-tungstenite" -version = "0.28.0" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" +checksum = "8f72a05e828585856dacd553fba484c242c46e391fb0e58917c942ee9202915c" dependencies = [ "futures-util", "log", @@ -4091,9 +4089,9 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "tungstenite" -version = "0.28.0" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" +checksum = "6c01152af293afb9c7c2a57e4b559c5620b421f6d133261c60dd2d0cdb38e6b8" dependencies = [ "bytes", "data-encoding", @@ -4103,7 +4101,6 @@ dependencies = [ "rand 0.9.2", "sha1", "thiserror 2.0.18", - "utf-8", ] [[package]] @@ -4261,9 +4258,9 @@ dependencies = [ [[package]] name = "uuid" -version = "1.22.0" +version = "1.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a68d3c8f01c0cfa54a75291d83601161799e4a89a39e0929f4b0354d88757a37" +checksum = "ddd74a9687298c6858e9b88ec8935ec45d22e8fd5e6394fa1bd4e99a87789c76" dependencies = [ "getrandom 0.4.1", "js-sys", @@ -4510,7 +4507,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index d44faf6..b01f074 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,8 +84,8 @@ prost-build = "0.14" tokio-test = "0.4" tempfile = "3" criterion = { version = "0.8", features = ["async_tokio"] } -testcontainers = "0.26" -testcontainers-modules = { version = "0.14", features = ["postgres"] } +testcontainers = "0.27" +testcontainers-modules = { version = "0.15", features = ["postgres"] } proptest = "1" [[bench]] From a21f4615ef1b0900710363c2ee0e97d6f536dd7c Mon Sep 17 00:00:00 2001 From: Lance Tuller Date: Wed, 29 Apr 2026 14:44:53 -0400 Subject: [PATCH 6/6] review: address Copilot PR #117 feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Four valid review comments from copilot-pull-request-reviewer. 1. handlers.rs: list_cached_corroborators_handler returned 401 even on the role-mismatch (FORBIDDEN) path because both StatusCode values were funneled through PrefixdError::Unauthorized which always serializes as 401. Refactored to return Result directly (matching the existing list_operators / update_password admin-handler shape) so 403 actually reaches the wire. 2. handlers.rs: when ?source= was passed, only the signals[] array was filtered; total + by_source[] were still computed from the unfiltered global counts, so a client requesting source=foo would get rows for foo but a 'total' covering everything. Now the per-source aggregation rows are filtered to match the same source parameter; total is summed from the filtered set. Added regression test test_cache_listing_endpoint_source_filter_scopes_aggregates. 3. migrations/012: dropped the leftover WITH agg AS (...) UPDATE … WHERE 1 = 0 no-op block. The header originally implied a SQL backfill, but the actual approach is runtime COALESCE backfill in handle_ban; the dead CTE was just confusing. Header rewritten to describe the runtime-backfill approach explicitly and point at the call site. 4. integration.rs: test_late_corroborator_skips_when_playbook_name_is_stale previously inserted records directly into the mock repo and never exercised recompute_group_aggregates, so the assertion would have passed even with the stale-playbook guard removed. Rewritten to drive the flow through POST /v1/signals/corroborator with two distinct corroborating sources, asserting that aggregates do advance (source_count → 2) but corroboration_met stays false because the stored playbook name doesn't resolve. Verification: cargo fmt + cargo clippy --all-targets -- -D warnings clean. 222 unit + 127 integration + 16 postgres tests green. --- migrations/012_signal_groups_playbook.sql | 38 ++-- src/api/handlers.rs | 27 +-- tests/integration.rs | 200 ++++++++++++++++++---- 3 files changed, 199 insertions(+), 66 deletions(-) diff --git a/migrations/012_signal_groups_playbook.sql b/migrations/012_signal_groups_playbook.sql index 3149b18..0d0309d 100644 --- a/migrations/012_signal_groups_playbook.sql +++ b/migrations/012_signal_groups_playbook.sql @@ -11,36 +11,22 @@ -- writes it; the corroborator path looks it up and resolves the override -- against the live playbook config. -- --- Backfill: best-effort, copy the playbook name from any mitigation that --- was triggered by this group. If no mitigation exists yet (e.g. the --- group is below threshold and only has a single primary event), the --- column stays NULL and the corroborator recompute falls back to the --- v0.16.0 conservative behavior. The next primary event for the same --- group will fill it in. +-- Backfill is intentionally NOT done in SQL. `mitigations.match_json` +-- doesn't carry the resolved playbook name, and re-running the matcher +-- against a playbook YAML snapshot at migration time is brittle (the +-- live playbook list may have changed between when the group was +-- created and when this migration runs). +-- +-- Instead, the daemon backfills `playbook_name` at runtime using +-- `COALESCE(playbook_name, $resolved)` on the next primary-event ingest +-- path for each group that still has it NULL (see +-- `handle_ban` in `src/api/handlers.rs`). Until that next primary event +-- arrives, the corroborator-side recompute path falls back to the +-- v0.16.0 conservative behavior (no flip of `corroboration_met`). ALTER TABLE signal_groups ADD COLUMN IF NOT EXISTS playbook_name TEXT; -WITH agg AS ( - SELECT m.signal_group_id AS group_id, - -- All mitigations from one signal group should share a playbook - -- (groups are keyed by vector and playbooks fan out by vector), - -- so MIN() is safe and deterministic for the rare race where a - -- vector matched two playbooks. - MIN(m.match_json) AS sample_match_json - FROM mitigations m - WHERE m.signal_group_id IS NOT NULL - GROUP BY m.signal_group_id -) -UPDATE signal_groups sg -SET playbook_name = agg.group_id::text -- placeholder, replaced below -FROM agg -WHERE 1 = 0; --- The actual backfill happens at runtime: the daemon will resolve and --- populate `playbook_name` on the next primary event for any group that --- still has it NULL. Doing this in SQL is brittle because match_json --- doesn't carry a playbook reference; we'd be re-running the matcher. - CREATE INDEX IF NOT EXISTS idx_signal_groups_playbook ON signal_groups (playbook_name) WHERE playbook_name IS NOT NULL; diff --git a/src/api/handlers.rs b/src/api/handlers.rs index a038105..a3e667b 100644 --- a/src/api/handlers.rs +++ b/src/api/handlers.rs @@ -5483,29 +5483,34 @@ pub async fn list_cached_corroborators_handler( auth_session: AuthSession, headers: HeaderMap, axum::extract::Query(query): axum::extract::Query, -) -> impl IntoResponse { +) -> Result, StatusCode> { use crate::domain::OperatorRole; let auth_header = headers.get(AUTHORIZATION).and_then(|h| h.to_str().ok()); - if let Err(status) = require_role(&state, &auth_session, auth_header, OperatorRole::Admin) { - let msg = if status == StatusCode::UNAUTHORIZED { - "authentication required" - } else { - "admin role required" - }; - return Err(AppError(PrefixdError::Unauthorized(msg.into()))); - } + require_role(&state, &auth_session, auth_header, OperatorRole::Admin)?; + let limit = query.limit.unwrap_or(100).clamp(1, 1000) as i64; let now = chrono::Utc::now(); let signals = state .repo .list_cached_corroborators(now, limit, query.source.as_deref()) .await - .map_err(AppError)?; + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + // When ?source= is provided, scope total + by_source to that + // source as well so the response is internally consistent + // (otherwise clients see a `total` that doesn't match the rows + // they were just handed). let by_source_rows = state .repo .count_cached_corroborators_by_source(now) .await - .map_err(AppError)?; + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let by_source_rows: Vec<(String, u64)> = match query.source.as_deref() { + Some(filter) => by_source_rows + .into_iter() + .filter(|(s, _)| s == filter) + .collect(), + None => by_source_rows, + }; let total = by_source_rows.iter().map(|(_, n)| *n).sum(); let by_source = by_source_rows .into_iter() diff --git a/tests/integration.rs b/tests/integration.rs index 4bc182b..10d8fcf 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -5813,24 +5813,57 @@ async fn test_late_corroborator_finalizes_with_playbook_override() { #[tokio::test] async fn test_late_corroborator_skips_when_playbook_name_is_stale() { // If a group's stored playbook_name no longer resolves (admin removed - // it), the corroborator path falls back to conservative behavior: - // aggregates update but corroboration_met is preserved (not flipped). - use prefixd::correlation::CorroboratingSignal; + // it from the live config), the corroborator path falls back to the + // conservative v0.16.0 behavior: aggregates update on each ingest + // but corroboration_met is *not* flipped from false -> true. We + // exercise this end-to-end by: + // 1) seeding an open group whose stored playbook name doesn't + // exist in the live playbook list, + // 2) posting a corroborator that *would* satisfy the override if + // the playbook resolved, and + // 3) asserting the flag stayed false even though source_count and + // derived_confidence advanced. + use chrono::{Duration, Utc}; use prefixd::correlation::engine::{PrimaryDimensions, SignalGroup, SignalGroupStatus}; + use prefixd::correlation::{MatchDimension, SourceConfig, SourceMode}; use uuid::Uuid; - let repo = MockRepository::new(); - let now = chrono::Utc::now(); - let group_id = Uuid::new_v4(); + let repo: Arc = Arc::new(MockRepository::new()); + let announcer = Arc::new(MockAnnouncer::new()); + // Global config that would never trigger on its own (min=3, 0.9). + // No playbook named `no_such_playbook` is configured, so the stored + // value won't resolve; conservative fallback should fire. + // + // We register two corroborating sources on the same dimension so a + // pair of POSTs gets us source_count=2 — enough to *would-be* + // satisfy a 2/0.5 override if the playbook resolved. + let mut settings = test_settings_with_correlation(true, 3, 0.9); + for src in ["router-cpu", "pop-utilization"] { + settings.correlation.sources.insert( + src.to_string(), + SourceConfig { + weight: 0.6, + r#type: "telemetry".to_string(), + confidence_mapping: std::collections::HashMap::new(), + mode: SourceMode::Corroborating, + match_dimensions: vec![MatchDimension::Pop], + }, + ); + } + + // Seed a group as if a primary event already created it pointing at + // a playbook that has since been removed from config. + let now = Utc::now(); + let group_id = Uuid::new_v4(); repo.insert_signal_group(&SignalGroup { group_id, victim_ip: "203.0.113.10".to_string(), vector: "udp_flood".to_string(), created_at: now, - window_expires_at: now + chrono::Duration::seconds(300), - derived_confidence: 0.0, - source_count: 0, + window_expires_at: now + Duration::seconds(300), + derived_confidence: 0.7, + source_count: 1, status: SignalGroupStatus::Open, corroboration_met: false, primary_dimensions: { @@ -5838,36 +5871,63 @@ async fn test_late_corroborator_skips_when_playbook_name_is_stale() { d.add_pop("test1".to_string()); d }, - playbook_name: Some("does_not_exist".to_string()), + playbook_name: Some("no_such_playbook".to_string()), }) .await .unwrap(); - // Seed a primary event link so has_primary=true. + // Make has_primary=true so the corroborator path is even allowed + // to consider the group. repo.add_event_to_group(group_id, Uuid::new_v4(), 1.0) .await .unwrap(); - repo.insert_corroborating_signal(&CorroboratingSignal { - signal_id: Uuid::new_v4(), - source: "router-cpu".to_string(), - vector: Some("udp_flood".to_string()), - customer_id: None, - pop: Some("test1".to_string()), - service_id: None, - interface: None, - confidence: Some(0.9), - weight: 1.0, - ingested_at: now, - expires_at: now + chrono::Duration::seconds(300), - raw_details: None, - attached_group_ids: vec![], - }) - .await - .unwrap(); + + let state = AppState::new( + settings, + test_inventory(), + test_playbooks(), // does NOT contain "no_such_playbook" + repo.clone(), + announcer, + std::path::PathBuf::from("."), + ) + .expect("state"); + let app = create_test_router(state); + + // POST two corroborators from distinct sources matching on pop. + // Without the stale-playbook guard, recompute_group_aggregates + // would resolve the override and flip corroboration_met to true + // (source_count reaches 2, derived_confidence above 0.5 override). + // With the guard, both ingests advance aggregates but the flag + // stays false. + for src in ["router-cpu", "pop-utilization"] { + let body = serde_json::json!({ + "source": src, + "pop": "test1", + "vector": "udp_flood", + "confidence": 0.9 + }); + let resp = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/v1/signals/corroborator") + .header("content-type", "application/json") + .body(Body::from(body.to_string())) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + } let group = repo.get_signal_group(group_id).await.unwrap().unwrap(); + assert_eq!( + group.source_count, 2, + "aggregates should still update on the corroborator path" + ); assert!( !group.corroboration_met, - "stale playbook_name should not allow promotion" + "stale playbook_name must keep the conservative no-flip behavior" ); } @@ -5960,3 +6020,85 @@ async fn test_cache_listing_endpoint_returns_unattached_signals() { assert_eq!(json["by_source"][0]["source"], "router-cpu"); assert_eq!(json["by_source"][0]["count"], 1); } + +#[tokio::test] +async fn test_cache_listing_endpoint_source_filter_scopes_aggregates() { + // Regression: when `?source=` is passed, total + by_source must + // also be filtered to that source. Otherwise clients see a `total` + // that doesn't match the rows they were just handed. + use chrono::{Duration, Utc}; + use prefixd::correlation::{CorroboratingSignal, MatchDimension, SourceConfig, SourceMode}; + use uuid::Uuid; + + let repo: Arc = Arc::new(MockRepository::new()); + let announcer = Arc::new(MockAnnouncer::new()); + let mut settings = test_settings_with_correlation(true, 1, 0.5); + for src in ["router-cpu", "pop-utilization"] { + settings.correlation.sources.insert( + src.to_string(), + SourceConfig { + weight: 0.5, + r#type: "telemetry".to_string(), + confidence_mapping: std::collections::HashMap::new(), + mode: SourceMode::Corroborating, + match_dimensions: vec![MatchDimension::Pop], + }, + ); + } + + // Three rows: 2 router-cpu, 1 pop-utilization. All unattached + unexpired. + let now = Utc::now(); + for src in ["router-cpu", "router-cpu", "pop-utilization"] { + repo.insert_corroborating_signal(&CorroboratingSignal { + signal_id: Uuid::new_v4(), + source: src.to_string(), + vector: Some("udp_flood".to_string()), + customer_id: None, + pop: Some("iad1".to_string()), + service_id: None, + interface: None, + confidence: Some(0.5), + weight: 0.5, + ingested_at: now, + expires_at: now + Duration::seconds(300), + raw_details: None, + attached_group_ids: vec![], + }) + .await + .unwrap(); + } + + let state = AppState::new( + settings, + test_inventory(), + test_playbooks(), + repo, + announcer, + std::path::PathBuf::from("."), + ) + .expect("state"); + let app = create_test_router(state); + + let resp = app + .oneshot( + Request::builder() + .method("GET") + .uri("/v1/signals/corroborator/cache?source=pop-utilization&limit=10") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let bytes = axum::body::to_bytes(resp.into_body(), 1024 * 1024) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + let signals = json["signals"].as_array().unwrap(); + assert_eq!(signals.len(), 1); + assert_eq!(json["total"], 1, "total must be scoped to ?source="); + let by_source = json["by_source"].as_array().unwrap(); + assert_eq!(by_source.len(), 1); + assert_eq!(by_source[0]["source"], "pop-utilization"); + assert_eq!(by_source[0]["count"], 1); +}