diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
index d1eab562095..aa8a0fa7423 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
@@ -299,6 +299,7 @@ pub struct LocalShardsUpdate {
pub leader_id: NodeId,
pub source_uid: SourceUid,
pub shard_infos: ShardInfos,
+ pub is_deletion: bool,
}
impl Event for LocalShardsUpdate {}
@@ -323,6 +324,7 @@ pub async fn setup_local_shards_update_listener(
leader_id,
source_uid,
shard_infos,
+ is_deletion: false,
};
event_broker.publish(local_shards_update);
})
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/debouncing.rs b/quickwit/quickwit-ingest/src/ingest_v2/debouncing.rs
index 79095ca64e0..3f3ccba7fad 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/debouncing.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/debouncing.rs
@@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
+use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
@@ -39,6 +40,10 @@ impl Debouncer {
Err(BarrierGuard(barrier))
}
}
+
+ fn strong_count(&self) -> usize {
+ Arc::strong_count(&self.0)
+ }
}
#[derive(Debug)]
@@ -69,6 +74,15 @@ impl GetOrCreateOpenShardsRequestDebouncer {
let key = (index_id.to_string(), source_id.to_string());
self.debouncers.entry(key).or_default().acquire()
}
+
+ pub fn delete_if_released(&mut self, index_id: &str, source_id: &str) {
+ let key = (index_id.to_string(), source_id.to_string());
+ if let Entry::Occupied(entry) = self.debouncers.entry(key) {
+ if entry.get().strong_count() == 1 {
+ entry.remove();
+ }
+ }
+ }
}
#[derive(Default)]
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs
index 291873232ce..acf5ffacf37 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs
@@ -327,7 +327,7 @@ impl IngestRouter {
for ((index_uid, source_id), shard_ids) in deleted_shards {
state_guard
.routing_table
- .delete_shards(&index_uid, source_id, &shard_ids);
+ .delete_shards_by_id(&index_uid, source_id, &shard_ids);
}
}
}
@@ -472,6 +472,20 @@ impl EventSubscriber for WeakRouterState {
let index_uid = local_shards_update.source_uid.index_uid;
let source_id = local_shards_update.source_uid.source_id;
+ if local_shards_update.is_deletion {
+ let mut state_guard = state.lock().await;
+
+ if state_guard
+ .routing_table
+ .delete_shards_by_leader_id(&index_uid, &source_id, &leader_id)
+ .is_some()
+ {
+ state_guard
+ .debouncer
+ .delete_if_released(&index_uid.index_id, &source_id);
+ }
+ return;
+ };
let mut open_shard_ids: Vec = Vec::new();
let mut closed_shard_ids: Vec = Vec::new();
@@ -520,7 +534,7 @@ impl EventSubscriber for WeakRouterState {
state_guard
.routing_table
- .delete_shards(&index_uid, &source_id, &deleted_shard_ids);
+ .delete_shards_by_id(&index_uid, &source_id, &deleted_shard_ids);
}
}
@@ -1531,6 +1545,7 @@ mod tests {
ingestion_rate: RateMibPerSec(0),
},
]),
+ is_deletion: false,
};
event_broker.publish(local_shards_update);
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs
index d564cc1f483..4fda501c1fe 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs
@@ -17,7 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-use std::collections::hash_map::Entry;
+use std::collections::hash_map::{Entry, OccupiedEntry};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -275,8 +275,8 @@ impl RoutingTableEntry {
}
}
- /// Shards the shards identified by their shard IDs.
- fn delete_shards(&mut self, index_uid: &IndexUid, shard_ids: &[ShardId]) {
+ /// Deletes the shards identified by their shard IDs.
+ fn delete_shards_by_id(&mut self, index_uid: &IndexUid, shard_ids: &[ShardId]) {
// If the shard table was just recently updated with shards for a new index UID, then we can
// safely discard this request.
if self.index_uid != *index_uid {
@@ -308,6 +308,22 @@ impl RoutingTableEntry {
}
}
+ /// Deletes the shards.
+ fn delete_shards_by_leader_id(&mut self, index_uid: &IndexUid, leader_id: &NodeId) {
+ // If the shard table was just recently updated with shards for a new index UID, then we can
+ // safely discard this request.
+ if self.index_uid != *index_uid {
+ return;
+ }
+ for shards in [&mut self.local_shards, &mut self.remote_shards] {
+ shards.retain(|shard| shard.leader_id != *leader_id);
+ }
+ }
+
+ fn is_empty(&self) -> bool {
+ self.local_shards.is_empty() && self.remote_shards.is_empty()
+ }
+
#[cfg(test)]
pub fn len(&self) -> usize {
self.local_shards.len() + self.remote_shards.len()
@@ -439,17 +455,39 @@ impl RoutingTable {
}
}
- /// Deletes the targeted shards.
- pub fn delete_shards(
+ pub fn delete_shards_by_id(
&mut self,
index_uid: &IndexUid,
source_id: impl Into,
shard_ids: &[ShardId],
- ) {
+ ) -> Option<(IndexUid, SourceId)> {
let key = (index_uid.index_id.clone(), source_id.into());
- if let Some(entry) = self.table.get_mut(&key) {
- entry.delete_shards(index_uid, shard_ids);
+
+ if let Entry::Occupied(mut occupied_entry) = self.table.entry(key) {
+ occupied_entry
+ .get_mut()
+ .delete_shards_by_id(index_uid, shard_ids);
+
+ return delete_entry_if_empty(occupied_entry, index_uid);
}
+ None
+ }
+
+ pub fn delete_shards_by_leader_id(
+ &mut self,
+ index_uid: &IndexUid,
+ source_id: impl Into,
+ leader_id: &NodeId,
+ ) -> Option<(IndexUid, SourceId)> {
+ let key = (index_uid.index_id.clone(), source_id.into());
+ if let Entry::Occupied(mut occupied_entry) = self.table.entry(key) {
+ occupied_entry
+ .get_mut()
+ .delete_shards_by_leader_id(index_uid, leader_id);
+
+ return delete_entry_if_empty(occupied_entry, index_uid);
+ }
+ None
}
#[cfg(test)]
@@ -458,6 +496,22 @@ impl RoutingTable {
}
}
+fn delete_entry_if_empty(
+ occupied_entry: OccupiedEntry<(IndexId, SourceId), RoutingTableEntry>,
+ index_uid: &IndexUid,
+) -> Option<(IndexUid, SourceId)> {
+ if occupied_entry.get().is_empty() && occupied_entry.get().index_uid == *index_uid {
+ let table_entry = occupied_entry.remove();
+ info!(
+ index_uid=%table_entry.index_uid,
+ source_id=%table_entry.source_id,
+ "deleted routing table entry"
+ );
+ return Some((table_entry.index_uid, table_entry.source_id));
+ }
+ None
+}
+
#[cfg(test)]
mod tests {
use quickwit_proto::ingest::ingester::IngesterServiceClient;
@@ -951,8 +1005,8 @@ mod tests {
let source_id: SourceId = "test-source".into();
let mut table_entry = RoutingTableEntry::empty(index_uid.clone(), source_id.clone());
- table_entry.delete_shards(&index_uid, &[]);
- table_entry.delete_shards(&index_uid, &[ShardId::from(1)]);
+ table_entry.delete_shards_by_id(&index_uid, &[]);
+ table_entry.delete_shards_by_id(&index_uid, &[ShardId::from(1)]);
assert!(table_entry.local_shards.is_empty());
assert!(table_entry.remote_shards.is_empty());
@@ -1008,7 +1062,7 @@ mod tests {
],
remote_round_robin_idx: AtomicUsize::default(),
};
- table_entry.delete_shards(
+ table_entry.delete_shards_by_id(
&index_uid,
&[
ShardId::from(1),