diff --git a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs index 46e72b3f99b..e92a0e1a17b 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs @@ -44,6 +44,7 @@ use tracing::{debug, info, instrument, warn}; use crate::actors::Packager; use crate::controlled_directory::ControlledDirectory; use crate::merge_policy::MergeOperationType; +use crate::metrics::INDEXER_METRICS; use crate::models::{ IndexedSplit, IndexedSplitBatch, IndexingPipelineId, MergeScratch, PublishLock, ScratchDirectory, SplitAttrs, @@ -220,6 +221,7 @@ pub fn combine_partition_ids(splits: &[SplitMetadata]) -> u64 { } async fn merge_split_directories( + index_id: &str, union_index_meta: IndexMeta, split_directories: Vec>, delete_tasks: Vec, @@ -233,6 +235,11 @@ async fn merge_split_directories( Box::new(MmapDirectory::open(output_path)?), ctx.progress().clone(), ctx.kill_switch().clone(), + Some( + INDEXER_METRICS + .merge_write_num_bytes_total + .with_label_values(&[index_id]), + ), ); let mut directory_stack: Vec> = vec![ output_directory.box_clone(), @@ -355,6 +362,7 @@ impl MergeExecutor { // TODO it would be nice if tantivy could let us run the merge in the current thread. fail_point!("before-merge-split"); let controlled_directory = merge_split_directories( + &self.pipeline_id.index_id, union_index_meta, split_directories, Vec::new(), @@ -418,6 +426,7 @@ impl MergeExecutor { let (union_index_meta, split_directories) = open_split_directories(&tantivy_dirs)?; let controlled_directory = merge_split_directories( + &self.pipeline_id.index_id, union_index_meta, split_directories, delete_tasks, diff --git a/quickwit/quickwit-indexing/src/controlled_directory.rs b/quickwit/quickwit-indexing/src/controlled_directory.rs index 828f5379b0e..2664c5e2cf0 100644 --- a/quickwit/quickwit-indexing/src/controlled_directory.rs +++ b/quickwit/quickwit-indexing/src/controlled_directory.rs @@ -24,6 +24,7 @@ use std::{fmt, io}; use arc_swap::ArcSwap; use quickwit_actors::{KillSwitch, Progress, ProtectedZoneGuard}; +use quickwit_common::metrics::IntCounter; use tantivy::directory::error::{DeleteError, OpenReadError, OpenWriteError}; use tantivy::directory::{ AntiCallToken, FileHandle, TerminatingWrite, WatchCallback, WatchHandle, WritePtr, @@ -51,6 +52,7 @@ impl ControlledDirectory { directory: Box, progress: Progress, kill_switch: KillSwitch, + written_num_bytes_counter: Option, ) -> ControlledDirectory { ControlledDirectory { inner: Inner { @@ -59,6 +61,7 @@ impl ControlledDirectory { kill_switch, }))), underlying: directory.into(), + written_num_bytes_counter, }, } } @@ -105,11 +108,13 @@ impl Controls { struct Inner { controls: Arc>, underlying: Arc, + written_num_bytes_counter: Option, } struct ControlledWrite { controls: Arc>, underlying_wrt: Box, + written_num_bytes_counter: Option, } impl ControlledWrite { @@ -121,6 +126,9 @@ impl ControlledWrite { impl io::Write for ControlledWrite { fn write(&mut self, buf: &[u8]) -> io::Result { let _guard = self.check_if_alive()?; + if let Some(written_num_bytes_counter) = self.written_num_bytes_counter.as_ref() { + written_num_bytes_counter.inc_by(buf.len() as u64); + }; self.underlying_wrt.write(buf) } @@ -131,21 +139,6 @@ impl io::Write for ControlledWrite { let _guard = self.check_if_alive(); self.underlying_wrt.flush() } - - fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result { - let _guard = self.check_if_alive()?; - self.underlying_wrt.write_vectored(bufs) - } - - fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { - let _guard = self.check_if_alive()?; - self.underlying_wrt.write_all(buf) - } - - fn write_fmt(&mut self, fmt: fmt::Arguments<'_>) -> io::Result<()> { - let _guard = self.check_if_alive()?; - self.underlying_wrt.write_fmt(fmt) - } } impl Directory for ControlledDirectory { @@ -185,6 +178,7 @@ impl Directory for ControlledDirectory { let controlled_wrt = ControlledWrite { controls, underlying_wrt, + written_num_bytes_counter: self.inner.written_num_bytes_counter.clone(), }; Ok(BufWriter::with_capacity( BUFFER_NUM_BYTES, @@ -234,8 +228,12 @@ mod tests { fn test_records_progress_on_write() -> anyhow::Result<()> { let directory = RamDirectory::default(); let progress = Progress::default(); - let controlled_directory = - ControlledDirectory::new(Box::new(directory), progress.clone(), KillSwitch::default()); + let controlled_directory = ControlledDirectory::new( + Box::new(directory), + progress.clone(), + KillSwitch::default(), + None, + ); assert!(progress.registered_activity_since_last_call()); assert!(!progress.registered_activity_since_last_call()); let mut wrt = controlled_directory.open_write(Path::new("test"))?; @@ -264,6 +262,7 @@ mod tests { Box::new(directory), Progress::default(), kill_switch.clone(), + None, ); let mut wrt = controlled_directory.open_write(Path::new("test"))?; // We use a large buffer to force the buf writer to flush at least once. diff --git a/quickwit/quickwit-indexing/src/metrics.rs b/quickwit/quickwit-indexing/src/metrics.rs index dbab6e9fca5..1a02af54fc6 100644 --- a/quickwit/quickwit-indexing/src/metrics.rs +++ b/quickwit/quickwit-indexing/src/metrics.rs @@ -27,6 +27,8 @@ pub struct IndexerMetrics { pub parsing_errors_num_bytes_total: IntCounterVec, pub missing_field_num_bytes_total: IntCounterVec, pub valid_num_bytes_total: IntCounterVec, + pub merge_write_num_bytes_total: IntCounterVec, + pub index_write_num_bytes_total: IntCounterVec, pub concurrent_upload_available_permits: IntGauge, } @@ -71,6 +73,18 @@ impl Default for IndexerMetrics { "quickwit_indexing", &["index"], ), + merge_write_num_bytes_total: new_counter_vec( + "merge_write_num_bytes_total", + "Sum of bytes written by the merge executor.", + "quickwit_indexing", + &["index"], + ), + index_write_num_bytes_total: new_counter_vec( + "index_write_num_bytes_total", + "Sum of bytes written by the indexer.", + "quickwit_indexing", + &["index"], + ), concurrent_upload_available_permits: new_gauge( "concurrent_upload_available_permits", "Number of concurrent upload available permits.", diff --git a/quickwit/quickwit-indexing/src/models/indexed_split.rs b/quickwit/quickwit-indexing/src/models/indexed_split.rs index 00698835752..811918c7be2 100644 --- a/quickwit/quickwit-indexing/src/models/indexed_split.rs +++ b/quickwit/quickwit-indexing/src/models/indexed_split.rs @@ -27,6 +27,7 @@ use tantivy::IndexBuilder; use tracing::{instrument, Span}; use crate::controlled_directory::ControlledDirectory; +use crate::metrics::INDEXER_METRICS; use crate::models::{IndexingPipelineId, PublishLock, ScratchDirectory, SplitAttrs}; use crate::new_split_id; @@ -91,8 +92,16 @@ impl IndexedSplitBuilder { scratch_directory.named_temp_child(split_scratch_directory_prefix)?; let mmap_directory = MmapDirectory::open(split_scratch_directory.path())?; let box_mmap_directory = Box::new(mmap_directory); - let controlled_directory = - ControlledDirectory::new(box_mmap_directory, progress, kill_switch); + let controlled_directory = ControlledDirectory::new( + box_mmap_directory, + progress, + kill_switch, + Some( + INDEXER_METRICS + .index_write_num_bytes_total + .with_label_values(&[&pipeline_id.index_id]), + ), + ); let index_writer = index_builder.single_segment_index_writer(controlled_directory.clone(), 10_000_000)?; Ok(Self {