diff --git a/quickwit/quickwit-config/src/index_config.rs b/quickwit/quickwit-config/src/index_config.rs index 4044402f0a3..361d3756dd1 100644 --- a/quickwit/quickwit-config/src/index_config.rs +++ b/quickwit/quickwit-config/src/index_config.rs @@ -156,7 +156,12 @@ pub struct IndexingSettings { /// `split_num_docs_target` are considered mature and never merged. #[serde(default = "IndexingSettings::default_split_num_docs_target")] pub split_num_docs_target: usize, - + /// Sets the maximum write IO throughput, in bytes per secs. + /// On hardware where IO is limited, this parameter can help limiting + /// the impact of merges on indexing. + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub max_merge_write_throughput: Option, #[serde(default)] pub merge_policy: MergePolicyConfig, #[serde(default)] @@ -218,6 +223,7 @@ impl From for IndexingSettings { split_num_docs_target: settings.split_num_docs_target, merge_policy, resources: settings.resources, + max_merge_write_throughput: None, } } } @@ -309,6 +315,7 @@ impl Default for IndexingSettings { split_num_docs_target: Self::default_split_num_docs_target(), merge_policy: MergePolicyConfig::default(), resources: IndexingResources::default(), + max_merge_write_throughput: None, } } } diff --git a/quickwit/quickwit-indexing/src/actors/index_serializer.rs b/quickwit/quickwit-indexing/src/actors/index_serializer.rs index 8c7a70765fd..b54fb9d740c 100644 --- a/quickwit/quickwit-indexing/src/actors/index_serializer.rs +++ b/quickwit/quickwit-indexing/src/actors/index_serializer.rs @@ -73,14 +73,21 @@ impl Handler for IndexSerializer { batch_builder: IndexedSplitBatchBuilder, ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { - let splits: Vec = { - let _protect = ctx.protect_zone(); - batch_builder - .splits - .into_iter() - .map(|split_builder| split_builder.finalize()) - .collect::>()? - }; + let mut splits: Vec = Vec::with_capacity(batch_builder.splits.len()); + for split_builder in batch_builder.splits { + // TODO Consider & test removing this protect guard. + // + // In theory the controlled directory should be sufficient. + let _protect_guard = ctx.protect_zone(); + if let Some(controlled_directory) = &split_builder.controlled_directory_opt { + controlled_directory.set_progress_and_kill_switch( + ctx.progress().clone(), + ctx.kill_switch().clone(), + ); + } + let split = split_builder.finalize()?; + splits.push(split); + } let indexed_split_batch = IndexedSplitBatch { batch_parent_span: batch_builder.batch_parent_span, splits, diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index b5372ca4537..1db6f378d0e 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -276,9 +276,24 @@ impl IndexingPipeline { .set_kill_switch(self.kill_switch.clone()) .spawn(merge_packager); + let io_throttler = if let Some(max_num_bytes_per_secs) = + self.params.indexing_settings.max_merge_write_throughput + { + crate::throttle::create_throttle( + max_num_bytes_per_secs.get_bytes(), + Duration::from_millis(500), + crate::metrics::INDEXER_METRICS + .merge_write_num_bytes_total + .clone(), + ) + } else { + crate::throttle::no_throttling() + }; + let merge_executor = MergeExecutor::new( self.params.pipeline_id.clone(), self.params.metastore.clone(), + io_throttler, merge_packager_mailbox, ); let (merge_executor_mailbox, merge_executor_handler) = ctx diff --git a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs index 46e72b3f99b..768f47fbe08 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs @@ -48,11 +48,13 @@ use crate::models::{ IndexedSplit, IndexedSplitBatch, IndexingPipelineId, MergeScratch, PublishLock, ScratchDirectory, SplitAttrs, }; +use crate::throttle::Throttle; #[derive(Clone)] pub struct MergeExecutor { pipeline_id: IndexingPipelineId, metastore: Arc, + io_throttle: Arc, merge_packager_mailbox: Mailbox, } @@ -219,78 +221,6 @@ pub fn combine_partition_ids(splits: &[SplitMetadata]) -> u64 { combine_partition_ids_aux(splits.iter().map(|split| split.partition_id)) } -async fn merge_split_directories( - union_index_meta: IndexMeta, - split_directories: Vec>, - delete_tasks: Vec, - doc_mapper_opt: Option>, - output_path: &Path, - ctx: &ActorContext, -) -> anyhow::Result { - let shadowing_meta_json_directory = create_shadowing_meta_json_directory(union_index_meta)?; - // This directory is here to receive the merged split, as well as the final meta.json file. - let output_directory = ControlledDirectory::new( - Box::new(MmapDirectory::open(output_path)?), - ctx.progress().clone(), - ctx.kill_switch().clone(), - ); - let mut directory_stack: Vec> = vec![ - output_directory.box_clone(), - Box::new(shadowing_meta_json_directory), - ]; - directory_stack.extend(split_directories.into_iter()); - let union_directory = UnionDirectory::union_of(directory_stack); - let union_index = open_index(union_directory)?; - - ctx.record_progress(); - let _protect_guard = ctx.protect_zone(); - - let mut index_writer = union_index.writer_with_num_threads(1, 3_000_000)?; - let num_delete_tasks = delete_tasks.len(); - if num_delete_tasks > 0 { - let doc_mapper = doc_mapper_opt - .ok_or_else(|| anyhow!("Doc mapper must be present if there are delete tasks."))?; - for delete_task in delete_tasks { - let delete_query = delete_task - .delete_query - .expect("A delete task must have a delete query."); - let search_request = SearchRequest { - index_id: delete_query.index_id, - query: delete_query.query, - start_timestamp: delete_query.start_timestamp, - end_timestamp: delete_query.end_timestamp, - search_fields: delete_query.search_fields, - ..Default::default() - }; - debug!( - "Delete all documents matched by query `{:?}`", - search_request - ); - let query = doc_mapper.query(union_index.schema(), &search_request)?; - index_writer.delete_query(query)?; - } - debug!("commit-delete-operations"); - index_writer.commit()?; - } - - let segment_ids: Vec = union_index - .searchable_segment_metas()? - .into_iter() - .map(|segment_meta| segment_meta.id()) - .collect(); - - // A merge is useless if there is no delete and only one segment. - if num_delete_tasks == 0 && segment_ids.len() <= 1 { - return Ok(output_directory); - } - - debug!(segment_ids=?segment_ids,"merging-segments"); - // TODO it would be nice if tantivy could let us run the merge in the current thread. - index_writer.merge(&segment_ids).wait()?; - - Ok(output_directory) -} - pub fn merge_split_attrs( merge_split_id: String, pipeline_id: &IndexingPipelineId, @@ -334,11 +264,13 @@ impl MergeExecutor { pub fn new( pipeline_id: IndexingPipelineId, metastore: Arc, + io_throttle: Arc, merge_packager_mailbox: Mailbox, ) -> Self { MergeExecutor { pipeline_id, metastore, + io_throttle, merge_packager_mailbox, } } @@ -354,15 +286,16 @@ impl MergeExecutor { let (union_index_meta, split_directories) = open_split_directories(&tantivy_dirs)?; // TODO it would be nice if tantivy could let us run the merge in the current thread. fail_point!("before-merge-split"); - let controlled_directory = merge_split_directories( - union_index_meta, - split_directories, - Vec::new(), - None, - merge_scratch_directory.path(), - ctx, - ) - .await?; + let controlled_directory = self + .merge_split_directories( + union_index_meta, + split_directories, + Vec::new(), + None, + merge_scratch_directory.path(), + ctx, + ) + .await?; fail_point!("after-merge-split"); // This will have the side effect of deleting the directory containing the downloaded @@ -417,15 +350,16 @@ impl MergeExecutor { ); let (union_index_meta, split_directories) = open_split_directories(&tantivy_dirs)?; - let controlled_directory = merge_split_directories( - union_index_meta, - split_directories, - delete_tasks, - Some(doc_mapper.clone()), - merge_scratch_directory.path(), - ctx, - ) - .await?; + let controlled_directory = self + .merge_split_directories( + union_index_meta, + split_directories, + delete_tasks, + Some(doc_mapper.clone()), + merge_scratch_directory.path(), + ctx, + ) + .await?; // This will have the side effect of deleting the directory containing the downloaded split. let mut merged_index = Index::open(controlled_directory.clone())?; @@ -492,6 +426,80 @@ impl MergeExecutor { }; Ok(Some(indexed_split)) } + + async fn merge_split_directories( + &self, + union_index_meta: IndexMeta, + split_directories: Vec>, + delete_tasks: Vec, + doc_mapper_opt: Option>, + output_path: &Path, + ctx: &ActorContext, + ) -> anyhow::Result { + let shadowing_meta_json_directory = create_shadowing_meta_json_directory(union_index_meta)?; + // This directory is here to receive the merged split, as well as the final meta.json file. + let output_directory = ControlledDirectory::new( + Box::new(MmapDirectory::open(output_path)?), + ctx.progress().clone(), + ctx.kill_switch().clone(), + self.io_throttle.clone(), + ); + let mut directory_stack: Vec> = vec![ + output_directory.box_clone(), + Box::new(shadowing_meta_json_directory), + ]; + directory_stack.extend(split_directories.into_iter()); + let union_directory = UnionDirectory::union_of(directory_stack); + let union_index = open_index(union_directory)?; + + ctx.record_progress(); + let _protect_guard = ctx.protect_zone(); + + let mut index_writer = union_index.writer_with_num_threads(1, 3_000_000)?; + let num_delete_tasks = delete_tasks.len(); + if num_delete_tasks > 0 { + let doc_mapper = doc_mapper_opt + .ok_or_else(|| anyhow!("Doc mapper must be present if there are delete tasks."))?; + for delete_task in delete_tasks { + let delete_query = delete_task + .delete_query + .expect("A delete task must have a delete query."); + let search_request = SearchRequest { + index_id: delete_query.index_id, + query: delete_query.query, + start_timestamp: delete_query.start_timestamp, + end_timestamp: delete_query.end_timestamp, + search_fields: delete_query.search_fields, + ..Default::default() + }; + debug!( + "Delete all documents matched by query `{:?}`", + search_request + ); + let query = doc_mapper.query(union_index.schema(), &search_request)?; + index_writer.delete_query(query)?; + } + debug!("commit-delete-operations"); + index_writer.commit()?; + } + + let segment_ids: Vec = union_index + .searchable_segment_metas()? + .into_iter() + .map(|segment_meta| segment_meta.id()) + .collect(); + + // A merge is useless if there is no delete and only one segment. + if num_delete_tasks == 0 && segment_ids.len() <= 1 { + return Ok(output_directory); + } + + debug!(segment_ids=?segment_ids,"merging-segments"); + // TODO it would be nice if tantivy could let us run the merge in the current thread. + index_writer.merge(&segment_ids).wait()?; + + Ok(output_directory) + } } fn open_index>>(directory: T) -> tantivy::Result { @@ -510,6 +518,7 @@ mod tests { use super::*; use crate::merge_policy::MergeOperation; use crate::models::{IndexingPipelineId, ScratchDirectory}; + use crate::throttle::no_throttling; use crate::{get_tantivy_directory_from_split_bundle, new_split_id, TestSandbox}; #[tokio::test] @@ -574,7 +583,12 @@ mod tests { downloaded_splits_directory, }; let (merge_packager_mailbox, merge_packager_inbox) = create_test_mailbox(); - let merge_executor = MergeExecutor::new(pipeline_id, metastore, merge_packager_mailbox); + let merge_executor = MergeExecutor::new( + pipeline_id, + metastore, + no_throttling(), + merge_packager_mailbox, + ); let universe = Universe::new(); let (merge_executor_mailbox, merge_executor_handle) = universe.spawn_builder().spawn(merge_executor); @@ -709,8 +723,12 @@ mod tests { downloaded_splits_directory, }; let (merge_packager_mailbox, merge_packager_inbox) = create_test_mailbox(); - let delete_task_executor = - MergeExecutor::new(pipeline_id, metastore, merge_packager_mailbox); + let delete_task_executor = MergeExecutor::new( + pipeline_id, + metastore, + no_throttling(), + merge_packager_mailbox, + ); let universe = Universe::new(); let (delete_task_executor_mailbox, delete_task_executor_handle) = universe.spawn_builder().spawn(delete_task_executor); diff --git a/quickwit/quickwit-indexing/src/actors/packager.rs b/quickwit/quickwit-indexing/src/actors/packager.rs index 8402c615bc8..b1d40f1b2d4 100644 --- a/quickwit/quickwit-indexing/src/actors/packager.rs +++ b/quickwit/quickwit-indexing/src/actors/packager.rs @@ -134,14 +134,6 @@ impl Handler for Packager { split_ids=?split_ids, "start-packaging-splits" ); - for split in &batch.splits { - if let Some(controlled_directory) = &split.controlled_directory_opt { - controlled_directory.set_progress_and_kill_switch( - ctx.progress().clone(), - ctx.kill_switch().clone(), - ); - } - } fail_point!("packager:before"); let mut packaged_splits = Vec::new(); for split in batch.splits { diff --git a/quickwit/quickwit-indexing/src/controlled_directory.rs b/quickwit/quickwit-indexing/src/controlled_directory.rs index 828f5379b0e..7fcbe5436d8 100644 --- a/quickwit/quickwit-indexing/src/controlled_directory.rs +++ b/quickwit/quickwit-indexing/src/controlled_directory.rs @@ -30,6 +30,8 @@ use tantivy::directory::{ }; use tantivy::Directory; +use crate::throttle::Throttle; + /// Buffer capacity. /// /// This is the current default for the BufWriter, but considering this constant @@ -51,6 +53,7 @@ impl ControlledDirectory { directory: Box, progress: Progress, kill_switch: KillSwitch, + throttle: Arc, ) -> ControlledDirectory { ControlledDirectory { inner: Inner { @@ -59,6 +62,7 @@ impl ControlledDirectory { kill_switch, }))), underlying: directory.into(), + throttle, }, } } @@ -105,11 +109,13 @@ impl Controls { struct Inner { controls: Arc>, underlying: Arc, + throttle: Arc, } struct ControlledWrite { controls: Arc>, underlying_wrt: Box, + throttle: Arc, } impl ControlledWrite { @@ -118,34 +124,28 @@ impl ControlledWrite { } } +const MAX_NUM_BYTES_WRITTEN_AT_ONCE: usize = 1 << 20; // 1MB + impl io::Write for ControlledWrite { fn write(&mut self, buf: &[u8]) -> io::Result { + let buf = if buf.len() > MAX_NUM_BYTES_WRITTEN_AT_ONCE { + &buf[..MAX_NUM_BYTES_WRITTEN_AT_ONCE] + } else { + buf + }; let _guard = self.check_if_alive()?; - self.underlying_wrt.write(buf) + let written_num_bytes = self.underlying_wrt.write(buf)?; + self.throttle.consume(written_num_bytes as u64); + Ok(written_num_bytes) } fn flush(&mut self) -> io::Result<()> { // We voluntarily avoid to check the kill switch on flush. - // This is because the RAMDirectory currently panics if flush - // is not called before Drop. + // This is because the `RAMDirectory` currently panics if flush + // is not called before `Drop`. let _guard = self.check_if_alive(); self.underlying_wrt.flush() } - - fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result { - let _guard = self.check_if_alive()?; - self.underlying_wrt.write_vectored(bufs) - } - - fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { - let _guard = self.check_if_alive()?; - self.underlying_wrt.write_all(buf) - } - - fn write_fmt(&mut self, fmt: fmt::Arguments<'_>) -> io::Result<()> { - let _guard = self.check_if_alive()?; - self.underlying_wrt.write_fmt(fmt) - } } impl Directory for ControlledDirectory { @@ -185,6 +185,7 @@ impl Directory for ControlledDirectory { let controlled_wrt = ControlledWrite { controls, underlying_wrt, + throttle: self.inner.throttle.clone(), }; Ok(BufWriter::with_capacity( BUFFER_NUM_BYTES, @@ -229,13 +230,18 @@ mod tests { use tantivy::directory::RamDirectory; use super::*; + use crate::throttle::no_throttling; #[test] fn test_records_progress_on_write() -> anyhow::Result<()> { let directory = RamDirectory::default(); let progress = Progress::default(); - let controlled_directory = - ControlledDirectory::new(Box::new(directory), progress.clone(), KillSwitch::default()); + let controlled_directory = ControlledDirectory::new( + Box::new(directory), + progress.clone(), + KillSwitch::default(), + no_throttling(), + ); assert!(progress.registered_activity_since_last_call()); assert!(!progress.registered_activity_since_last_call()); let mut wrt = controlled_directory.open_write(Path::new("test"))?; @@ -264,6 +270,7 @@ mod tests { Box::new(directory), Progress::default(), kill_switch.clone(), + no_throttling(), ); let mut wrt = controlled_directory.open_write(Path::new("test"))?; // We use a large buffer to force the buf writer to flush at least once. diff --git a/quickwit/quickwit-indexing/src/lib.rs b/quickwit/quickwit-indexing/src/lib.rs index 1b7b61c0584..8362b225f77 100644 --- a/quickwit/quickwit-indexing/src/lib.rs +++ b/quickwit/quickwit-indexing/src/lib.rs @@ -44,6 +44,7 @@ pub mod source; mod split_store; #[cfg(any(test, feature = "testsuite"))] mod test_utils; +pub mod throttle; #[cfg(any(test, feature = "testsuite"))] pub use test_utils::{mock_split, mock_split_meta, TestSandbox}; diff --git a/quickwit/quickwit-indexing/src/metrics.rs b/quickwit/quickwit-indexing/src/metrics.rs index ad91494137d..8fda08a1323 100644 --- a/quickwit/quickwit-indexing/src/metrics.rs +++ b/quickwit/quickwit-indexing/src/metrics.rs @@ -28,6 +28,7 @@ pub struct IndexerMetrics { pub missing_field_num_bytes_total: IntCounter, pub valid_num_bytes_total: IntCounter, pub concurrent_upload_available_permits: IntGauge, + pub merge_write_num_bytes_total: IntCounter, } impl Default for IndexerMetrics { @@ -68,6 +69,11 @@ impl Default for IndexerMetrics { "Number of concurrent upload available permits.", "quickwit_indexing", ), + merge_write_num_bytes_total: new_counter( + "merge_write_num_bytes_total", + "Number of bytes written by the merge executor.", + "quickwit_indexing", + ), } } } diff --git a/quickwit/quickwit-indexing/src/models/indexed_split.rs b/quickwit/quickwit-indexing/src/models/indexed_split.rs index 00698835752..94842becabc 100644 --- a/quickwit/quickwit-indexing/src/models/indexed_split.rs +++ b/quickwit/quickwit-indexing/src/models/indexed_split.rs @@ -29,6 +29,7 @@ use tracing::{instrument, Span}; use crate::controlled_directory::ControlledDirectory; use crate::models::{IndexingPipelineId, PublishLock, ScratchDirectory, SplitAttrs}; use crate::new_split_id; +use crate::throttle::no_throttling; pub struct IndexedSplitBuilder { pub split_attrs: SplitAttrs, @@ -92,7 +93,7 @@ impl IndexedSplitBuilder { let mmap_directory = MmapDirectory::open(split_scratch_directory.path())?; let box_mmap_directory = Box::new(mmap_directory); let controlled_directory = - ControlledDirectory::new(box_mmap_directory, progress, kill_switch); + ControlledDirectory::new(box_mmap_directory, progress, kill_switch, no_throttling()); let index_writer = index_builder.single_segment_index_writer(controlled_directory.clone(), 10_000_000)?; Ok(Self { diff --git a/quickwit/quickwit-indexing/src/throttle.rs b/quickwit/quickwit-indexing/src/throttle.rs new file mode 100644 index 00000000000..095a7972f30 --- /dev/null +++ b/quickwit/quickwit-indexing/src/throttle.rs @@ -0,0 +1,242 @@ +// Copyright (C) 2022 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::{Arc, Weak}; +use std::time::{Duration, Instant}; + +use quickwit_common::metrics::IntCounter; +pub struct Throttle { + num_units_consumed: AtomicU64, + // Should wait is a atomic bool that we read with a relaxed ordering. + // On our happy path (no cooldown required), we don't pick any lock nor have any memory + // barrier. + should_wait: AtomicBool, + + // We use RwLock as a synchronization primitive. + rw_lock: tokio::sync::RwLock<()>, +} + +impl Throttle { + /// Records that we are about to consume `num_units`. + /// A throttling task may have identified that some throttling is needed. + /// + /// In this case, this function will be block for as long is required by the throttling + /// logic. + /// + /// When no throttling happens -our happy path-, this call is as light as can be. + pub fn consume(&self, num_units: u64) { + let should_wait = self.should_wait.load(Ordering::Relaxed); + if should_wait { + let _ = self.rw_lock.blocking_read(); + } + self.num_units_consumed + .fetch_add(num_units, Ordering::Relaxed); + } + + /// Async version of consume (It is not used at the moment.) + #[allow(dead_code)] + pub async fn consume_async(&self, num_units: u64) { + let should_wait = self.should_wait.load(Ordering::Relaxed); + if should_wait { + let _ = self.rw_lock.read().await; + } + self.num_units_consumed + .fetch_add(num_units, Ordering::Relaxed); + } +} + +fn apply_cooldown(throttle: &Throttle, cool_down_duration: Duration) { + let _wlock = throttle.rw_lock.blocking_write(); + throttle.should_wait.store(true, Ordering::Relaxed); + std::thread::sleep(cool_down_duration); + throttle.should_wait.store(false, Ordering::Relaxed); +} + +fn throttling_control_loop( + throttle_weak: Weak, + units_per_secs: u64, + throttling_reactivity: Duration, + counter: IntCounter, +) { + let mut window_start = Instant::now(); + + let mut credit: i64 = 0i64; + let credit_cap: i64 = + throttling_reactivity.as_micros() as i64 * units_per_secs as i64 / 1_000_000; + + while let Some(throttle) = throttle_weak.upgrade() { + let elapsed = window_start.elapsed(); + window_start = Instant::now(); + + let units_consumed = throttle.num_units_consumed.swap(0, Ordering::Relaxed); + counter.inc_by(units_consumed); + + credit += (elapsed.as_micros() as u64 * units_per_secs) as i64 / 1_000_000; + credit -= units_consumed as i64; + credit = credit.min(credit_cap); + + if credit < 0 { + // Cooldown. Everyone stops long enough to make the credit + let cool_down_micros = (-credit as u64) * 1_000_000 / units_per_secs; + let cool_down_duration = + Duration::from_micros(cool_down_micros as u64) + throttling_reactivity; + apply_cooldown(&throttle, cool_down_duration); + } else { + // Let everyone unhinged for 1 full second. + std::thread::sleep(throttling_reactivity); + } + } +} + +/// Creates a new throttle. +/// +/// A throttle can be cloned to share a given "budget" between several objects/threads. +/// +/// * units_per_secs: Number of units per second allowed. This is our maximum target throughput. +/// * throttling_reactivity (1s is a good default value): +/// - Measures at which frequency the throttler will stop resume tasks. +/// - What is the "window" on which throttling is enforced. +pub fn create_throttle( + units_per_sec: u64, + throttling_reactivity: Duration, + counter: IntCounter, +) -> Arc { + assert!(units_per_sec > 0); + let throttle = Arc::new(Throttle { + num_units_consumed: Default::default(), + should_wait: Default::default(), + rw_lock: Default::default(), + }); + let throttle_weak = Arc::downgrade(&throttle); + std::thread::spawn(move || { + throttling_control_loop(throttle_weak, units_per_sec, throttling_reactivity, counter) + }); + throttle +} + +/// Returns a `throttle` object that will actually never enforce any throttling. +pub fn no_throttling() -> Arc { + // In case you were wondering, apparently fetch_add guarantees wrapping behavior for fetch_add. + Arc::new(Throttle { + num_units_consumed: Default::default(), + should_wait: Default::default(), + rw_lock: Default::default(), + }) +} + +#[cfg(test)] +mod tests { + use std::time::{Duration, Instant}; + + use quickwit_common::metrics::IntCounter; + + use super::create_throttle; + + fn test_counter() -> IntCounter { + IntCounter::new("test_counter", "Generates a fake counter just for tests.").unwrap() + } + + #[tokio::test] + async fn test_sync_throttling() { + let throttler = create_throttle(1_000, Duration::from_millis(10), test_counter()); + // We try to consume 400 at a pace of 4_000 / secs. + // Because we are throttled, it will actually take around 400ms. + let join_handle = tokio::task::spawn_blocking(move || { + let start = Instant::now(); + for _ in 0..100 { + std::thread::sleep(Duration::from_millis(1)); + throttler.consume(4); + } + start.elapsed() + }); + let elapsed = join_handle.await.unwrap(); + assert!(elapsed.as_millis() > 350); + assert!(elapsed.as_millis() < 450); + } + + #[tokio::test] + async fn test_async_throttling() { + let throttler = create_throttle(1_000, Duration::from_millis(20), test_counter()); + let now = Instant::now(); + // We try to consume 400 at a pace of 4_000 / secs. + // Because we are throttled, it will actually take around 400ms. + for _ in 0..100 { + tokio::time::sleep(Duration::from_millis(1)).await; + throttler.consume_async(4).await; + } + let elapsed = now.elapsed(); + assert!(elapsed.as_millis() > 350); + assert!(elapsed.as_millis() < 450) + } + + #[tokio::test] + async fn test_credit_do_not_accumulate_forever() { + let throttler = create_throttle(1_000, Duration::from_millis(10), test_counter()); + let now = Instant::now(); + // We try to consume intermittently. + // + // - 100 at a pace of 4000/secs + // - no consumption for 100ms + // ... repeated 5 times. + // + // The point of this test is too see if extended (= much larger than `reactivity`) + // lack of call to consume accumulate credits. + // + // We are hoping here to see throttling happen, and hope the result will take around + // (100 + 100) * 5 = 1sec + for _ in 0..5 { + tokio::time::sleep(Duration::from_millis(100)).await; + for _ in 0..25 { + tokio::time::sleep(Duration::from_millis(1)).await; + throttler.consume_async(4).await; + } + } + let elapsed = now.elapsed(); + assert!(elapsed.as_millis() > 900); + assert!(elapsed.as_millis() < 1100) + } + + #[tokio::test] + async fn test_credit_do_accumulate() { + let throttler = create_throttle(100_000, Duration::from_millis(100), test_counter()); + let now = Instant::now(); + // We try to consume intermittently. + // + // - 100 at a pace of 4000/secs + // - no consumption for 100ms + // ... repeated 5 times. + // + // The point of this test is too see if extended (= much larger than `reactivity`) + // lack of call to consume accumulate credits. + // + // We are hoping here to see throttling happen, and hope the result will take around + // [100ms + 25ms ] * 5 = 625ms + for _ in 0..5 { + tokio::time::sleep(Duration::from_millis(100)).await; + for _ in 0..5 { + tokio::time::sleep(Duration::from_millis(5)).await; + throttler.consume_async(20).await; + } + } + let elapsed = now.elapsed(); + assert!(elapsed.as_millis() > 550); + assert!(elapsed.as_millis() < 720) + } +} diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs index 8587c6bdadd..4885b74e4d2 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs @@ -32,7 +32,7 @@ use quickwit_indexing::actors::{ }; use quickwit_indexing::merge_policy::merge_policy_from_settings; use quickwit_indexing::models::{IndexingDirectory, IndexingPipelineId}; -use quickwit_indexing::{IndexingSplitStore, PublisherType, SplitsUpdateMailbox}; +use quickwit_indexing::{throttle, IndexingSplitStore, PublisherType, SplitsUpdateMailbox}; use quickwit_metastore::Metastore; use quickwit_search::SearchClientPool; use quickwit_storage::Storage; @@ -164,8 +164,12 @@ impl DeleteTaskPipeline { pipeline_ord: 0, source_id: "unknown".to_string(), }; - let delete_executor = - MergeExecutor::new(index_pipeline_id, self.metastore.clone(), packager_mailbox); + let delete_executor = MergeExecutor::new( + index_pipeline_id, + self.metastore.clone(), + throttle::no_throttling(), + packager_mailbox, + ); let (delete_executor_mailbox, task_executor_supervisor_handler) = ctx .spawn_actor() .set_kill_switch(KillSwitch::default())