From cafe288e15ba59ce9b523c3f690c2a4d873e44f1 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Wed, 11 Feb 2026 14:46:39 -0500 Subject: [PATCH 1/8] WIP node affinity broadcast --- .../src/ingest_v2/broadcast.rs | 143 +++++++++++++++++- .../quickwit-ingest/src/ingest_v2/state.rs | 12 +- 2 files changed, 150 insertions(+), 5 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs index 9bbbe94bb47..1b6ef2dd961 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, BTreeSet, HashMap}; +use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; use std::time::Duration; use bytesize::ByteSize; @@ -22,10 +22,11 @@ use quickwit_common::shared_consts::INGESTER_PRIMARY_SHARDS_PREFIX; use quickwit_common::sorted_iter::{KeyDiff, SortedByKeyIterator}; use quickwit_common::tower::{ConstantRate, Rate}; use quickwit_proto::ingest::ShardState; -use quickwit_proto::types::{NodeId, ShardId, SourceUid}; +use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceId, SourceUid}; use serde::{Deserialize, Serialize, Serializer}; use tokio::task::JoinHandle; -use tracing::{debug, warn}; +use anyhow::{Context, Result}; +use tracing::{debug, error, warn}; use super::metrics::INGEST_V2_METRICS; use super::state::WeakIngesterState; @@ -422,6 +423,142 @@ pub async fn setup_local_shards_update_listener( .await } +pub type OpenShardCounts = HashMap<(IndexUid, SourceId), usize>; + +const WAL_CAPACITY_LOOKBACK_WINDOW_LEN: usize = 6; + +struct WalCapacityTimeSeries { + wal_capacity: ByteSize, + readings: VecDeque, +} + +impl WalCapacityTimeSeries { + fn new(wal_capacity: ByteSize) -> Self { + assert!( + wal_capacity.as_u64() > 0, + "WAL capacity must be greater than zero" + ); + Self { + 
wal_capacity, + readings: VecDeque::new(), + } + } + + fn record(&mut self, wal_used: ByteSize) { + let remaining = ByteSize::b( + self.wal_capacity.as_u64().saturating_sub(wal_used.as_u64()), + ); + self.readings.push_front(remaining); + } + + fn current(&self) -> Option { + self.readings.front().map(|b| self.as_capacity_usage_pct(*b)) + } + + fn as_capacity_usage_pct(&self, bytes: ByteSize) -> f64 { + bytes.as_u64() as f64 / self.wal_capacity.as_u64() as f64 + } + + fn delta(&mut self) -> Option { + if self.readings.is_empty() { + return None; + } + let oldest = if self.readings.len() > WAL_CAPACITY_LOOKBACK_WINDOW_LEN { + self.readings.pop_back().unwrap() + } else { + *self.readings.back().unwrap() + }; + let current = *self.readings.front().unwrap(); + Some(self.as_capacity_usage_pct(current) - self.as_capacity_usage_pct(oldest)) + } +} + +fn compute_affinity_score(remaining_capacity: f64, capacity_delta: f64) -> f32 { + if remaining_capacity <= 0.10 { + return 0.0; + } + let p = 80.0 * remaining_capacity; + let capacity_lost = (-capacity_delta).max(0.0); + let g = 20.0 * (capacity_lost / 0.10).min(1.0); + (p - g).clamp(0.0, 100.0) as f32 +} + +#[derive(Debug, Clone, Default)] +pub struct IngesterAffinity { + pub affinity_score: f32, + pub open_shard_counts: OpenShardCounts, +} + +struct BroadcastIngesterAffinityTask { + cluster: Cluster, + weak_state: WeakIngesterState, + wal_capacity_time_series: WalCapacityTimeSeries, +} + +impl BroadcastIngesterAffinityTask { + pub fn spawn( + cluster: Cluster, + weak_state: WeakIngesterState, + wal_capacity: ByteSize, + ) -> JoinHandle<()> { + let mut broadcaster = Self { + cluster, + weak_state, + wal_capacity_time_series: WalCapacityTimeSeries::new(wal_capacity), + }; + tokio::spawn(async move { broadcaster.run().await }) + } + + async fn snapshot(&self) -> Result<(ByteSize, OpenShardCounts)> { + let state = self.weak_state.upgrade() + .context("ingester state has been dropped")?; + + // Read-lock mrecordlog and drop 
before acquiring inner lock to avoid + // deadlock with lock_fully (which acquires mrecordlog write then inner). + let wal_lock = state.mrecordlog(); + let wal_guard = wal_lock.read().await; + let wal = wal_guard.as_ref() + .context("WAL is not initialized")?; + let wal_used = ByteSize::b(wal.resource_usage().disk_used_bytes as u64); + drop(wal_guard); + + let guard = state.lock_partially().await + .map_err(|_| anyhow::anyhow!("failed to acquire ingester state lock"))?; + let open_shard_counts = guard.get_open_shard_counts(); + + Ok((wal_used, open_shard_counts)) + } + + async fn run(&mut self) { + let mut interval = tokio::time::interval(BROADCAST_INTERVAL_PERIOD); + + loop { + interval.tick().await; + + let (wal_used, open_shard_counts) = match self.snapshot().await { + Ok(snapshot) => snapshot, + Err(error) => { + error!("failed to snapshot ingester state: {error}"); + return; + } + }; + + self.wal_capacity_time_series.record(wal_used); + + let remaining_capacity = self.wal_capacity_time_series.current().unwrap_or(1.0); + let capacity_delta = self.wal_capacity_time_series.delta().unwrap_or(0.0); + + let _affinity = IngesterAffinity { + affinity_score: compute_affinity_score(remaining_capacity, capacity_delta), + open_shard_counts, + }; + + // TODO: broadcast via Chitchat + } + } +} + + #[cfg(test)] mod tests { diff --git a/quickwit/quickwit-ingest/src/ingest_v2/state.rs b/quickwit/quickwit-ingest/src/ingest_v2/state.rs index 591ef4f704f..345bdb748ea 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/state.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/state.rs @@ -18,7 +18,7 @@ use std::ops::{Deref, DerefMut}; use std::path::Path; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; - +use itertools::Itertools; use mrecordlog::error::{DeleteQueueError, TruncateError}; use quickwit_common::pretty::PrettyDisplay; use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings}; @@ -26,7 +26,7 @@ use quickwit_doc_mapper::DocMapper; use 
quickwit_proto::control_plane::AdviseResetShardsResponse; use quickwit_proto::ingest::ingester::IngesterStatus; use quickwit_proto::ingest::{IngestV2Error, IngestV2Result, ShardState}; -use quickwit_proto::types::{DocMappingUid, IndexUid, Position, QueueId, SourceId, split_queue_id}; +use quickwit_proto::types::{DocMappingUid, IndexUid, Position, QueueId, SourceId, split_queue_id, SourceUid}; use tokio::sync::{Mutex, MutexGuard, RwLock, RwLockMappedWriteGuard, RwLockWriteGuard, watch}; use tracing::{error, info}; @@ -87,6 +87,14 @@ impl InnerIngesterState { .max_by_key(|(available_permits, _)| *available_permits) .map(|(_, shard)| shard) } + + pub fn get_open_shard_counts(&self) -> HashMap<(IndexUid, SourceId), usize> { + self.shards + .values() + .filter(|shard| shard.is_open()) + .map(|shard| (shard.index_uid.clone(), shard.source_id.clone())) + .counts() + } } impl IngesterState { From b9ec6cfe91de93d9f146f0d505cedfe8d7c8d055 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Wed, 11 Feb 2026 16:06:26 -0500 Subject: [PATCH 2/8] Implement IngesterAffinity broadcast --- quickwit/quickwit-common/src/shared_consts.rs | 3 + quickwit/quickwit-ingest/Cargo.toml | 2 +- .../ingest_v2/broadcast/ingester_affinity.rs | 431 ++++++++++++++++++ .../local_shards.rs} | 161 +------ .../src/ingest_v2/broadcast/mod.rs | 34 ++ .../quickwit-ingest/src/ingest_v2/state.rs | 19 +- 6 files changed, 491 insertions(+), 159 deletions(-) create mode 100644 quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_affinity.rs rename quickwit/quickwit-ingest/src/ingest_v2/{broadcast.rs => broadcast/local_shards.rs} (84%) create mode 100644 quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs diff --git a/quickwit/quickwit-common/src/shared_consts.rs b/quickwit/quickwit-common/src/shared_consts.rs index 9923705f0b2..931fc0f9ed1 100644 --- a/quickwit/quickwit-common/src/shared_consts.rs +++ b/quickwit/quickwit-common/src/shared_consts.rs @@ -64,6 +64,9 @@ pub const SCROLL_BATCH_LEN: usize = 
1_000; /// Prefix used in chitchat to broadcast the list of primary shards hosted by a leader. pub const INGESTER_PRIMARY_SHARDS_PREFIX: &str = "ingester.primary_shards:"; +/// Key used in chitchat to broadcast the ingester affinity score and open shard counts. +pub const INGESTER_AFFINITY_PREFIX: &str = "ingester.affinity"; + /// File name for the encoded list of fields in the split pub const SPLIT_FIELDS_FILE_NAME: &str = "split_fields"; diff --git a/quickwit/quickwit-ingest/Cargo.toml b/quickwit/quickwit-ingest/Cargo.toml index 3dfa0bf6c0c..3149f2aaaf3 100644 --- a/quickwit/quickwit-ingest/Cargo.toml +++ b/quickwit/quickwit-ingest/Cargo.toml @@ -18,6 +18,7 @@ bytesize = { workspace = true } fail = { workspace = true, optional = true } futures = { workspace = true } http = { workspace = true } +itertools = { workspace = true } mockall = { workspace = true, optional = true } mrecordlog = { workspace = true } once_cell = { workspace = true } @@ -43,7 +44,6 @@ quickwit-doc-mapper = { workspace = true, features = ["testsuite"] } quickwit-proto = { workspace = true } [dev-dependencies] -itertools = { workspace = true } mockall = { workspace = true } rand = { workspace = true } rand_distr = { workspace = true } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_affinity.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_affinity.rs new file mode 100644 index 00000000000..20bcc6105bd --- /dev/null +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_affinity.rs @@ -0,0 +1,431 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::VecDeque; + +use anyhow::{Context, Result}; +use bytesize::ByteSize; +use quickwit_cluster::{Cluster, ListenerHandle}; +use quickwit_common::pubsub::{Event, EventBroker}; +use quickwit_common::shared_consts::INGESTER_AFFINITY_PREFIX; +use quickwit_proto::types::{IndexUid, NodeId, SourceId}; +use serde::{Deserialize, Serialize}; +use tokio::task::JoinHandle; +use tracing::{error, warn}; + +use crate::ingest_v2::state::WeakIngesterState; + +use super::BROADCAST_INTERVAL_PERIOD; + +pub type OpenShardCounts = Vec<(IndexUid, SourceId, usize)>; + +const WAL_CAPACITY_LOOKBACK_WINDOW_LEN: usize = 6; + +struct WalCapacityTimeSeries { + wal_capacity: ByteSize, + readings: VecDeque, +} + +impl WalCapacityTimeSeries { + fn new(wal_capacity: ByteSize) -> Self { + assert!( + wal_capacity.as_u64() > 0, + "WAL capacity must be greater than zero" + ); + Self { + wal_capacity, + readings: VecDeque::new(), + } + } + + fn record(&mut self, wal_used: ByteSize) { + let remaining = ByteSize::b( + self.wal_capacity.as_u64().saturating_sub(wal_used.as_u64()), + ); + self.readings.push_front(remaining); + } + + /// Returns the most recent remaining capacity as a fraction of total WAL capacity, + /// or `None` if no readings have been recorded yet. 
+ fn current(&self) -> Option { + self.readings.front().map(|b| self.as_capacity_usage_pct(*b)) + } + + fn as_capacity_usage_pct(&self, bytes: ByteSize) -> f64 { + bytes.as_u64() as f64 / self.wal_capacity.as_u64() as f64 + } + + /// How much remaining capacity changed between the oldest and newest readings. + /// Positive = improving, negative = draining. + /// At most `WAL_CAPACITY_LOOKBACK_WINDOW_LEN` readings are kept. + fn delta(&mut self) -> Option { + if self.readings.is_empty() { + return None; + } + let oldest = if self.readings.len() > WAL_CAPACITY_LOOKBACK_WINDOW_LEN { + self.readings.pop_back().unwrap() + } else { + *self.readings.back().unwrap() + }; + let current = *self.readings.front().unwrap(); + Some(self.as_capacity_usage_pct(current) - self.as_capacity_usage_pct(oldest)) + } +} + +/// Computes an affinity score from 0 to 100 using a simple PI controller. +/// +/// The score has two components: +/// +/// - **P (proportional):** How much WAL capacity remains right now. +/// An ingester with 100% free capacity gets 80 points; 50% gets 40; and so on. +/// If remaining capacity drops to 5% or below, the score is immediately 0. +/// +/// - **I (integral):** A stability bonus worth up to 20 points. +/// If remaining capacity hasn't changed between the oldest and newest readings, +/// the full 20 points are awarded. As capacity drains faster, the bonus shrinks +/// linearly toward 0. A delta of -10% of total capacity (or worse) zeroes it out. +/// +/// Putting it together: a completely idle ingester scores 100 (80 + 20). +/// One that is full but stable scores ~24. One that is draining rapidly scores less. 
+fn compute_affinity_score(remaining_capacity: f64, capacity_delta: f64) -> f32 { + if remaining_capacity <= 0.05 { + return 0.0; + } + let p = 80.0 * remaining_capacity; + let drain = (-capacity_delta).clamp(0.0, 0.10); + let i = 20.0 * (1.0 - drain / 0.10); + (p + i).clamp(0.0, 100.0) as f32 +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct IngesterAffinity { + pub affinity_score: f32, + pub open_shard_counts: OpenShardCounts, +} + +/// Periodically snapshots the ingester's WAL usage and open shard counts, computes +/// an affinity score, and broadcasts it to other nodes via Chitchat. +pub struct BroadcastIngesterAffinityTask { + cluster: Cluster, + weak_state: WeakIngesterState, + wal_capacity_time_series: WalCapacityTimeSeries, +} + +impl BroadcastIngesterAffinityTask { + pub fn spawn( + cluster: Cluster, + weak_state: WeakIngesterState, + wal_capacity: ByteSize, + ) -> JoinHandle<()> { + let mut broadcaster = Self { + cluster, + weak_state, + wal_capacity_time_series: WalCapacityTimeSeries::new(wal_capacity), + }; + tokio::spawn(async move { broadcaster.run().await }) + } + + async fn snapshot(&self) -> Result<(ByteSize, OpenShardCounts)> { + let state = self.weak_state.upgrade() + .context("ingester state has been dropped")?; + + let wal_lock = state.mrecordlog(); + let wal_guard = wal_lock.read().await; + let wal = wal_guard.as_ref() + .context("WAL is not initialized")?; + let wal_used = ByteSize::b(wal.resource_usage().disk_used_bytes as u64); + drop(wal_guard); + + let guard = state.lock_partially().await + .map_err(|_| anyhow::anyhow!("failed to acquire ingester state lock"))?; + let open_shard_counts = guard.get_open_shard_counts(); + + Ok((wal_used, open_shard_counts)) + } + + async fn run(&mut self) { + let mut interval = tokio::time::interval(BROADCAST_INTERVAL_PERIOD); + + loop { + interval.tick().await; + + let (wal_used, open_shard_counts) = match self.snapshot().await { + Ok(snapshot) => snapshot, + Err(error) => { + 
error!("failed to snapshot ingester state: {error}"); + return; + } + }; + + self.wal_capacity_time_series.record(wal_used); + + let remaining_capacity = self.wal_capacity_time_series.current().unwrap_or(1.0); + let capacity_delta = self.wal_capacity_time_series.delta().unwrap_or(0.0); + + let affinity = IngesterAffinity { + affinity_score: compute_affinity_score(remaining_capacity, capacity_delta), + open_shard_counts, + }; + + self.broadcast_affinity(&affinity).await; + } + } + + async fn broadcast_affinity(&self, affinity: &IngesterAffinity) { + let value = serde_json::to_string(affinity) + .expect("`IngesterAffinity` should be JSON serializable"); + self.cluster + .set_self_key_value(INGESTER_AFFINITY_PREFIX, value) + .await; + } +} + +#[derive(Debug, Clone)] +pub struct IngesterAffinityUpdate { + pub node_id: NodeId, + pub affinity_score: f32, + pub open_shard_counts: OpenShardCounts, +} + +impl Event for IngesterAffinityUpdate {} + +pub async fn setup_ingester_affinity_update_listener( + cluster: Cluster, + event_broker: EventBroker, +) -> ListenerHandle { + cluster + .subscribe(INGESTER_AFFINITY_PREFIX, move |event| { + let Ok(affinity) = serde_json::from_str::(event.value) else { + warn!("failed to parse ingester affinity `{}`", event.value); + return; + }; + let node_id: NodeId = event.node.node_id.clone().into(); + event_broker.publish(IngesterAffinityUpdate { + node_id, + affinity_score: affinity.affinity_score, + open_shard_counts: affinity.open_shard_counts, + }); + }) + .await +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; + + use quickwit_cluster::{ChannelTransport, create_cluster_for_test}; + use quickwit_proto::types::ShardId; + + use super::*; + use crate::ingest_v2::models::IngesterShard; + use crate::ingest_v2::state::IngesterState; + + fn ts(capacity_bytes: u64) -> WalCapacityTimeSeries { + WalCapacityTimeSeries::new(ByteSize::b(capacity_bytes)) + } + + #[test] + fn 
test_wal_capacity_empty() { + let mut series = ts(100); + assert!(series.current().is_none()); + assert!(series.delta().is_none()); + } + + #[test] + fn test_wal_capacity_current_after_record() { + let mut series = ts(100); + // 30 bytes used => 70 remaining => 0.70 + series.record(ByteSize::b(30)); + assert_eq!(series.current(), Some(0.70)); + + // 90 bytes used => 10 remaining => 0.10 + series.record(ByteSize::b(90)); + assert_eq!(series.current(), Some(0.10)); + } + + #[test] + fn test_wal_capacity_record_saturates_at_zero() { + let mut series = ts(100); + series.record(ByteSize::b(200)); + assert_eq!(series.current(), Some(0.0)); + } + + #[test] + fn test_wal_capacity_delta_single_reading() { + let mut series = ts(100); + series.record(ByteSize::b(50)); + // current == oldest => delta is 0 + assert_eq!(series.delta(), Some(0.0)); + } + + #[test] + fn test_wal_capacity_delta_growing() { + let mut series = ts(100); + // oldest: 60 used => 40 remaining + series.record(ByteSize::b(60)); + // current: 20 used => 80 remaining + series.record(ByteSize::b(20)); + // delta = 0.80 - 0.40 = 0.40 + assert_eq!(series.delta(), Some(0.40)); + } + + #[test] + fn test_wal_capacity_delta_shrinking() { + let mut series = ts(100); + // oldest: 20 used => 80 remaining + series.record(ByteSize::b(20)); + // current: 60 used => 40 remaining + series.record(ByteSize::b(60)); + // delta = 0.40 - 0.80 = -0.40 + assert_eq!(series.delta(), Some(-0.40)); + } + + #[test] + fn test_affinity_score_draining_vs_stable() { + // Node A: capacity draining — usage increases 10, 20, ..., 70 over 7 ticks. + let mut node_a = ts(100); + for used in (10..=70).step_by(10) { + node_a.record(ByteSize::b(used)); + } + // After 7 readings + delta pop: current = 0.30, delta = -0.50 + let a_remaining = node_a.current().unwrap(); + let a_delta = node_a.delta().unwrap(); + let a_score = compute_affinity_score(a_remaining, a_delta); + + // Node B: steady at 50% usage over 7 ticks. 
+ let mut node_b = ts(100); + for _ in 0..7 { + node_b.record(ByteSize::b(50)); + } + // After 7 readings + delta pop: current = 0.50, delta = 0.0 + let b_remaining = node_b.current().unwrap(); + let b_delta = node_b.delta().unwrap(); + let b_score = compute_affinity_score(b_remaining, b_delta); + + // p=24, i=0 (max drain) => 24.0 + assert_eq!(a_score, 24.0); + // p=40, i=20 (stable) => 60.0 + assert_eq!(b_score, 60.0); + assert!(b_score > a_score); + } + + #[tokio::test] + async fn test_snapshot_state_dropped() { + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) + .await + .unwrap(); + let (_temp_dir, state) = IngesterState::for_test().await; + let weak_state = state.weak(); + drop(state); + + let task = BroadcastIngesterAffinityTask { + cluster, + weak_state, + wal_capacity_time_series: WalCapacityTimeSeries::new(ByteSize::mib(256)), + }; + assert!(task.snapshot().await.is_err()); + } + + #[tokio::test] + async fn test_broadcast_ingester_affinity() { + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) + .await + .unwrap(); + let event_broker = EventBroker::default(); + + let (_temp_dir, state) = IngesterState::for_test().await; + let index_uid = IndexUid::for_test("test-index", 0); + let mut state_guard = state.lock_partially().await.unwrap(); + let shard = IngesterShard::new_solo( + index_uid.clone(), + SourceId::from("test-source"), + ShardId::from(0), + ) + .build(); + state_guard.shards.insert(shard.queue_id(), shard); + let open_shard_counts = state_guard.get_open_shard_counts(); + drop(state_guard); + + // Simulate 500 of 1000 bytes used => 50% remaining, 0 delta => score = 60.0 + let mut task = BroadcastIngesterAffinityTask { + cluster: cluster.clone(), + weak_state: state.weak(), + wal_capacity_time_series: WalCapacityTimeSeries::new(ByteSize::b(1000)), + }; + 
task.wal_capacity_time_series.record(ByteSize::b(500)); + + let remaining = task.wal_capacity_time_series.current().unwrap(); + let delta = task.wal_capacity_time_series.delta().unwrap(); + let affinity = IngesterAffinity { + affinity_score: compute_affinity_score(remaining, delta), + open_shard_counts, + }; + assert_eq!(affinity.affinity_score, 60.0); + + let update_counter = Arc::new(AtomicUsize::new(0)); + let update_counter_clone = update_counter.clone(); + let index_uid_clone = index_uid.clone(); + let _sub = event_broker.subscribe(move |event: IngesterAffinityUpdate| { + update_counter_clone.fetch_add(1, Ordering::Release); + assert_eq!(event.affinity_score, 60.0); + assert_eq!(event.open_shard_counts.len(), 1); + assert_eq!(event.open_shard_counts[0].0, index_uid_clone); + assert_eq!(event.open_shard_counts[0].1, "test-source"); + assert_eq!(event.open_shard_counts[0].2, 1); + }); + + let _listener = + setup_ingester_affinity_update_listener(cluster.clone(), event_broker).await; + + task.broadcast_affinity(&affinity).await; + tokio::time::sleep(BROADCAST_INTERVAL_PERIOD * 2).await; + + assert_eq!(update_counter.load(Ordering::Acquire), 1); + + let value = cluster + .get_self_key_value(INGESTER_AFFINITY_PREFIX) + .await + .unwrap(); + let deserialized: IngesterAffinity = serde_json::from_str(&value).unwrap(); + assert_eq!(deserialized.affinity_score, 60.0); + assert_eq!(deserialized.open_shard_counts.len(), 1); + } + + #[test] + fn test_wal_capacity_delta_pops_oldest_beyond_window() { + let mut series = ts(100); + + // Fill to exactly the window length (6 readings). + // All use 50 bytes => 50 remaining each. + for _ in 0..WAL_CAPACITY_LOOKBACK_WINDOW_LEN { + series.record(ByteSize::b(50)); + } + assert_eq!(series.readings.len(), WAL_CAPACITY_LOOKBACK_WINDOW_LEN); + assert_eq!(series.delta(), Some(0.0)); + // At exactly window len, delta does NOT pop. 
+ assert_eq!(series.readings.len(), WAL_CAPACITY_LOOKBACK_WINDOW_LEN); + + // Push one more (7th): 0 used => 100 remaining => current = 1.0 + series.record(ByteSize::b(0)); + assert_eq!(series.readings.len(), WAL_CAPACITY_LOOKBACK_WINDOW_LEN + 1); + + // delta should pop the oldest (0.50) and return 1.0 - 0.50 = 0.50 + assert_eq!(series.delta(), Some(0.50)); + // After pop, back to window length. + assert_eq!(series.readings.len(), WAL_CAPACITY_LOOKBACK_WINDOW_LEN); + } +} \ No newline at end of file diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/local_shards.rs similarity index 84% rename from quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs rename to quickwit/quickwit-ingest/src/ingest_v2/broadcast/local_shards.rs index 1b6ef2dd961..8b410e2cca0 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/local_shards.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::time::Duration; use bytesize::ByteSize; @@ -22,21 +22,16 @@ use quickwit_common::shared_consts::INGESTER_PRIMARY_SHARDS_PREFIX; use quickwit_common::sorted_iter::{KeyDiff, SortedByKeyIterator}; use quickwit_common::tower::{ConstantRate, Rate}; use quickwit_proto::ingest::ShardState; -use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceId, SourceUid}; +use quickwit_proto::types::{NodeId, ShardId, SourceUid}; use serde::{Deserialize, Serialize, Serializer}; use tokio::task::JoinHandle; -use anyhow::{Context, Result}; -use tracing::{debug, error, warn}; +use tracing::{debug, warn}; -use super::metrics::INGEST_V2_METRICS; -use super::state::WeakIngesterState; +use crate::ingest_v2::metrics::INGEST_V2_METRICS; +use crate::ingest_v2::state::WeakIngesterState; use crate::RateMibPerSec; -const BROADCAST_INTERVAL_PERIOD: Duration = if cfg!(test) { - Duration::from_millis(50) -} else { - Duration::from_secs(5) -}; +use super::BROADCAST_INTERVAL_PERIOD; const ONE_MIB: ByteSize = ByteSize::mib(1); @@ -153,7 +148,7 @@ impl LocalShardsSnapshot { /// Takes a snapshot of the primary shards hosted by the ingester at regular intervals and /// broadcasts it to other nodes via Chitchat. 
-pub(super) struct BroadcastLocalShardsTask { +pub struct BroadcastLocalShardsTask { cluster: Cluster, weak_state: WeakIngesterState, shard_throughput_time_series_map: ShardThroughputTimeSeriesMap, @@ -423,142 +418,6 @@ pub async fn setup_local_shards_update_listener( .await } -pub type OpenShardCounts = HashMap<(IndexUid, SourceId), usize>; - -const WAL_CAPACITY_LOOKBACK_WINDOW_LEN: usize = 6; - -struct WalCapacityTimeSeries { - wal_capacity: ByteSize, - readings: VecDeque, -} - -impl WalCapacityTimeSeries { - fn new(wal_capacity: ByteSize) -> Self { - assert!( - wal_capacity.as_u64() > 0, - "WAL capacity must be greater than zero" - ); - Self { - wal_capacity, - readings: VecDeque::new(), - } - } - - fn record(&mut self, wal_used: ByteSize) { - let remaining = ByteSize::b( - self.wal_capacity.as_u64().saturating_sub(wal_used.as_u64()), - ); - self.readings.push_front(remaining); - } - - fn current(&self) -> Option { - self.readings.front().map(|b| self.as_capacity_usage_pct(*b)) - } - - fn as_capacity_usage_pct(&self, bytes: ByteSize) -> f64 { - bytes.as_u64() as f64 / self.wal_capacity.as_u64() as f64 - } - - fn delta(&mut self) -> Option { - if self.readings.is_empty() { - return None; - } - let oldest = if self.readings.len() > WAL_CAPACITY_LOOKBACK_WINDOW_LEN { - self.readings.pop_back().unwrap() - } else { - *self.readings.back().unwrap() - }; - let current = *self.readings.front().unwrap(); - Some(self.as_capacity_usage_pct(current) - self.as_capacity_usage_pct(oldest)) - } -} - -fn compute_affinity_score(remaining_capacity: f64, capacity_delta: f64) -> f32 { - if remaining_capacity <= 0.10 { - return 0.0; - } - let p = 80.0 * remaining_capacity; - let capacity_lost = (-capacity_delta).max(0.0); - let g = 20.0 * (capacity_lost / 0.10).min(1.0); - (p - g).clamp(0.0, 100.0) as f32 -} - -#[derive(Debug, Clone, Default)] -pub struct IngesterAffinity { - pub affinity_score: f32, - pub open_shard_counts: OpenShardCounts, -} - -struct BroadcastIngesterAffinityTask 
{ - cluster: Cluster, - weak_state: WeakIngesterState, - wal_capacity_time_series: WalCapacityTimeSeries, -} - -impl BroadcastIngesterAffinityTask { - pub fn spawn( - cluster: Cluster, - weak_state: WeakIngesterState, - wal_capacity: ByteSize, - ) -> JoinHandle<()> { - let mut broadcaster = Self { - cluster, - weak_state, - wal_capacity_time_series: WalCapacityTimeSeries::new(wal_capacity), - }; - tokio::spawn(async move { broadcaster.run().await }) - } - - async fn snapshot(&self) -> Result<(ByteSize, OpenShardCounts)> { - let state = self.weak_state.upgrade() - .context("ingester state has been dropped")?; - - // Read-lock mrecordlog and drop before acquiring inner lock to avoid - // deadlock with lock_fully (which acquires mrecordlog write then inner). - let wal_lock = state.mrecordlog(); - let wal_guard = wal_lock.read().await; - let wal = wal_guard.as_ref() - .context("WAL is not initialized")?; - let wal_used = ByteSize::b(wal.resource_usage().disk_used_bytes as u64); - drop(wal_guard); - - let guard = state.lock_partially().await - .map_err(|_| anyhow::anyhow!("failed to acquire ingester state lock"))?; - let open_shard_counts = guard.get_open_shard_counts(); - - Ok((wal_used, open_shard_counts)) - } - - async fn run(&mut self) { - let mut interval = tokio::time::interval(BROADCAST_INTERVAL_PERIOD); - - loop { - interval.tick().await; - - let (wal_used, open_shard_counts) = match self.snapshot().await { - Ok(snapshot) => snapshot, - Err(error) => { - error!("failed to snapshot ingester state: {error}"); - return; - } - }; - - self.wal_capacity_time_series.record(wal_used); - - let remaining_capacity = self.wal_capacity_time_series.current().unwrap_or(1.0); - let capacity_delta = self.wal_capacity_time_series.delta().unwrap_or(0.0); - - let _affinity = IngesterAffinity { - affinity_score: compute_affinity_score(remaining_capacity, capacity_delta), - open_shard_counts, - }; - - // TODO: broadcast via Chitchat - } - } -} - - #[cfg(test)] mod tests { @@ -566,12 
+425,14 @@ mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; use quickwit_cluster::{ChannelTransport, create_cluster_for_test}; + use quickwit_common::shared_consts::INGESTER_PRIMARY_SHARDS_PREFIX; use quickwit_proto::ingest::ShardState; - use quickwit_proto::types::{IndexUid, SourceId}; + use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceId, SourceUid}; use super::*; use crate::ingest_v2::models::IngesterShard; use crate::ingest_v2::state::IngesterState; + use crate::RateMibPerSec; #[test] fn test_shard_info_serde() { @@ -862,4 +723,4 @@ mod tests { } assert_eq!(time_series.last(), ByteSize::mb(4)); } -} +} \ No newline at end of file diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs new file mode 100644 index 00000000000..c7135dbdf70 --- /dev/null +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs @@ -0,0 +1,34 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +mod ingester_affinity; +mod local_shards; + +use std::time::Duration; + +pub(in crate::ingest_v2) const BROADCAST_INTERVAL_PERIOD: Duration = if cfg!(test) { + Duration::from_millis(50) +} else { + Duration::from_secs(5) +}; + +pub use local_shards::{ + BroadcastLocalShardsTask, LocalShardsUpdate, ShardInfo, ShardInfos, + setup_local_shards_update_listener, +}; + +pub use ingester_affinity::{ + BroadcastIngesterAffinityTask, IngesterAffinity, IngesterAffinityUpdate, OpenShardCounts, + setup_ingester_affinity_update_listener, +}; \ No newline at end of file diff --git a/quickwit/quickwit-ingest/src/ingest_v2/state.rs b/quickwit/quickwit-ingest/src/ingest_v2/state.rs index 345bdb748ea..1169f96589f 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/state.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/state.rs @@ -12,12 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; -use std::fmt; -use std::ops::{Deref, DerefMut}; -use std::path::Path; -use std::sync::{Arc, Weak}; -use std::time::{Duration, Instant}; use itertools::Itertools; use mrecordlog::error::{DeleteQueueError, TruncateError}; use quickwit_common::pretty::PrettyDisplay; @@ -26,7 +20,13 @@ use quickwit_doc_mapper::DocMapper; use quickwit_proto::control_plane::AdviseResetShardsResponse; use quickwit_proto::ingest::ingester::IngesterStatus; use quickwit_proto::ingest::{IngestV2Error, IngestV2Result, ShardState}; -use quickwit_proto::types::{DocMappingUid, IndexUid, Position, QueueId, SourceId, split_queue_id, SourceUid}; +use quickwit_proto::types::{DocMappingUid, IndexUid, Position, QueueId, SourceId, split_queue_id}; +use std::collections::HashMap; +use std::fmt; +use std::ops::{Deref, DerefMut}; +use std::path::Path; +use std::sync::{Arc, Weak}; +use std::time::{Duration, Instant}; use tokio::sync::{Mutex, MutexGuard, RwLock, RwLockMappedWriteGuard, RwLockWriteGuard, watch}; use tracing::{error, info}; @@ 
-88,12 +88,15 @@ impl InnerIngesterState { .map(|(_, shard)| shard) } - pub fn get_open_shard_counts(&self) -> HashMap<(IndexUid, SourceId), usize> { + pub fn get_open_shard_counts(&self) -> Vec<(IndexUid, SourceId, usize)> { self.shards .values() .filter(|shard| shard.is_open()) .map(|shard| (shard.index_uid.clone(), shard.source_id.clone())) .counts() + .into_iter() + .map(|((index_uid, source_id), count)| (index_uid, source_id, count)) + .collect() } } From c87dbdd7b6fafe7e97612b34941da2572ddc74de Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Wed, 11 Feb 2026 16:12:15 -0500 Subject: [PATCH 3/8] Make affinity score ints instead of floats --- .../ingest_v2/broadcast/ingester_affinity.rs | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_affinity.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_affinity.rs index 20bcc6105bd..3e602df76c9 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_affinity.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_affinity.rs @@ -98,19 +98,19 @@ impl WalCapacityTimeSeries { /// /// Putting it together: a completely idle ingester scores 100 (80 + 20). /// One that is full but stable scores ~24. One that is draining rapidly scores less. 
-fn compute_affinity_score(remaining_capacity: f64, capacity_delta: f64) -> f32 { +fn compute_affinity_score(remaining_capacity: f64, capacity_delta: f64) -> u32 { if remaining_capacity <= 0.05 { - return 0.0; + return 0; } let p = 80.0 * remaining_capacity; let drain = (-capacity_delta).clamp(0.0, 0.10); let i = 20.0 * (1.0 - drain / 0.10); - (p + i).clamp(0.0, 100.0) as f32 + (p + i).clamp(0.0, 100.0) as u32 } #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct IngesterAffinity { - pub affinity_score: f32, + pub affinity_score: u32, pub open_shard_counts: OpenShardCounts, } @@ -194,7 +194,7 @@ impl BroadcastIngesterAffinityTask { #[derive(Debug, Clone)] pub struct IngesterAffinityUpdate { pub node_id: NodeId, - pub affinity_score: f32, + pub affinity_score: u32, pub open_shard_counts: OpenShardCounts, } @@ -314,10 +314,10 @@ mod tests { let b_delta = node_b.delta().unwrap(); let b_score = compute_affinity_score(b_remaining, b_delta); - // p=24, i=0 (max drain) => 24.0 - assert_eq!(a_score, 24.0); - // p=40, i=20 (stable) => 60.0 - assert_eq!(b_score, 60.0); + // p=24, i=0 (max drain) => 24 + assert_eq!(a_score, 24); + // p=40, i=20 (stable) => 60 + assert_eq!(b_score, 60); assert!(b_score > a_score); } @@ -374,14 +374,14 @@ mod tests { affinity_score: compute_affinity_score(remaining, delta), open_shard_counts, }; - assert_eq!(affinity.affinity_score, 60.0); + assert_eq!(affinity.affinity_score, 60); let update_counter = Arc::new(AtomicUsize::new(0)); let update_counter_clone = update_counter.clone(); let index_uid_clone = index_uid.clone(); let _sub = event_broker.subscribe(move |event: IngesterAffinityUpdate| { update_counter_clone.fetch_add(1, Ordering::Release); - assert_eq!(event.affinity_score, 60.0); + assert_eq!(event.affinity_score, 60); assert_eq!(event.open_shard_counts.len(), 1); assert_eq!(event.open_shard_counts[0].0, index_uid_clone); assert_eq!(event.open_shard_counts[0].1, "test-source"); @@ -401,7 +401,7 @@ mod tests { .await 
.unwrap(); let deserialized: IngesterAffinity = serde_json::from_str(&value).unwrap(); - assert_eq!(deserialized.affinity_score, 60.0); + assert_eq!(deserialized.affinity_score, 60); assert_eq!(deserialized.open_shard_counts.len(), 1); } From 1a805371f51037c723572d48430dd77c8690c828 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Wed, 11 Feb 2026 16:34:02 -0500 Subject: [PATCH 4/8] allow dead code for CI --- quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs index c7135dbdf70..9b10c3a18d9 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#[allow(dead_code)] mod ingester_affinity; mod local_shards; From a0302a94a462e6083c48dea747e0e7f7f0405648 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Wed, 11 Feb 2026 17:29:17 -0500 Subject: [PATCH 5/8] make fmt and make fix --- .../ingest_v2/broadcast/ingester_affinity.rs | 40 ++++++++++--------- .../src/ingest_v2/broadcast/local_shards.rs | 9 ++--- .../src/ingest_v2/broadcast/mod.rs | 5 --- .../quickwit-ingest/src/ingest_v2/state.rs | 13 +++--- 4 files changed, 32 insertions(+), 35 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_affinity.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_affinity.rs index 3e602df76c9..f26e924ae42 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_affinity.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_affinity.rs @@ -24,9 +24,8 @@ use serde::{Deserialize, Serialize}; use tokio::task::JoinHandle; use tracing::{error, warn}; -use crate::ingest_v2::state::WeakIngesterState; - use super::BROADCAST_INTERVAL_PERIOD; +use 
crate::ingest_v2::state::WeakIngesterState; pub type OpenShardCounts = Vec<(IndexUid, SourceId, usize)>; @@ -50,16 +49,16 @@ impl WalCapacityTimeSeries { } fn record(&mut self, wal_used: ByteSize) { - let remaining = ByteSize::b( - self.wal_capacity.as_u64().saturating_sub(wal_used.as_u64()), - ); + let remaining = ByteSize::b(self.wal_capacity.as_u64().saturating_sub(wal_used.as_u64())); self.readings.push_front(remaining); } /// Returns the most recent remaining capacity as a fraction of total WAL capacity, /// or `None` if no readings have been recorded yet. fn current(&self) -> Option { - self.readings.front().map(|b| self.as_capacity_usage_pct(*b)) + self.readings + .front() + .map(|b| self.as_capacity_usage_pct(*b)) } fn as_capacity_usage_pct(&self, bytes: ByteSize) -> f64 { @@ -87,14 +86,14 @@ impl WalCapacityTimeSeries { /// /// The score has two components: /// -/// - **P (proportional):** How much WAL capacity remains right now. -/// An ingester with 100% free capacity gets 80 points; 50% gets 40; and so on. -/// If remaining capacity drops to 5% or below, the score is immediately 0. +/// - **P (proportional):** How much WAL capacity remains right now. An ingester with 100% free +/// capacity gets 80 points; 50% gets 40; and so on. If remaining capacity drops to 5% or below, +/// the score is immediately 0. /// -/// - **I (integral):** A stability bonus worth up to 20 points. -/// If remaining capacity hasn't changed between the oldest and newest readings, -/// the full 20 points are awarded. As capacity drains faster, the bonus shrinks -/// linearly toward 0. A delta of -10% of total capacity (or worse) zeroes it out. +/// - **I (integral):** A stability bonus worth up to 20 points. If remaining capacity hasn't +/// changed between the oldest and newest readings, the full 20 points are awarded. As capacity +/// drains faster, the bonus shrinks linearly toward 0. A delta of -10% of total capacity (or +/// worse) zeroes it out. 
/// /// Putting it together: a completely idle ingester scores 100 (80 + 20). /// One that is full but stable scores ~24. One that is draining rapidly scores less. @@ -137,17 +136,20 @@ impl BroadcastIngesterAffinityTask { } async fn snapshot(&self) -> Result<(ByteSize, OpenShardCounts)> { - let state = self.weak_state.upgrade() + let state = self + .weak_state + .upgrade() .context("ingester state has been dropped")?; let wal_lock = state.mrecordlog(); let wal_guard = wal_lock.read().await; - let wal = wal_guard.as_ref() - .context("WAL is not initialized")?; + let wal = wal_guard.as_ref().context("WAL is not initialized")?; let wal_used = ByteSize::b(wal.resource_usage().disk_used_bytes as u64); drop(wal_guard); - let guard = state.lock_partially().await + let guard = state + .lock_partially() + .await .map_err(|_| anyhow::anyhow!("failed to acquire ingester state lock"))?; let open_shard_counts = guard.get_open_shard_counts(); @@ -209,7 +211,7 @@ pub async fn setup_ingester_affinity_update_listener( let Ok(affinity) = serde_json::from_str::(event.value) else { warn!("failed to parse ingester affinity `{}`", event.value); return; - }; + }; let node_id: NodeId = event.node.node_id.clone().into(); event_broker.publish(IngesterAffinityUpdate { node_id, @@ -428,4 +430,4 @@ mod tests { // After pop, back to window length. 
assert_eq!(series.readings.len(), WAL_CAPACITY_LOOKBACK_WINDOW_LEN); } -} \ No newline at end of file +} diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/local_shards.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/local_shards.rs index 8b410e2cca0..063cc73d372 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/local_shards.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/local_shards.rs @@ -27,11 +27,10 @@ use serde::{Deserialize, Serialize, Serializer}; use tokio::task::JoinHandle; use tracing::{debug, warn}; +use super::BROADCAST_INTERVAL_PERIOD; +use crate::RateMibPerSec; use crate::ingest_v2::metrics::INGEST_V2_METRICS; use crate::ingest_v2::state::WeakIngesterState; -use crate::RateMibPerSec; - -use super::BROADCAST_INTERVAL_PERIOD; const ONE_MIB: ByteSize = ByteSize::mib(1); @@ -430,9 +429,9 @@ mod tests { use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceId, SourceUid}; use super::*; + use crate::RateMibPerSec; use crate::ingest_v2::models::IngesterShard; use crate::ingest_v2::state::IngesterState; - use crate::RateMibPerSec; #[test] fn test_shard_info_serde() { @@ -723,4 +722,4 @@ mod tests { } assert_eq!(time_series.last(), ByteSize::mb(4)); } -} \ No newline at end of file +} diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs index 9b10c3a18d9..86d75d1c283 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs @@ -28,8 +28,3 @@ pub use local_shards::{ BroadcastLocalShardsTask, LocalShardsUpdate, ShardInfo, ShardInfos, setup_local_shards_update_listener, }; - -pub use ingester_affinity::{ - BroadcastIngesterAffinityTask, IngesterAffinity, IngesterAffinityUpdate, OpenShardCounts, - setup_ingester_affinity_update_listener, -}; \ No newline at end of file diff --git a/quickwit/quickwit-ingest/src/ingest_v2/state.rs 
b/quickwit/quickwit-ingest/src/ingest_v2/state.rs index 1169f96589f..43264c69b15 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/state.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/state.rs @@ -12,6 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; +use std::fmt; +use std::ops::{Deref, DerefMut}; +use std::path::Path; +use std::sync::{Arc, Weak}; +use std::time::{Duration, Instant}; + use itertools::Itertools; use mrecordlog::error::{DeleteQueueError, TruncateError}; use quickwit_common::pretty::PrettyDisplay; @@ -21,12 +28,6 @@ use quickwit_proto::control_plane::AdviseResetShardsResponse; use quickwit_proto::ingest::ingester::IngesterStatus; use quickwit_proto::ingest::{IngestV2Error, IngestV2Result, ShardState}; use quickwit_proto::types::{DocMappingUid, IndexUid, Position, QueueId, SourceId, split_queue_id}; -use std::collections::HashMap; -use std::fmt; -use std::ops::{Deref, DerefMut}; -use std::path::Path; -use std::sync::{Arc, Weak}; -use std::time::{Duration, Instant}; use tokio::sync::{Mutex, MutexGuard, RwLock, RwLockMappedWriteGuard, RwLockWriteGuard, watch}; use tracing::{error, info}; From 94856a9cf5bcc58a0d8dec38a0a3ac2d87dcc325 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Thu, 12 Feb 2026 16:04:36 -0500 Subject: [PATCH 6/8] Address PR comments --- quickwit/quickwit-common/src/lib.rs | 1 + quickwit/quickwit-common/src/ring_buffer.rs | 160 ++++++ quickwit/quickwit-common/src/shared_consts.rs | 4 +- .../ingest_v2/broadcast/ingester_affinity.rs | 433 ----------------- .../ingest_v2/broadcast/ingester_capacity.rs | 456 ++++++++++++++++++ .../src/ingest_v2/broadcast/local_shards.rs | 73 +-- .../src/ingest_v2/broadcast/mod.rs | 48 +- .../quickwit-ingest/src/ingest_v2/state.rs | 77 ++- 8 files changed, 752 insertions(+), 500 deletions(-) create mode 100644 quickwit/quickwit-common/src/ring_buffer.rs delete mode 100644 
quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_affinity.rs create mode 100644 quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity.rs diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index 0f3af2bc5ba..11147f975f9 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -36,6 +36,7 @@ pub mod rate_limited_tracing; pub mod rate_limiter; pub mod rendezvous_hasher; pub mod retry; +pub mod ring_buffer; pub mod runtimes; pub mod shared_consts; pub mod sorted_iter; diff --git a/quickwit/quickwit-common/src/ring_buffer.rs b/quickwit/quickwit-common/src/ring_buffer.rs new file mode 100644 index 00000000000..f92dc504bb3 --- /dev/null +++ b/quickwit/quickwit-common/src/ring_buffer.rs @@ -0,0 +1,160 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::{Debug, Formatter}; + +/// A fixed-capacity circular buffer that overwrites the oldest element when full. +/// +/// Elements are stored in a flat array of size `N` and rotated on each push. +/// The newest element is always at position `N - 1` (the last slot), and the +/// oldest is at position `N - len`. 
+pub struct RingBuffer { + buffer: [T; N], + len: usize, +} + +impl Default for RingBuffer { + fn default() -> Self { + Self { + buffer: [T::default(); N], + len: 0, + } + } +} + +impl Debug for RingBuffer { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_list().entries(self.iter()).finish() + } +} + +impl RingBuffer { + pub fn push(&mut self, value: T) { + self.len = (self.len + 1).min(N); + self.buffer.rotate_left(1); + if let Some(last) = self.buffer.last_mut() { + *last = value; + } + } + + pub fn last(&self) -> Option { + if self.len == 0 { + return None; + } + self.buffer.last().copied() + } + + pub fn oldest(&self) -> Option { + if self.len == 0 { + return None; + } + Some(self.buffer[N - self.len]) + } + + pub fn len(&self) -> usize { + self.len + } + + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + /// Iterates from oldest to newest over the recorded elements. + pub fn iter(&self) -> impl Iterator { + self.buffer[N - self.len..].iter() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_empty() { + let rb = RingBuffer::::default(); + assert!(rb.is_empty()); + assert_eq!(rb.len(), 0); + assert_eq!(rb.last(), None); + assert_eq!(rb.oldest(), None); + assert_eq!(rb.iter().count(), 0); + } + + #[test] + fn test_single_push() { + let mut rb = RingBuffer::::default(); + rb.push(10); + assert_eq!(rb.len(), 1); + assert!(!rb.is_empty()); + assert_eq!(rb.last(), Some(10)); + assert_eq!(rb.oldest(), Some(10)); + assert_eq!(rb.iter().copied().collect::>(), vec![10]); + } + + #[test] + fn test_partial_fill() { + let mut rb = RingBuffer::::default(); + rb.push(1); + rb.push(2); + rb.push(3); + assert_eq!(rb.len(), 3); + assert_eq!(rb.last(), Some(3)); + assert_eq!(rb.oldest(), Some(1)); + assert_eq!(rb.iter().copied().collect::>(), vec![1, 2, 3]); + } + + #[test] + fn test_exactly_full() { + let mut rb = RingBuffer::::default(); + for i in 1..=4 { + rb.push(i); + } + assert_eq!(rb.len(), 4); + assert_eq!(rb.last(), 
Some(4)); + assert_eq!(rb.oldest(), Some(1)); + assert_eq!(rb.iter().copied().collect::>(), vec![1, 2, 3, 4]); + } + + #[test] + fn test_wrap_around() { + let mut rb = RingBuffer::::default(); + for i in 1..=6 { + rb.push(i); + } + // Buffer should contain [3, 4, 5, 6], oldest overwritten. + assert_eq!(rb.len(), 4); + assert_eq!(rb.last(), Some(6)); + assert_eq!(rb.oldest(), Some(3)); + assert_eq!(rb.iter().copied().collect::>(), vec![3, 4, 5, 6]); + } + + #[test] + fn test_many_wraps() { + let mut rb = RingBuffer::::default(); + for i in 1..=100 { + rb.push(i); + } + assert_eq!(rb.len(), 3); + assert_eq!(rb.last(), Some(100)); + assert_eq!(rb.oldest(), Some(98)); + assert_eq!(rb.iter().copied().collect::>(), vec![98, 99, 100]); + } + + #[test] + fn test_debug() { + let mut rb = RingBuffer::::default(); + rb.push(1); + rb.push(2); + assert_eq!(format!("{:?}", rb), "[1, 2]"); + } +} diff --git a/quickwit/quickwit-common/src/shared_consts.rs b/quickwit/quickwit-common/src/shared_consts.rs index 931fc0f9ed1..56db9e13691 100644 --- a/quickwit/quickwit-common/src/shared_consts.rs +++ b/quickwit/quickwit-common/src/shared_consts.rs @@ -64,8 +64,8 @@ pub const SCROLL_BATCH_LEN: usize = 1_000; /// Prefix used in chitchat to broadcast the list of primary shards hosted by a leader. pub const INGESTER_PRIMARY_SHARDS_PREFIX: &str = "ingester.primary_shards:"; -/// Key used in chitchat to broadcast the ingester affinity score and open shard counts. -pub const INGESTER_AFFINITY_PREFIX: &str = "ingester.affinity"; +/// Prefix used in chitchat to broadcast per-source ingester capacity scores and open shard counts. 
+pub const INGESTER_CAPACITY_PREFIX: &str = "ingester.capacity:"; /// File name for the encoded list of fields in the split pub const SPLIT_FIELDS_FILE_NAME: &str = "split_fields"; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_affinity.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_affinity.rs deleted file mode 100644 index f26e924ae42..00000000000 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_affinity.rs +++ /dev/null @@ -1,433 +0,0 @@ -// Copyright 2021-Present Datadog, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::collections::VecDeque; - -use anyhow::{Context, Result}; -use bytesize::ByteSize; -use quickwit_cluster::{Cluster, ListenerHandle}; -use quickwit_common::pubsub::{Event, EventBroker}; -use quickwit_common::shared_consts::INGESTER_AFFINITY_PREFIX; -use quickwit_proto::types::{IndexUid, NodeId, SourceId}; -use serde::{Deserialize, Serialize}; -use tokio::task::JoinHandle; -use tracing::{error, warn}; - -use super::BROADCAST_INTERVAL_PERIOD; -use crate::ingest_v2::state::WeakIngesterState; - -pub type OpenShardCounts = Vec<(IndexUid, SourceId, usize)>; - -const WAL_CAPACITY_LOOKBACK_WINDOW_LEN: usize = 6; - -struct WalCapacityTimeSeries { - wal_capacity: ByteSize, - readings: VecDeque, -} - -impl WalCapacityTimeSeries { - fn new(wal_capacity: ByteSize) -> Self { - assert!( - wal_capacity.as_u64() > 0, - "WAL capacity must be greater than zero" - ); - Self { - wal_capacity, - readings: VecDeque::new(), - } - } - - fn record(&mut self, wal_used: ByteSize) { - let remaining = ByteSize::b(self.wal_capacity.as_u64().saturating_sub(wal_used.as_u64())); - self.readings.push_front(remaining); - } - - /// Returns the most recent remaining capacity as a fraction of total WAL capacity, - /// or `None` if no readings have been recorded yet. - fn current(&self) -> Option { - self.readings - .front() - .map(|b| self.as_capacity_usage_pct(*b)) - } - - fn as_capacity_usage_pct(&self, bytes: ByteSize) -> f64 { - bytes.as_u64() as f64 / self.wal_capacity.as_u64() as f64 - } - - /// How much remaining capacity changed between the oldest and newest readings. - /// Positive = improving, negative = draining. - /// At most `WAL_CAPACITY_LOOKBACK_WINDOW_LEN` readings are kept. 
- fn delta(&mut self) -> Option { - if self.readings.is_empty() { - return None; - } - let oldest = if self.readings.len() > WAL_CAPACITY_LOOKBACK_WINDOW_LEN { - self.readings.pop_back().unwrap() - } else { - *self.readings.back().unwrap() - }; - let current = *self.readings.front().unwrap(); - Some(self.as_capacity_usage_pct(current) - self.as_capacity_usage_pct(oldest)) - } -} - -/// Computes an affinity score from 0 to 100 using a simple PI controller. -/// -/// The score has two components: -/// -/// - **P (proportional):** How much WAL capacity remains right now. An ingester with 100% free -/// capacity gets 80 points; 50% gets 40; and so on. If remaining capacity drops to 5% or below, -/// the score is immediately 0. -/// -/// - **I (integral):** A stability bonus worth up to 20 points. If remaining capacity hasn't -/// changed between the oldest and newest readings, the full 20 points are awarded. As capacity -/// drains faster, the bonus shrinks linearly toward 0. A delta of -10% of total capacity (or -/// worse) zeroes it out. -/// -/// Putting it together: a completely idle ingester scores 100 (80 + 20). -/// One that is full but stable scores ~24. One that is draining rapidly scores less. -fn compute_affinity_score(remaining_capacity: f64, capacity_delta: f64) -> u32 { - if remaining_capacity <= 0.05 { - return 0; - } - let p = 80.0 * remaining_capacity; - let drain = (-capacity_delta).clamp(0.0, 0.10); - let i = 20.0 * (1.0 - drain / 0.10); - (p + i).clamp(0.0, 100.0) as u32 -} - -#[derive(Debug, Clone, Default, Serialize, Deserialize)] -pub struct IngesterAffinity { - pub affinity_score: u32, - pub open_shard_counts: OpenShardCounts, -} - -/// Periodically snapshots the ingester's WAL usage and open shard counts, computes -/// an affinity score, and broadcasts it to other nodes via Chitchat. 
-pub struct BroadcastIngesterAffinityTask { - cluster: Cluster, - weak_state: WeakIngesterState, - wal_capacity_time_series: WalCapacityTimeSeries, -} - -impl BroadcastIngesterAffinityTask { - pub fn spawn( - cluster: Cluster, - weak_state: WeakIngesterState, - wal_capacity: ByteSize, - ) -> JoinHandle<()> { - let mut broadcaster = Self { - cluster, - weak_state, - wal_capacity_time_series: WalCapacityTimeSeries::new(wal_capacity), - }; - tokio::spawn(async move { broadcaster.run().await }) - } - - async fn snapshot(&self) -> Result<(ByteSize, OpenShardCounts)> { - let state = self - .weak_state - .upgrade() - .context("ingester state has been dropped")?; - - let wal_lock = state.mrecordlog(); - let wal_guard = wal_lock.read().await; - let wal = wal_guard.as_ref().context("WAL is not initialized")?; - let wal_used = ByteSize::b(wal.resource_usage().disk_used_bytes as u64); - drop(wal_guard); - - let guard = state - .lock_partially() - .await - .map_err(|_| anyhow::anyhow!("failed to acquire ingester state lock"))?; - let open_shard_counts = guard.get_open_shard_counts(); - - Ok((wal_used, open_shard_counts)) - } - - async fn run(&mut self) { - let mut interval = tokio::time::interval(BROADCAST_INTERVAL_PERIOD); - - loop { - interval.tick().await; - - let (wal_used, open_shard_counts) = match self.snapshot().await { - Ok(snapshot) => snapshot, - Err(error) => { - error!("failed to snapshot ingester state: {error}"); - return; - } - }; - - self.wal_capacity_time_series.record(wal_used); - - let remaining_capacity = self.wal_capacity_time_series.current().unwrap_or(1.0); - let capacity_delta = self.wal_capacity_time_series.delta().unwrap_or(0.0); - - let affinity = IngesterAffinity { - affinity_score: compute_affinity_score(remaining_capacity, capacity_delta), - open_shard_counts, - }; - - self.broadcast_affinity(&affinity).await; - } - } - - async fn broadcast_affinity(&self, affinity: &IngesterAffinity) { - let value = serde_json::to_string(affinity) - 
.expect("`IngesterAffinity` should be JSON serializable"); - self.cluster - .set_self_key_value(INGESTER_AFFINITY_PREFIX, value) - .await; - } -} - -#[derive(Debug, Clone)] -pub struct IngesterAffinityUpdate { - pub node_id: NodeId, - pub affinity_score: u32, - pub open_shard_counts: OpenShardCounts, -} - -impl Event for IngesterAffinityUpdate {} - -pub async fn setup_ingester_affinity_update_listener( - cluster: Cluster, - event_broker: EventBroker, -) -> ListenerHandle { - cluster - .subscribe(INGESTER_AFFINITY_PREFIX, move |event| { - let Ok(affinity) = serde_json::from_str::(event.value) else { - warn!("failed to parse ingester affinity `{}`", event.value); - return; - }; - let node_id: NodeId = event.node.node_id.clone().into(); - event_broker.publish(IngesterAffinityUpdate { - node_id, - affinity_score: affinity.affinity_score, - open_shard_counts: affinity.open_shard_counts, - }); - }) - .await -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - use std::sync::atomic::{AtomicUsize, Ordering}; - - use quickwit_cluster::{ChannelTransport, create_cluster_for_test}; - use quickwit_proto::types::ShardId; - - use super::*; - use crate::ingest_v2::models::IngesterShard; - use crate::ingest_v2::state::IngesterState; - - fn ts(capacity_bytes: u64) -> WalCapacityTimeSeries { - WalCapacityTimeSeries::new(ByteSize::b(capacity_bytes)) - } - - #[test] - fn test_wal_capacity_empty() { - let mut series = ts(100); - assert!(series.current().is_none()); - assert!(series.delta().is_none()); - } - - #[test] - fn test_wal_capacity_current_after_record() { - let mut series = ts(100); - // 30 bytes used => 70 remaining => 0.70 - series.record(ByteSize::b(30)); - assert_eq!(series.current(), Some(0.70)); - - // 90 bytes used => 10 remaining => 0.10 - series.record(ByteSize::b(90)); - assert_eq!(series.current(), Some(0.10)); - } - - #[test] - fn test_wal_capacity_record_saturates_at_zero() { - let mut series = ts(100); - series.record(ByteSize::b(200)); - 
assert_eq!(series.current(), Some(0.0)); - } - - #[test] - fn test_wal_capacity_delta_single_reading() { - let mut series = ts(100); - series.record(ByteSize::b(50)); - // current == oldest => delta is 0 - assert_eq!(series.delta(), Some(0.0)); - } - - #[test] - fn test_wal_capacity_delta_growing() { - let mut series = ts(100); - // oldest: 60 used => 40 remaining - series.record(ByteSize::b(60)); - // current: 20 used => 80 remaining - series.record(ByteSize::b(20)); - // delta = 0.80 - 0.40 = 0.40 - assert_eq!(series.delta(), Some(0.40)); - } - - #[test] - fn test_wal_capacity_delta_shrinking() { - let mut series = ts(100); - // oldest: 20 used => 80 remaining - series.record(ByteSize::b(20)); - // current: 60 used => 40 remaining - series.record(ByteSize::b(60)); - // delta = 0.40 - 0.80 = -0.40 - assert_eq!(series.delta(), Some(-0.40)); - } - - #[test] - fn test_affinity_score_draining_vs_stable() { - // Node A: capacity draining — usage increases 10, 20, ..., 70 over 7 ticks. - let mut node_a = ts(100); - for used in (10..=70).step_by(10) { - node_a.record(ByteSize::b(used)); - } - // After 7 readings + delta pop: current = 0.30, delta = -0.50 - let a_remaining = node_a.current().unwrap(); - let a_delta = node_a.delta().unwrap(); - let a_score = compute_affinity_score(a_remaining, a_delta); - - // Node B: steady at 50% usage over 7 ticks. 
- let mut node_b = ts(100); - for _ in 0..7 { - node_b.record(ByteSize::b(50)); - } - // After 7 readings + delta pop: current = 0.50, delta = 0.0 - let b_remaining = node_b.current().unwrap(); - let b_delta = node_b.delta().unwrap(); - let b_score = compute_affinity_score(b_remaining, b_delta); - - // p=24, i=0 (max drain) => 24 - assert_eq!(a_score, 24); - // p=40, i=20 (stable) => 60 - assert_eq!(b_score, 60); - assert!(b_score > a_score); - } - - #[tokio::test] - async fn test_snapshot_state_dropped() { - let transport = ChannelTransport::default(); - let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) - .await - .unwrap(); - let (_temp_dir, state) = IngesterState::for_test().await; - let weak_state = state.weak(); - drop(state); - - let task = BroadcastIngesterAffinityTask { - cluster, - weak_state, - wal_capacity_time_series: WalCapacityTimeSeries::new(ByteSize::mib(256)), - }; - assert!(task.snapshot().await.is_err()); - } - - #[tokio::test] - async fn test_broadcast_ingester_affinity() { - let transport = ChannelTransport::default(); - let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) - .await - .unwrap(); - let event_broker = EventBroker::default(); - - let (_temp_dir, state) = IngesterState::for_test().await; - let index_uid = IndexUid::for_test("test-index", 0); - let mut state_guard = state.lock_partially().await.unwrap(); - let shard = IngesterShard::new_solo( - index_uid.clone(), - SourceId::from("test-source"), - ShardId::from(0), - ) - .build(); - state_guard.shards.insert(shard.queue_id(), shard); - let open_shard_counts = state_guard.get_open_shard_counts(); - drop(state_guard); - - // Simulate 500 of 1000 bytes used => 50% remaining, 0 delta => score = 60.0 - let mut task = BroadcastIngesterAffinityTask { - cluster: cluster.clone(), - weak_state: state.weak(), - wal_capacity_time_series: WalCapacityTimeSeries::new(ByteSize::b(1000)), - }; - 
task.wal_capacity_time_series.record(ByteSize::b(500)); - - let remaining = task.wal_capacity_time_series.current().unwrap(); - let delta = task.wal_capacity_time_series.delta().unwrap(); - let affinity = IngesterAffinity { - affinity_score: compute_affinity_score(remaining, delta), - open_shard_counts, - }; - assert_eq!(affinity.affinity_score, 60); - - let update_counter = Arc::new(AtomicUsize::new(0)); - let update_counter_clone = update_counter.clone(); - let index_uid_clone = index_uid.clone(); - let _sub = event_broker.subscribe(move |event: IngesterAffinityUpdate| { - update_counter_clone.fetch_add(1, Ordering::Release); - assert_eq!(event.affinity_score, 60); - assert_eq!(event.open_shard_counts.len(), 1); - assert_eq!(event.open_shard_counts[0].0, index_uid_clone); - assert_eq!(event.open_shard_counts[0].1, "test-source"); - assert_eq!(event.open_shard_counts[0].2, 1); - }); - - let _listener = - setup_ingester_affinity_update_listener(cluster.clone(), event_broker).await; - - task.broadcast_affinity(&affinity).await; - tokio::time::sleep(BROADCAST_INTERVAL_PERIOD * 2).await; - - assert_eq!(update_counter.load(Ordering::Acquire), 1); - - let value = cluster - .get_self_key_value(INGESTER_AFFINITY_PREFIX) - .await - .unwrap(); - let deserialized: IngesterAffinity = serde_json::from_str(&value).unwrap(); - assert_eq!(deserialized.affinity_score, 60); - assert_eq!(deserialized.open_shard_counts.len(), 1); - } - - #[test] - fn test_wal_capacity_delta_pops_oldest_beyond_window() { - let mut series = ts(100); - - // Fill to exactly the window length (6 readings). - // All use 50 bytes => 50 remaining each. - for _ in 0..WAL_CAPACITY_LOOKBACK_WINDOW_LEN { - series.record(ByteSize::b(50)); - } - assert_eq!(series.readings.len(), WAL_CAPACITY_LOOKBACK_WINDOW_LEN); - assert_eq!(series.delta(), Some(0.0)); - // At exactly window len, delta does NOT pop. 
- assert_eq!(series.readings.len(), WAL_CAPACITY_LOOKBACK_WINDOW_LEN); - - // Push one more (7th): 0 used => 100 remaining => current = 1.0 - series.record(ByteSize::b(0)); - assert_eq!(series.readings.len(), WAL_CAPACITY_LOOKBACK_WINDOW_LEN + 1); - - // delta should pop the oldest (0.50) and return 1.0 - 0.50 = 0.50 - assert_eq!(series.delta(), Some(0.50)); - // After pop, back to window length. - assert_eq!(series.readings.len(), WAL_CAPACITY_LOOKBACK_WINDOW_LEN); - } -} diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity.rs new file mode 100644 index 00000000000..0ea45d9dad3 --- /dev/null +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity.rs @@ -0,0 +1,456 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::BTreeSet; + +use anyhow::{Context, Result}; +use bytesize::ByteSize; +use quickwit_cluster::{Cluster, ListenerHandle}; +use quickwit_common::pubsub::{Event, EventBroker}; +use quickwit_common::ring_buffer::RingBuffer; +use quickwit_common::shared_consts::INGESTER_CAPACITY_PREFIX; +use quickwit_proto::ingest::ingester::IngesterStatus; +use quickwit_proto::types::{IndexUid, NodeId, SourceId, SourceUid}; +use serde::{Deserialize, Serialize}; +use tokio::task::JoinHandle; +use tracing::{info, warn}; + +use super::{BROADCAST_INTERVAL_PERIOD, make_key, parse_key}; +use crate::ingest_v2::state::WeakIngesterState; + +pub type OpenShardCounts = Vec<(IndexUid, SourceId, usize)>; + +/// The lookback window length is meant to capture readings far enough back in time to give +/// a rough rate of change estimate. At size 6, with broadcast interval of 5 seconds, this would be +/// 30 seconds of readings. +const WAL_CAPACITY_LOOKBACK_WINDOW_LEN: usize = 6; + +/// The ring buffer stores one extra element so that `delta()` can compare the newest reading +/// with the one that is exactly `WAL_CAPACITY_LOOKBACK_WINDOW_LEN` steps ago. Otherwise, that +/// reading would be discarded when the next reading is inserted. +const WAL_CAPACITY_READINGS_LEN: usize = WAL_CAPACITY_LOOKBACK_WINDOW_LEN + 1; + +struct WalDiskCapacityTimeSeries { + wal_capacity: ByteSize, + readings: RingBuffer, +} + +impl WalDiskCapacityTimeSeries { + fn new(wal_capacity: ByteSize) -> Self { + assert!( + wal_capacity.as_u64() > 0, + "WAL capacity must be greater than zero" + ); + Self { + wal_capacity, + readings: RingBuffer::default(), + } + } + + fn record(&mut self, wal_used: ByteSize) { + let remaining = ByteSize::b(self.wal_capacity.as_u64().saturating_sub(wal_used.as_u64())); + self.readings.push(remaining); + } + + /// Returns the most recent remaining capacity as a fraction of total WAL capacity, + /// or `None` if no readings have been recorded yet. 
+ fn current(&self) -> Option<f64> {
+ self.readings.last().map(|b| self.as_capacity_pct(b))
+ }
+
+ fn as_capacity_pct(&self, bytes: ByteSize) -> f64 {
+ bytes.as_u64() as f64 / self.wal_capacity.as_u64() as f64
+ }
+
+ /// How much remaining capacity changed between the oldest and newest readings.
+ /// Positive = improving, negative = draining.
+ fn delta(&self) -> Option<f64> {
+ let current = self.readings.last()?;
+ let oldest = self.readings.oldest()?;
+ Some(self.as_capacity_pct(current) - self.as_capacity_pct(oldest))
+ }
+}
+
+/// Computes a capacity score from 0 to 10 using a PD controller.
+///
+/// The score has two components:
+///
+/// - **P (proportional):** How much WAL capacity remains right now. An ingester with 100% free
+/// capacity gets `PROPORTIONAL_WEIGHT` points; 50% gets half; and so on. If remaining capacity
+/// drops to `MIN_PERMISSIBLE_CAPACITY` or below, the score is immediately 0.
+///
+/// - **D (derivative):** Up to `DERIVATIVE_WEIGHT` bonus points based on how fast remaining
+/// capacity is changing over the lookback window. A higher drain rate is worse, so we invert it:
+/// `drain / MAX_DRAIN_RATE` normalizes the drain to a 0–1 penalty, and subtracting from 1
+/// converts it into a 0–1 bonus. Multiplied by `DERIVATIVE_WEIGHT`, a stable node gets the full
+/// bonus and a node draining at `MAX_DRAIN_RATE` or faster gets nothing.
+///
+/// Putting it together: a completely idle ingester scores 10 (8 + 2).
+/// One that is full but stable scores ~2. One that is draining rapidly scores less.
+/// A score of 0 means the ingester is at or below minimum permissible capacity.
+///
+/// Below this remaining capacity fraction, the score is immediately 0.
+const MIN_PERMISSIBLE_CAPACITY: f64 = 0.05;
+/// Weight of the proportional term (max points from P).
+const PROPORTIONAL_WEIGHT: f64 = 8.0;
+/// Weight of the derivative term (max points from D).
+const DERIVATIVE_WEIGHT: f64 = 2.0; +/// The drain rate (as a fraction of total capacity over the lookback window) at which the +/// derivative penalty is fully applied. Drain rates beyond this are clamped. +const MAX_DRAIN_RATE: f64 = 0.10; + +fn compute_capacity_score(remaining_capacity: f64, capacity_delta: f64) -> usize { + if remaining_capacity <= MIN_PERMISSIBLE_CAPACITY { + return 0; + } + let p = PROPORTIONAL_WEIGHT * remaining_capacity; + let drain = (-capacity_delta).clamp(0.0, MAX_DRAIN_RATE); + let d = DERIVATIVE_WEIGHT * (1.0 - drain / MAX_DRAIN_RATE); + (p + d).clamp(0.0, 10.0) as usize +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct IngesterCapacity { + pub capacity_score: usize, + pub open_shard_count: usize, +} + +/// Periodically snapshots the ingester's WAL usage and open shard counts, computes +/// a capacity score, and broadcasts it to other nodes via Chitchat. +pub struct BroadcastIngesterCapacityTask { + cluster: Cluster, + weak_state: WeakIngesterState, + wal_capacity_time_series: WalDiskCapacityTimeSeries, +} + +impl BroadcastIngesterCapacityTask { + pub fn spawn( + cluster: Cluster, + weak_state: WeakIngesterState, + wal_capacity: ByteSize, + ) -> JoinHandle<()> { + let mut broadcaster = Self { + cluster, + weak_state, + wal_capacity_time_series: WalDiskCapacityTimeSeries::new(wal_capacity), + }; + tokio::spawn(async move { broadcaster.run().await }) + } + + async fn snapshot(&self) -> Result> { + let state = self + .weak_state + .upgrade() + .context("ingester state has been dropped")?; + + // lock fully asserts that the ingester is ready. There's a likelihood that this task runs + // before the WAL is loaded, so we make sure that the ingester is ready just in case. 
+ if *state.status_rx.borrow() != IngesterStatus::Ready { + return Ok(None); + } + + let guard = state + .lock_fully() + .await + .map_err(|_| anyhow::anyhow!("failed to acquire ingester state lock"))?; + let wal_used = ByteSize::b(guard.mrecordlog.resource_usage().disk_used_bytes as u64); + let open_shard_counts = guard.get_open_shard_counts(); + + Ok(Some((wal_used, open_shard_counts))) + } + + async fn run(&mut self) { + let mut interval = tokio::time::interval(BROADCAST_INTERVAL_PERIOD); + let mut previous_sources: BTreeSet = BTreeSet::new(); + + loop { + interval.tick().await; + + let (wal_used, open_shard_counts) = match self.snapshot().await { + Ok(Some(snapshot)) => snapshot, + Ok(None) => continue, + Err(error) => { + info!("stopping ingester capacity broadcast: {error}"); + return; + } + }; + + self.wal_capacity_time_series.record(wal_used); + + let remaining_capacity = self.wal_capacity_time_series.current().unwrap_or(1.0); + let capacity_delta = self.wal_capacity_time_series.delta().unwrap_or(0.0); + let capacity_score = compute_capacity_score(remaining_capacity, capacity_delta); + + previous_sources = self + .broadcast_capacity(capacity_score, &open_shard_counts, &previous_sources) + .await; + } + } + + async fn broadcast_capacity( + &self, + capacity_score: usize, + open_shard_counts: &OpenShardCounts, + previous_sources: &BTreeSet, + ) -> BTreeSet { + let mut current_sources = BTreeSet::new(); + + for (index_uid, source_id, open_shard_count) in open_shard_counts { + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + let key = make_key(INGESTER_CAPACITY_PREFIX, &source_uid); + let capacity = IngesterCapacity { + capacity_score, + open_shard_count: *open_shard_count, + }; + let value = serde_json::to_string(&capacity) + .expect("`IngesterCapacity` should be JSON serializable"); + self.cluster.set_self_key_value(key, value).await; + current_sources.insert(source_uid); + } + + for removed_source in 
previous_sources.difference(&current_sources) {
+ let key = make_key(INGESTER_CAPACITY_PREFIX, removed_source);
+ self.cluster.remove_self_key(&key).await;
+ }
+
+ current_sources
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct IngesterCapacityUpdate {
+ pub node_id: NodeId,
+ pub source_uid: SourceUid,
+ pub capacity_score: usize,
+ pub open_shard_count: usize,
+}
+
+impl Event for IngesterCapacityUpdate {}
+
+pub async fn setup_ingester_capacity_update_listener(
+ cluster: Cluster,
+ event_broker: EventBroker,
+) -> ListenerHandle {
+ cluster
+ .subscribe(INGESTER_CAPACITY_PREFIX, move |event| {
+ let Some(source_uid) = parse_key(event.key) else {
+ warn!("failed to parse source UID from key `{}`", event.key);
+ return;
+ };
+ let Ok(ingester_capacity) = serde_json::from_str::<IngesterCapacity>(event.value)
+ else {
+ warn!("failed to parse ingester capacity `{}`", event.value);
+ return;
+ };
+ let node_id: NodeId = event.node.node_id.clone().into();
+ event_broker.publish(IngesterCapacityUpdate {
+ node_id,
+ source_uid,
+ capacity_score: ingester_capacity.capacity_score,
+ open_shard_count: ingester_capacity.open_shard_count,
+ });
+ })
+ .await
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+ use std::sync::atomic::{AtomicUsize, Ordering};
+
+ use quickwit_cluster::{ChannelTransport, create_cluster_for_test};
+ use quickwit_proto::types::ShardId;
+
+ use super::*;
+ use crate::ingest_v2::models::IngesterShard;
+ use crate::ingest_v2::state::IngesterState;
+
+ fn ts(capacity_bytes: u64) -> WalDiskCapacityTimeSeries {
+ WalDiskCapacityTimeSeries::new(ByteSize::b(capacity_bytes))
+ }
+
+ #[test]
+ fn test_wal_disk_capacity_current_after_record() {
+ let mut series = ts(100);
+ // 30 bytes used => 70 remaining => 0.70
+ series.record(ByteSize::b(30));
+ assert_eq!(series.current(), Some(0.70));
+
+ // 90 bytes used => 10 remaining => 0.10
+ series.record(ByteSize::b(90));
+ assert_eq!(series.current(), Some(0.10));
+ }
+
+ #[test]
+ fn 
test_wal_disk_capacity_record_saturates_at_zero() { + let mut series = ts(100); + series.record(ByteSize::b(200)); + assert_eq!(series.current(), Some(0.0)); + } + + #[test] + fn test_wal_disk_capacity_delta_growing() { + let mut series = ts(100); + // oldest: 60 used => 40 remaining + series.record(ByteSize::b(60)); + // current: 20 used => 80 remaining + series.record(ByteSize::b(20)); + // delta = 0.80 - 0.40 = 0.40 + assert_eq!(series.delta(), Some(0.40)); + } + + #[test] + fn test_wal_disk_capacity_delta_shrinking() { + let mut series = ts(100); + // oldest: 20 used => 80 remaining + series.record(ByteSize::b(20)); + // current: 60 used => 40 remaining + series.record(ByteSize::b(60)); + // delta = 0.40 - 0.80 = -0.40 + assert_eq!(series.delta(), Some(-0.40)); + } + + #[test] + fn test_capacity_score_draining_vs_stable() { + // Node A: capacity draining — usage increases 10, 20, ..., 70 over 7 ticks. + let mut node_a = ts(100); + for used in (10..=70).step_by(10) { + node_a.record(ByteSize::b(used)); + } + let a_remaining = node_a.current().unwrap(); + let a_delta = node_a.delta().unwrap(); + let a_score = compute_capacity_score(a_remaining, a_delta); + + // Node B: steady at 50% usage over 7 ticks. 
+ let mut node_b = ts(100); + for _ in 0..7 { + node_b.record(ByteSize::b(50)); + } + let b_remaining = node_b.current().unwrap(); + let b_delta = node_b.delta().unwrap(); + let b_score = compute_capacity_score(b_remaining, b_delta); + + // p=2.4, d=0 (max drain) => 2 + assert_eq!(a_score, 2); + // p=4, d=2 (stable) => 6 + assert_eq!(b_score, 6); + assert!(b_score > a_score); + } + + #[tokio::test] + async fn test_snapshot_state_dropped() { + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) + .await + .unwrap(); + let (_temp_dir, state) = IngesterState::for_test().await; + let weak_state = state.weak(); + drop(state); + + let task = BroadcastIngesterCapacityTask { + cluster, + weak_state, + wal_capacity_time_series: WalDiskCapacityTimeSeries::new(ByteSize::mib(256)), + }; + assert!(task.snapshot().await.is_err()); + } + + #[tokio::test] + async fn test_broadcast_ingester_capacity() { + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) + .await + .unwrap(); + let event_broker = EventBroker::default(); + + let (_temp_dir, state) = IngesterState::for_test().await; + let index_uid = IndexUid::for_test("test-index", 0); + let mut state_guard = state.lock_partially().await.unwrap(); + let shard = IngesterShard::new_solo( + index_uid.clone(), + SourceId::from("test-source"), + ShardId::from(0), + ) + .build(); + state_guard.shards.insert(shard.queue_id(), shard); + let open_shard_counts = state_guard.get_open_shard_counts(); + drop(state_guard); + + // Simulate 500 of 1000 bytes used => 50% remaining, 0 delta => score = 60.0 + let mut task = BroadcastIngesterCapacityTask { + cluster: cluster.clone(), + weak_state: state.weak(), + wal_capacity_time_series: WalDiskCapacityTimeSeries::new(ByteSize::b(1000)), + }; + task.wal_capacity_time_series.record(ByteSize::b(500)); + + let remaining = 
task.wal_capacity_time_series.current().unwrap(); + let delta = task.wal_capacity_time_series.delta().unwrap(); + let capacity_score = compute_capacity_score(remaining, delta); + assert_eq!(capacity_score, 6); + + let update_counter = Arc::new(AtomicUsize::new(0)); + let update_counter_clone = update_counter.clone(); + let index_uid_clone = index_uid.clone(); + let _sub = event_broker.subscribe(move |event: IngesterCapacityUpdate| { + update_counter_clone.fetch_add(1, Ordering::Release); + assert_eq!(event.source_uid.index_uid, index_uid_clone); + assert_eq!(event.source_uid.source_id, "test-source"); + assert_eq!(event.capacity_score, 6); + assert_eq!(event.open_shard_count, 1); + }); + + let _listener = + setup_ingester_capacity_update_listener(cluster.clone(), event_broker).await; + + let previous_sources = BTreeSet::new(); + task.broadcast_capacity(capacity_score, &open_shard_counts, &previous_sources) + .await; + tokio::time::sleep(BROADCAST_INTERVAL_PERIOD * 2).await; + + assert_eq!(update_counter.load(Ordering::Acquire), 1); + + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: SourceId::from("test-source"), + }; + let key = make_key(INGESTER_CAPACITY_PREFIX, &source_uid); + let value = cluster.get_self_key_value(&key).await.unwrap(); + let deserialized: IngesterCapacity = serde_json::from_str(&value).unwrap(); + assert_eq!(deserialized.capacity_score, 6); + assert_eq!(deserialized.open_shard_count, 1); + } + + #[test] + fn test_wal_disk_capacity_delta_spans_lookback_window() { + let mut series = ts(100); + + // Fill to exactly the lookback window length (6 readings), all same value. + for _ in 0..WAL_CAPACITY_LOOKBACK_WINDOW_LEN { + series.record(ByteSize::b(50)); + } + assert_eq!(series.delta(), Some(0.0)); + + // 7th reading fills the ring buffer. Delta spans 6 intervals. + series.record(ByteSize::b(0)); + assert_eq!(series.delta(), Some(0.50)); + + // 8th reading evicts the oldest 50-remaining. Delta still spans 6 intervals. 
+ series.record(ByteSize::b(0)); + assert_eq!(series.delta(), Some(0.50)); + } +} diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/local_shards.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/local_shards.rs index 063cc73d372..c10c11f318e 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/local_shards.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/local_shards.rs @@ -18,6 +18,7 @@ use std::time::Duration; use bytesize::ByteSize; use quickwit_cluster::{Cluster, ListenerHandle}; use quickwit_common::pubsub::{Event, EventBroker}; +use quickwit_common::ring_buffer::RingBuffer; use quickwit_common::shared_consts::INGESTER_PRIMARY_SHARDS_PREFIX; use quickwit_common::sorted_iter::{KeyDiff, SortedByKeyIterator}; use quickwit_common::tower::{ConstantRate, Rate}; @@ -27,7 +28,7 @@ use serde::{Deserialize, Serialize, Serializer}; use tokio::task::JoinHandle; use tracing::{debug, warn}; -use super::BROADCAST_INTERVAL_PERIOD; +use super::{BROADCAST_INTERVAL_PERIOD, make_key, parse_key}; use crate::RateMibPerSec; use crate::ingest_v2::metrics::INGEST_V2_METRICS; use crate::ingest_v2::state::WeakIngesterState; @@ -224,36 +225,24 @@ impl ShardThroughputTimeSeriesMap { #[derive(Default)] struct ShardThroughputTimeSeries { shard_state: ShardState, - measurements: [ByteSize; SHARD_THROUGHPUT_LONG_TERM_WINDOW_LEN], - len: usize, + throughput: RingBuffer, } impl ShardThroughputTimeSeries { fn last(&self) -> ByteSize { - self.measurements.last().copied().unwrap_or_default() + self.throughput.last().unwrap_or_default() } fn average(&self) -> ByteSize { - if self.len == 0 { + if self.throughput.is_empty() { return ByteSize::default(); } - let sum = self - .measurements - .iter() - .rev() - .take(self.len) - .map(ByteSize::as_u64) - .sum::(); - ByteSize::b(sum / self.len as u64) + let sum = self.throughput.iter().map(ByteSize::as_u64).sum::(); + ByteSize::b(sum / self.throughput.len() as u64) } fn record(&mut self, new_throughput_measurement: 
ByteSize) { - self.len = (self.len + 1).min(SHARD_THROUGHPUT_LONG_TERM_WINDOW_LEN); - self.measurements.rotate_left(1); - let Some(last_measurement) = self.measurements.last_mut() else { - return; - }; - *last_measurement = new_throughput_measurement; + self.throughput.push(new_throughput_measurement); } } @@ -333,13 +322,13 @@ impl BroadcastLocalShardsTask { source_uid, shard_infos, } => { - let key = make_key(source_uid); + let key = make_key(INGESTER_PRIMARY_SHARDS_PREFIX, source_uid); let value = serde_json::to_string(&shard_infos) .expect("`ShardInfos` should be JSON serializable"); self.cluster.set_self_key_value(key, value).await; } ShardInfosChange::Removed { source_uid } => { - let key = make_key(source_uid); + let key = make_key(INGESTER_PRIMARY_SHARDS_PREFIX, source_uid); self.cluster.remove_self_key(&key).await; } } @@ -366,22 +355,6 @@ impl BroadcastLocalShardsTask { } } -fn make_key(source_uid: &SourceUid) -> String { - format!( - "{INGESTER_PRIMARY_SHARDS_PREFIX}{}:{}", - source_uid.index_uid, source_uid.source_id - ) -} - -fn parse_key(key: &str) -> Option { - let (index_uid_str, source_id_str) = key.rsplit_once(':')?; - - Some(SourceUid { - index_uid: index_uid_str.parse().ok()?, - source_id: source_id_str.to_string(), - }) -} - #[derive(Debug, Clone)] pub struct LocalShardsUpdate { pub leader_id: NodeId, @@ -623,30 +596,6 @@ mod tests { assert!(value_opt.is_none()); } - #[test] - fn test_make_key() { - let source_uid = SourceUid { - index_uid: IndexUid::for_test("test-index", 0), - source_id: SourceId::from("test-source"), - }; - let key = make_key(&source_uid); - assert_eq!( - key, - "ingester.primary_shards:test-index:00000000000000000000000000:test-source" - ); - } - - #[test] - fn test_parse_key() { - let key = "test-index:00000000000000000000000000:test-source"; - let source_uid = parse_key(key).unwrap(); - assert_eq!( - &source_uid.index_uid.to_string(), - "test-index:00000000000000000000000000" - ); - assert_eq!(source_uid.source_id, 
"test-source".to_string()); - } - #[tokio::test] async fn test_local_shards_update_listener() { let transport = ChannelTransport::default(); @@ -683,7 +632,7 @@ mod tests { index_uid: index_uid.clone(), source_id: SourceId::from("test-source"), }; - let key = make_key(&source_uid); + let key = make_key(INGESTER_PRIMARY_SHARDS_PREFIX, &source_uid); let value = serde_json::to_string(&vec![ShardInfo { shard_id: ShardId::from(1), shard_state: ShardState::Open, diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs index 86d75d1c283..178b1e182b5 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs @@ -13,11 +13,13 @@ // limitations under the License. #[allow(dead_code)] -mod ingester_affinity; +mod ingester_capacity; mod local_shards; use std::time::Duration; +use quickwit_proto::types::SourceUid; + pub(in crate::ingest_v2) const BROADCAST_INTERVAL_PERIOD: Duration = if cfg!(test) { Duration::from_millis(50) } else { @@ -28,3 +30,47 @@ pub use local_shards::{ BroadcastLocalShardsTask, LocalShardsUpdate, ShardInfo, ShardInfos, setup_local_shards_update_listener, }; + +fn make_key(prefix: &str, source_uid: &SourceUid) -> String { + format!("{prefix}{}:{}", source_uid.index_uid, source_uid.source_id) +} + +fn parse_key(key: &str) -> Option { + let (index_uid_str, source_id_str) = key.rsplit_once(':')?; + Some(SourceUid { + index_uid: index_uid_str.parse().ok()?, + source_id: source_id_str.to_string(), + }) +} + +#[cfg(test)] +mod tests { + use quickwit_common::shared_consts::INGESTER_PRIMARY_SHARDS_PREFIX; + use quickwit_proto::types::{IndexUid, SourceId, SourceUid}; + + use super::*; + + #[test] + fn test_make_key() { + let source_uid = SourceUid { + index_uid: IndexUid::for_test("test-index", 0), + source_id: SourceId::from("test-source"), + }; + let key = make_key(INGESTER_PRIMARY_SHARDS_PREFIX, &source_uid); + assert_eq!( 
+ key, + "ingester.primary_shards:test-index:00000000000000000000000000:test-source" + ); + } + + #[test] + fn test_parse_key() { + let key = "test-index:00000000000000000000000000:test-source"; + let source_uid = parse_key(key).unwrap(); + assert_eq!( + &source_uid.index_uid.to_string(), + "test-index:00000000000000000000000000" + ); + assert_eq!(source_uid.source_id, "test-source".to_string()); + } +} diff --git a/quickwit/quickwit-ingest/src/ingest_v2/state.rs b/quickwit/quickwit-ingest/src/ingest_v2/state.rs index 43264c69b15..bf1c648c6cb 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/state.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/state.rs @@ -92,7 +92,7 @@ impl InnerIngesterState { pub fn get_open_shard_counts(&self) -> Vec<(IndexUid, SourceId, usize)> { self.shards .values() - .filter(|shard| shard.is_open()) + .filter(|shard| shard.is_advertisable && !shard.is_replica() && shard.is_open()) .map(|shard| (shard.index_uid.clone(), shard.source_id.clone())) .counts() .into_iter() @@ -479,7 +479,7 @@ impl WeakIngesterState { #[cfg(test)] mod tests { use bytesize::ByteSize; - use quickwit_proto::types::ShardId; + use quickwit_proto::types::{NodeId, ShardId, SourceId}; use tokio::time::timeout; use super::*; @@ -654,4 +654,77 @@ mod tests { locked_state.find_most_capacity_shard_mut(&index_uid, &SourceId::from("other-source")); assert!(shard_opt.is_none()); } + + fn open_shard( + index_uid: IndexUid, + source_id: SourceId, + shard_id: ShardId, + is_replica: bool, + ) -> IngesterShard { + let builder = if is_replica { + IngesterShard::new_replica(index_uid, source_id, shard_id, NodeId::from("test-leader")) + } else { + IngesterShard::new_solo(index_uid, source_id, shard_id) + }; + builder.advertisable().build() + } + + #[tokio::test] + async fn test_get_open_shard_counts() { + let (_temp_dir, state) = IngesterState::for_test().await; + let mut state_guard = state.lock_partially().await.unwrap(); + + let index_a = IndexUid::for_test("index-a", 0); + let 
index_b = IndexUid::for_test("index-b", 0); + let index_c = IndexUid::for_test("index-c", 0); + + // (index-a, source-a): 1 open solo shard. + let s = open_shard( + index_a.clone(), + SourceId::from("source-a"), + ShardId::from(1), + false, + ); + state_guard.shards.insert(s.queue_id(), s); + + // (index-b, source-b): 1 open solo + 1 replica. Only the solo should be counted. + let s = open_shard( + index_b.clone(), + SourceId::from("source-b"), + ShardId::from(2), + false, + ); + state_guard.shards.insert(s.queue_id(), s); + let s = open_shard( + index_b.clone(), + SourceId::from("source-b"), + ShardId::from(3), + true, + ); + state_guard.shards.insert(s.queue_id(), s); + + // (index-c, source-c): 2 open solo shards. + let s = open_shard( + index_c.clone(), + SourceId::from("source-c"), + ShardId::from(4), + false, + ); + state_guard.shards.insert(s.queue_id(), s); + let s = open_shard( + index_c.clone(), + SourceId::from("source-c"), + ShardId::from(5), + false, + ); + state_guard.shards.insert(s.queue_id(), s); + + let mut counts = state_guard.get_open_shard_counts(); + counts.sort_by(|a, b| a.0.cmp(&b.0)); + + assert_eq!(counts.len(), 3); + assert_eq!(counts[0], (index_a, SourceId::from("source-a"), 1)); + assert_eq!(counts[1], (index_b, SourceId::from("source-b"), 1)); + assert_eq!(counts[2], (index_c, SourceId::from("source-c"), 2)); + } } From 8c63458c5d5422ca034e7d446bccb373444a141e Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Thu, 12 Feb 2026 16:40:24 -0500 Subject: [PATCH 7/8] fix test --- .../quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity.rs index 0ea45d9dad3..870cac2a09c 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity.rs @@ -385,6 +385,7 @@ 
mod tests { SourceId::from("test-source"), ShardId::from(0), ) + .advertisable() .build(); state_guard.shards.insert(shard.queue_id(), shard); let open_shard_counts = state_guard.get_open_shard_counts(); From f61c3873343ea5f93872865fb43d1eea47243b2b Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Tue, 17 Feb 2026 11:14:56 -0500 Subject: [PATCH 8/8] Address PR comments --- quickwit/quickwit-common/src/ring_buffer.rs | 72 ++++--- quickwit/quickwit-common/src/shared_consts.rs | 2 +- ...capacity.rs => ingester_capacity_score.rs} | 182 +++++++++--------- .../src/ingest_v2/broadcast/local_shards.rs | 2 +- .../src/ingest_v2/broadcast/mod.rs | 2 +- 5 files changed, 134 insertions(+), 126 deletions(-) rename quickwit/quickwit-ingest/src/ingest_v2/broadcast/{ingester_capacity.rs => ingester_capacity_score.rs} (74%) diff --git a/quickwit/quickwit-common/src/ring_buffer.rs b/quickwit/quickwit-common/src/ring_buffer.rs index f92dc504bb3..5d884d8188f 100644 --- a/quickwit/quickwit-common/src/ring_buffer.rs +++ b/quickwit/quickwit-common/src/ring_buffer.rs @@ -14,13 +14,23 @@ use std::fmt::{Debug, Formatter}; -/// A fixed-capacity circular buffer that overwrites the oldest element when full. +/// Fixed-size buffer that keeps the last N elements pushed into it. /// -/// Elements are stored in a flat array of size `N` and rotated on each push. -/// The newest element is always at position `N - 1` (the last slot), and the -/// oldest is at position `N - len`. +/// `head` is the write cursor. It advances by one on each push and wraps +/// back to 0 when it reaches N, overwriting the oldest element. 
+/// +/// ```text +/// RingBuffer after pushing 1, 2, 3, 4, 5, 6: +/// +/// buffer = [5, 6, 3, 4] head = 2 len = 4 +/// ^ +/// next write goes here +/// +/// logical view (oldest → newest): [3, 4, 5, 6] +/// ``` pub struct RingBuffer { buffer: [T; N], + head: usize, len: usize, } @@ -28,6 +38,7 @@ impl Default for RingBuffer { fn default() -> Self { Self { buffer: [T::default(); N], + head: 0, len: 0, } } @@ -40,11 +51,11 @@ impl Debug for RingBuffer { } impl RingBuffer { - pub fn push(&mut self, value: T) { - self.len = (self.len + 1).min(N); - self.buffer.rotate_left(1); - if let Some(last) = self.buffer.last_mut() { - *last = value; + pub fn push_back(&mut self, value: T) { + self.buffer[self.head] = value; + self.head = (self.head + 1) % N; + if self.len < N { + self.len += 1; } } @@ -52,14 +63,14 @@ impl RingBuffer { if self.len == 0 { return None; } - self.buffer.last().copied() + Some(self.buffer[(self.head + N - 1) % N]) } - pub fn oldest(&self) -> Option { + pub fn front(&self) -> Option { if self.len == 0 { return None; } - Some(self.buffer[N - self.len]) + Some(self.buffer[(self.head + N - self.len) % N]) } pub fn len(&self) -> usize { @@ -70,9 +81,9 @@ impl RingBuffer { self.len == 0 } - /// Iterates from oldest to newest over the recorded elements. 
- pub fn iter(&self) -> impl Iterator { - self.buffer[N - self.len..].iter() + pub fn iter(&self) -> impl Iterator + '_ { + let start = (self.head + N - self.len) % N; + (0..self.len).map(move |i| &self.buffer[(start + i) % N]) } } @@ -86,30 +97,30 @@ mod tests { assert!(rb.is_empty()); assert_eq!(rb.len(), 0); assert_eq!(rb.last(), None); - assert_eq!(rb.oldest(), None); + assert_eq!(rb.front(), None); assert_eq!(rb.iter().count(), 0); } #[test] fn test_single_push() { let mut rb = RingBuffer::::default(); - rb.push(10); + rb.push_back(10); assert_eq!(rb.len(), 1); assert!(!rb.is_empty()); assert_eq!(rb.last(), Some(10)); - assert_eq!(rb.oldest(), Some(10)); + assert_eq!(rb.front(), Some(10)); assert_eq!(rb.iter().copied().collect::>(), vec![10]); } #[test] fn test_partial_fill() { let mut rb = RingBuffer::::default(); - rb.push(1); - rb.push(2); - rb.push(3); + rb.push_back(1); + rb.push_back(2); + rb.push_back(3); assert_eq!(rb.len(), 3); assert_eq!(rb.last(), Some(3)); - assert_eq!(rb.oldest(), Some(1)); + assert_eq!(rb.front(), Some(1)); assert_eq!(rb.iter().copied().collect::>(), vec![1, 2, 3]); } @@ -117,11 +128,11 @@ mod tests { fn test_exactly_full() { let mut rb = RingBuffer::::default(); for i in 1..=4 { - rb.push(i); + rb.push_back(i); } assert_eq!(rb.len(), 4); assert_eq!(rb.last(), Some(4)); - assert_eq!(rb.oldest(), Some(1)); + assert_eq!(rb.front(), Some(1)); assert_eq!(rb.iter().copied().collect::>(), vec![1, 2, 3, 4]); } @@ -129,12 +140,11 @@ mod tests { fn test_wrap_around() { let mut rb = RingBuffer::::default(); for i in 1..=6 { - rb.push(i); + rb.push_back(i); } - // Buffer should contain [3, 4, 5, 6], oldest overwritten. 
assert_eq!(rb.len(), 4); assert_eq!(rb.last(), Some(6)); - assert_eq!(rb.oldest(), Some(3)); + assert_eq!(rb.front(), Some(3)); assert_eq!(rb.iter().copied().collect::>(), vec![3, 4, 5, 6]); } @@ -142,19 +152,19 @@ mod tests { fn test_many_wraps() { let mut rb = RingBuffer::::default(); for i in 1..=100 { - rb.push(i); + rb.push_back(i); } assert_eq!(rb.len(), 3); assert_eq!(rb.last(), Some(100)); - assert_eq!(rb.oldest(), Some(98)); + assert_eq!(rb.front(), Some(98)); assert_eq!(rb.iter().copied().collect::>(), vec![98, 99, 100]); } #[test] fn test_debug() { let mut rb = RingBuffer::::default(); - rb.push(1); - rb.push(2); + rb.push_back(1); + rb.push_back(2); assert_eq!(format!("{:?}", rb), "[1, 2]"); } } diff --git a/quickwit/quickwit-common/src/shared_consts.rs b/quickwit/quickwit-common/src/shared_consts.rs index 56db9e13691..437058f28fb 100644 --- a/quickwit/quickwit-common/src/shared_consts.rs +++ b/quickwit/quickwit-common/src/shared_consts.rs @@ -65,7 +65,7 @@ pub const SCROLL_BATCH_LEN: usize = 1_000; pub const INGESTER_PRIMARY_SHARDS_PREFIX: &str = "ingester.primary_shards:"; /// Prefix used in chitchat to broadcast per-source ingester capacity scores and open shard counts. 
-pub const INGESTER_CAPACITY_PREFIX: &str = "ingester.capacity:"; +pub const INGESTER_CAPACITY_SCORE_PREFIX: &str = "ingester.capacity_score:"; /// File name for the encoded list of fields in the split pub const SPLIT_FIELDS_FILE_NAME: &str = "split_fields"; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs similarity index 74% rename from quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity.rs rename to quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs index 870cac2a09c..6f8abc66ef8 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs @@ -19,7 +19,7 @@ use bytesize::ByteSize; use quickwit_cluster::{Cluster, ListenerHandle}; use quickwit_common::pubsub::{Event, EventBroker}; use quickwit_common::ring_buffer::RingBuffer; -use quickwit_common::shared_consts::INGESTER_CAPACITY_PREFIX; +use quickwit_common::shared_consts::INGESTER_CAPACITY_SCORE_PREFIX; use quickwit_proto::ingest::ingester::IngesterStatus; use quickwit_proto::types::{IndexUid, NodeId, SourceId, SourceUid}; use serde::{Deserialize, Serialize}; @@ -41,44 +41,37 @@ const WAL_CAPACITY_LOOKBACK_WINDOW_LEN: usize = 6; /// reading would be discarded when the next reading is inserted. 
const WAL_CAPACITY_READINGS_LEN: usize = WAL_CAPACITY_LOOKBACK_WINDOW_LEN + 1; -struct WalDiskCapacityTimeSeries { - wal_capacity: ByteSize, - readings: RingBuffer, +struct WalMemoryCapacityTimeSeries { + readings: RingBuffer, } -impl WalDiskCapacityTimeSeries { - fn new(wal_capacity: ByteSize) -> Self { - assert!( - wal_capacity.as_u64() > 0, - "WAL capacity must be greater than zero" - ); +impl WalMemoryCapacityTimeSeries { + fn new() -> Self { Self { - wal_capacity, readings: RingBuffer::default(), } } - fn record(&mut self, wal_used: ByteSize) { - let remaining = ByteSize::b(self.wal_capacity.as_u64().saturating_sub(wal_used.as_u64())); - self.readings.push(remaining); + fn record(&mut self, memory_used: ByteSize, memory_allocated: ByteSize) { + let allocated = memory_allocated.as_u64(); + if allocated == 0 { + self.readings.push_back(1.0); + return; + } + let remaining = 1.0 - (memory_used.as_u64() as f64 / allocated as f64); + self.readings.push_back(remaining.clamp(0.0, 1.0)); } - /// Returns the most recent remaining capacity as a fraction of total WAL capacity, - /// or `None` if no readings have been recorded yet. fn current(&self) -> Option { - self.readings.last().map(|b| self.as_capacity_pct(b)) - } - - fn as_capacity_pct(&self, bytes: ByteSize) -> f64 { - bytes.as_u64() as f64 / self.wal_capacity.as_u64() as f64 + self.readings.last() } /// How much remaining capacity changed between the oldest and newest readings. /// Positive = improving, negative = draining. 
fn delta(&self) -> Option { let current = self.readings.last()?; - let oldest = self.readings.oldest()?; - Some(self.as_capacity_pct(current) - self.as_capacity_pct(oldest)) + let oldest = self.readings.front()?; + Some(current - oldest) } } @@ -121,34 +114,30 @@ fn compute_capacity_score(remaining_capacity: f64, capacity_delta: f64) -> usize } #[derive(Debug, Clone, Default, Serialize, Deserialize)] -pub struct IngesterCapacity { +pub struct IngesterCapacityScore { pub capacity_score: usize, pub open_shard_count: usize, } -/// Periodically snapshots the ingester's WAL usage and open shard counts, computes +/// Periodically snapshots the ingester's WAL memory usage and open shard counts, computes /// a capacity score, and broadcasts it to other nodes via Chitchat. -pub struct BroadcastIngesterCapacityTask { +pub(crate) struct BroadcastIngesterCapacityScoreTask { cluster: Cluster, weak_state: WeakIngesterState, - wal_capacity_time_series: WalDiskCapacityTimeSeries, + wal_capacity_time_series: WalMemoryCapacityTimeSeries, } -impl BroadcastIngesterCapacityTask { - pub fn spawn( - cluster: Cluster, - weak_state: WeakIngesterState, - wal_capacity: ByteSize, - ) -> JoinHandle<()> { +impl BroadcastIngesterCapacityScoreTask { + pub fn spawn(cluster: Cluster, weak_state: WeakIngesterState) -> JoinHandle<()> { let mut broadcaster = Self { cluster, weak_state, - wal_capacity_time_series: WalDiskCapacityTimeSeries::new(wal_capacity), + wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(), }; tokio::spawn(async move { broadcaster.run().await }) } - async fn snapshot(&self) -> Result> { + async fn snapshot(&self) -> Result> { let state = self .weak_state .upgrade() @@ -164,10 +153,12 @@ impl BroadcastIngesterCapacityTask { .lock_fully() .await .map_err(|_| anyhow::anyhow!("failed to acquire ingester state lock"))?; - let wal_used = ByteSize::b(guard.mrecordlog.resource_usage().disk_used_bytes as u64); + let usage = guard.mrecordlog.resource_usage(); + let memory_used = 
ByteSize::b(usage.memory_used_bytes as u64); + let memory_allocated = ByteSize::b(usage.memory_allocated_bytes as u64); let open_shard_counts = guard.get_open_shard_counts(); - Ok(Some((wal_used, open_shard_counts))) + Ok(Some((memory_used, memory_allocated, open_shard_counts))) } async fn run(&mut self) { @@ -177,7 +168,7 @@ impl BroadcastIngesterCapacityTask { loop { interval.tick().await; - let (wal_used, open_shard_counts) = match self.snapshot().await { + let (memory_used, memory_allocated, open_shard_counts) = match self.snapshot().await { Ok(Some(snapshot)) => snapshot, Ok(None) => continue, Err(error) => { @@ -186,7 +177,8 @@ impl BroadcastIngesterCapacityTask { } }; - self.wal_capacity_time_series.record(wal_used); + self.wal_capacity_time_series + .record(memory_used, memory_allocated); let remaining_capacity = self.wal_capacity_time_series.current().unwrap_or(1.0); let capacity_delta = self.wal_capacity_time_series.delta().unwrap_or(0.0); @@ -211,19 +203,19 @@ impl BroadcastIngesterCapacityTask { index_uid: index_uid.clone(), source_id: source_id.clone(), }; - let key = make_key(INGESTER_CAPACITY_PREFIX, &source_uid); - let capacity = IngesterCapacity { + let key = make_key(INGESTER_CAPACITY_SCORE_PREFIX, &source_uid); + let capacity = IngesterCapacityScore { capacity_score, open_shard_count: *open_shard_count, }; let value = serde_json::to_string(&capacity) - .expect("`IngesterCapacity` should be JSON serializable"); + .expect("`IngesterCapacityScore` should be JSON serializable"); self.cluster.set_self_key_value(key, value).await; current_sources.insert(source_uid); } for removed_source in previous_sources.difference(¤t_sources) { - let key = make_key(INGESTER_CAPACITY_PREFIX, removed_source); + let key = make_key(INGESTER_CAPACITY_SCORE_PREFIX, removed_source); self.cluster.remove_self_key(&key).await; } @@ -232,32 +224,32 @@ impl BroadcastIngesterCapacityTask { } #[derive(Debug, Clone)] -pub struct IngesterCapacityUpdate { +pub struct 
IngesterCapacityScoreUpdate { pub node_id: NodeId, pub source_uid: SourceUid, pub capacity_score: usize, pub open_shard_count: usize, } -impl Event for IngesterCapacityUpdate {} +impl Event for IngesterCapacityScoreUpdate {} pub async fn setup_ingester_capacity_update_listener( cluster: Cluster, event_broker: EventBroker, ) -> ListenerHandle { cluster - .subscribe(INGESTER_CAPACITY_PREFIX, move |event| { + .subscribe(INGESTER_CAPACITY_SCORE_PREFIX, move |event| { let Some(source_uid) = parse_key(event.key) else { warn!("failed to parse source UID from key `{}`", event.key); return; }; - let Ok(ingester_capacity) = serde_json::from_str::(event.value) + let Ok(ingester_capacity) = serde_json::from_str::(event.value) else { warn!("failed to parse ingester capacity `{}`", event.value); return; }; let node_id: NodeId = event.node.node_id.clone().into(); - event_broker.publish(IngesterCapacityUpdate { + event_broker.publish(IngesterCapacityScoreUpdate { node_id, source_uid, capacity_score: ingester_capacity.capacity_score, @@ -279,47 +271,52 @@ mod tests { use crate::ingest_v2::models::IngesterShard; use crate::ingest_v2::state::IngesterState; - fn ts(capacity_bytes: u64) -> WalDiskCapacityTimeSeries { - WalDiskCapacityTimeSeries::new(ByteSize::b(capacity_bytes)) + fn ts() -> WalMemoryCapacityTimeSeries { + WalMemoryCapacityTimeSeries::new() + } + + /// Helper: record a reading with `used` out of `allocated` bytes. 
+ fn record(series: &mut WalMemoryCapacityTimeSeries, used: u64, allocated: u64) { + series.record(ByteSize::b(used), ByteSize::b(allocated)); } #[test] - fn test_wal_disk_capacity_current_after_record() { - let mut series = ts(100); - // 30 bytes used => 70 remaining => 0.70 - series.record(ByteSize::b(30)); - assert_eq!(series.current(), Some(0.70)); - - // 90 bytes used => 10 remaining => 0.10 - series.record(ByteSize::b(90)); - assert_eq!(series.current(), Some(0.10)); + fn test_wal_memory_capacity_current_after_record() { + let mut series = ts(); + // 192 of 256 used => 25% remaining + record(&mut series, 192, 256); + assert_eq!(series.current(), Some(0.25)); + + // 16 of 256 used => 93.75% remaining + record(&mut series, 16, 256); + assert_eq!(series.current(), Some(0.9375)); } #[test] - fn test_wal_disk_capacity_record_saturates_at_zero() { - let mut series = ts(100); - series.record(ByteSize::b(200)); + fn test_wal_memory_capacity_record_saturates_at_zero() { + let mut series = ts(); + record(&mut series, 200, 100); assert_eq!(series.current(), Some(0.0)); } #[test] - fn test_wal_disk_capacity_delta_growing() { - let mut series = ts(100); - // oldest: 60 used => 40 remaining - series.record(ByteSize::b(60)); - // current: 20 used => 80 remaining - series.record(ByteSize::b(20)); + fn test_wal_memory_capacity_delta_growing() { + let mut series = ts(); + // oldest: 60 of 100 used => 40% remaining + record(&mut series, 60, 100); + // current: 20 of 100 used => 80% remaining + record(&mut series, 20, 100); // delta = 0.80 - 0.40 = 0.40 assert_eq!(series.delta(), Some(0.40)); } #[test] - fn test_wal_disk_capacity_delta_shrinking() { - let mut series = ts(100); - // oldest: 20 used => 80 remaining - series.record(ByteSize::b(20)); - // current: 60 used => 40 remaining - series.record(ByteSize::b(60)); + fn test_wal_memory_capacity_delta_shrinking() { + let mut series = ts(); + // oldest: 20 of 100 used => 80% remaining + record(&mut series, 20, 100); + // 
current: 60 of 100 used => 40% remaining + record(&mut series, 60, 100); // delta = 0.40 - 0.80 = -0.40 assert_eq!(series.delta(), Some(-0.40)); } @@ -327,18 +324,18 @@ mod tests { #[test] fn test_capacity_score_draining_vs_stable() { // Node A: capacity draining — usage increases 10, 20, ..., 70 over 7 ticks. - let mut node_a = ts(100); + let mut node_a = ts(); for used in (10..=70).step_by(10) { - node_a.record(ByteSize::b(used)); + record(&mut node_a, used, 100); } let a_remaining = node_a.current().unwrap(); let a_delta = node_a.delta().unwrap(); let a_score = compute_capacity_score(a_remaining, a_delta); // Node B: steady at 50% usage over 7 ticks. - let mut node_b = ts(100); + let mut node_b = ts(); for _ in 0..7 { - node_b.record(ByteSize::b(50)); + record(&mut node_b, 50, 100); } let b_remaining = node_b.current().unwrap(); let b_delta = node_b.delta().unwrap(); @@ -361,10 +358,10 @@ mod tests { let weak_state = state.weak(); drop(state); - let task = BroadcastIngesterCapacityTask { + let task = BroadcastIngesterCapacityScoreTask { cluster, weak_state, - wal_capacity_time_series: WalDiskCapacityTimeSeries::new(ByteSize::mib(256)), + wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(), }; assert!(task.snapshot().await.is_err()); } @@ -391,13 +388,14 @@ mod tests { let open_shard_counts = state_guard.get_open_shard_counts(); drop(state_guard); - // Simulate 500 of 1000 bytes used => 50% remaining, 0 delta => score = 60.0 - let mut task = BroadcastIngesterCapacityTask { + // Simulate 500 of 1000 bytes used => 50% remaining, 0 delta => score = 6 + let mut task = BroadcastIngesterCapacityScoreTask { cluster: cluster.clone(), weak_state: state.weak(), - wal_capacity_time_series: WalDiskCapacityTimeSeries::new(ByteSize::b(1000)), + wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(), }; - task.wal_capacity_time_series.record(ByteSize::b(500)); + task.wal_capacity_time_series + .record(ByteSize::b(500), ByteSize::b(1000)); let remaining = 
task.wal_capacity_time_series.current().unwrap(); let delta = task.wal_capacity_time_series.delta().unwrap(); @@ -407,7 +405,7 @@ mod tests { let update_counter = Arc::new(AtomicUsize::new(0)); let update_counter_clone = update_counter.clone(); let index_uid_clone = index_uid.clone(); - let _sub = event_broker.subscribe(move |event: IngesterCapacityUpdate| { + let _sub = event_broker.subscribe(move |event: IngesterCapacityScoreUpdate| { update_counter_clone.fetch_add(1, Ordering::Release); assert_eq!(event.source_uid.index_uid, index_uid_clone); assert_eq!(event.source_uid.source_id, "test-source"); @@ -429,29 +427,29 @@ mod tests { index_uid: index_uid.clone(), source_id: SourceId::from("test-source"), }; - let key = make_key(INGESTER_CAPACITY_PREFIX, &source_uid); + let key = make_key(INGESTER_CAPACITY_SCORE_PREFIX, &source_uid); let value = cluster.get_self_key_value(&key).await.unwrap(); - let deserialized: IngesterCapacity = serde_json::from_str(&value).unwrap(); + let deserialized: IngesterCapacityScore = serde_json::from_str(&value).unwrap(); assert_eq!(deserialized.capacity_score, 6); assert_eq!(deserialized.open_shard_count, 1); } #[test] - fn test_wal_disk_capacity_delta_spans_lookback_window() { - let mut series = ts(100); + fn test_wal_memory_capacity_delta_spans_lookback_window() { + let mut series = ts(); // Fill to exactly the lookback window length (6 readings), all same value. for _ in 0..WAL_CAPACITY_LOOKBACK_WINDOW_LEN { - series.record(ByteSize::b(50)); + record(&mut series, 50, 100); } assert_eq!(series.delta(), Some(0.0)); // 7th reading fills the ring buffer. Delta spans 6 intervals. - series.record(ByteSize::b(0)); + record(&mut series, 0, 100); assert_eq!(series.delta(), Some(0.50)); // 8th reading evicts the oldest 50-remaining. Delta still spans 6 intervals. 
- series.record(ByteSize::b(0)); + record(&mut series, 0, 100); assert_eq!(series.delta(), Some(0.50)); } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/local_shards.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/local_shards.rs index c10c11f318e..6ba10915f56 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/local_shards.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/local_shards.rs @@ -242,7 +242,7 @@ impl ShardThroughputTimeSeries { } fn record(&mut self, new_throughput_measurement: ByteSize) { - self.throughput.push(new_throughput_measurement); + self.throughput.push_back(new_throughput_measurement); } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs index 178b1e182b5..d2184a0e392 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs @@ -13,7 +13,7 @@ // limitations under the License. #[allow(dead_code)] -mod ingester_capacity; +mod ingester_capacity_score; mod local_shards; use std::time::Duration;