Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ pub struct LocalShardsUpdate {
pub leader_id: NodeId,
pub source_uid: SourceUid,
pub shard_infos: ShardInfos,
pub is_deletion: bool,
}

impl Event for LocalShardsUpdate {}
Expand All @@ -323,6 +324,7 @@ pub async fn setup_local_shards_update_listener(
leader_id,
source_uid,
shard_infos,
is_deletion: false,
};
event_broker.publish(local_shards_update);
})
Expand Down
14 changes: 14 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/debouncing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;

Expand All @@ -39,6 +40,10 @@ impl Debouncer {
Err(BarrierGuard(barrier))
}
}

fn strong_count(&self) -> usize {
Arc::strong_count(&self.0)
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -69,6 +74,15 @@ impl GetOrCreateOpenShardsRequestDebouncer {
let key = (index_id.to_string(), source_id.to_string());
self.debouncers.entry(key).or_default().acquire()
}

pub fn delete_if_released(&mut self, index_id: &str, source_id: &str) {
let key = (index_id.to_string(), source_id.to_string());
if let Entry::Occupied(entry) = self.debouncers.entry(key) {
if entry.get().strong_count() == 1 {
entry.remove();
}
}
}
}

#[derive(Default)]
Expand Down
19 changes: 17 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ impl IngestRouter {
for ((index_uid, source_id), shard_ids) in deleted_shards {
state_guard
.routing_table
.delete_shards(&index_uid, source_id, &shard_ids);
.delete_shards_by_id(&index_uid, source_id, &shard_ids);
}
}
}
Expand Down Expand Up @@ -472,6 +472,20 @@ impl EventSubscriber<LocalShardsUpdate> for WeakRouterState {
let index_uid = local_shards_update.source_uid.index_uid;
let source_id = local_shards_update.source_uid.source_id;

if local_shards_update.is_deletion {
let mut state_guard = state.lock().await;

if state_guard
.routing_table
.delete_shards_by_leader_id(&index_uid, &source_id, &leader_id)
.is_some()
{
state_guard
.debouncer
.delete_if_released(&index_uid.index_id, &source_id);
}
return;
};
let mut open_shard_ids: Vec<ShardId> = Vec::new();
let mut closed_shard_ids: Vec<ShardId> = Vec::new();

Expand Down Expand Up @@ -520,7 +534,7 @@ impl EventSubscriber<ShardPositionsUpdate> for WeakRouterState {

state_guard
.routing_table
.delete_shards(&index_uid, &source_id, &deleted_shard_ids);
.delete_shards_by_id(&index_uid, &source_id, &deleted_shard_ids);
}
}

Expand Down Expand Up @@ -1531,6 +1545,7 @@ mod tests {
ingestion_rate: RateMibPerSec(0),
},
]),
is_deletion: false,
};
event_broker.publish(local_shards_update);

Expand Down
76 changes: 65 additions & 11 deletions quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::hash_map::Entry;
use std::collections::hash_map::{Entry, OccupiedEntry};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicUsize, Ordering};

Expand Down Expand Up @@ -275,8 +275,8 @@ impl RoutingTableEntry {
}
}

/// Shards the shards identified by their shard IDs.
fn delete_shards(&mut self, index_uid: &IndexUid, shard_ids: &[ShardId]) {
/// Deletes the shards identified by their shard IDs.
fn delete_shards_by_id(&mut self, index_uid: &IndexUid, shard_ids: &[ShardId]) {
// If the shard table was just recently updated with shards for a new index UID, then we can
// safely discard this request.
if self.index_uid != *index_uid {
Expand Down Expand Up @@ -308,6 +308,22 @@ impl RoutingTableEntry {
}
}

/// Deletes the shards.
fn delete_shards_by_leader_id(&mut self, index_uid: &IndexUid, leader_id: &NodeId) {
// If the shard table was just recently updated with shards for a new index UID, then we can
// safely discard this request.
if self.index_uid != *index_uid {
return;
}
for shards in [&mut self.local_shards, &mut self.remote_shards] {
shards.retain(|shard| shard.leader_id != *leader_id);
}
}

fn is_empty(&self) -> bool {
self.local_shards.is_empty() && self.remote_shards.is_empty()
}

#[cfg(test)]
pub fn len(&self) -> usize {
self.local_shards.len() + self.remote_shards.len()
Expand Down Expand Up @@ -439,17 +455,39 @@ impl RoutingTable {
}
}

/// Deletes the targeted shards.
pub fn delete_shards(
pub fn delete_shards_by_id(
&mut self,
index_uid: &IndexUid,
source_id: impl Into<SourceId>,
shard_ids: &[ShardId],
) {
) -> Option<(IndexUid, SourceId)> {
let key = (index_uid.index_id.clone(), source_id.into());
if let Some(entry) = self.table.get_mut(&key) {
entry.delete_shards(index_uid, shard_ids);

if let Entry::Occupied(mut occupied_entry) = self.table.entry(key) {
occupied_entry
.get_mut()
.delete_shards_by_id(index_uid, shard_ids);

return delete_entry_if_empty(occupied_entry, index_uid);
}
None
}

pub fn delete_shards_by_leader_id(
&mut self,
index_uid: &IndexUid,
source_id: impl Into<SourceId>,
leader_id: &NodeId,
) -> Option<(IndexUid, SourceId)> {
let key = (index_uid.index_id.clone(), source_id.into());
if let Entry::Occupied(mut occupied_entry) = self.table.entry(key) {
occupied_entry
.get_mut()
.delete_shards_by_leader_id(index_uid, leader_id);

return delete_entry_if_empty(occupied_entry, index_uid);
}
None
}

#[cfg(test)]
Expand All @@ -458,6 +496,22 @@ impl RoutingTable {
}
}

fn delete_entry_if_empty(
occupied_entry: OccupiedEntry<(IndexId, SourceId), RoutingTableEntry>,
index_uid: &IndexUid,
) -> Option<(IndexUid, SourceId)> {
if occupied_entry.get().is_empty() && occupied_entry.get().index_uid == *index_uid {
let table_entry = occupied_entry.remove();
info!(
index_uid=%table_entry.index_uid,
source_id=%table_entry.source_id,
"deleted routing table entry"
);
return Some((table_entry.index_uid, table_entry.source_id));
}
None
}

#[cfg(test)]
mod tests {
use quickwit_proto::ingest::ingester::IngesterServiceClient;
Expand Down Expand Up @@ -951,8 +1005,8 @@ mod tests {
let source_id: SourceId = "test-source".into();

let mut table_entry = RoutingTableEntry::empty(index_uid.clone(), source_id.clone());
table_entry.delete_shards(&index_uid, &[]);
table_entry.delete_shards(&index_uid, &[ShardId::from(1)]);
table_entry.delete_shards_by_id(&index_uid, &[]);
table_entry.delete_shards_by_id(&index_uid, &[ShardId::from(1)]);
assert!(table_entry.local_shards.is_empty());
assert!(table_entry.remote_shards.is_empty());

Expand Down Expand Up @@ -1008,7 +1062,7 @@ mod tests {
],
remote_round_robin_idx: AtomicUsize::default(),
};
table_entry.delete_shards(
table_entry.delete_shards_by_id(
&index_uid,
&[
ShardId::from(1),
Expand Down