Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions quickwit/quickwit-indexing/src/actors/merge_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use tracing::{debug, info, instrument, warn};
use crate::actors::Packager;
use crate::controlled_directory::ControlledDirectory;
use crate::merge_policy::MergeOperationType;
use crate::metrics::INDEXER_METRICS;
use crate::models::{
IndexedSplit, IndexedSplitBatch, IndexingPipelineId, MergeScratch, PublishLock,
ScratchDirectory, SplitAttrs,
Expand Down Expand Up @@ -220,6 +221,7 @@ pub fn combine_partition_ids(splits: &[SplitMetadata]) -> u64 {
}

async fn merge_split_directories(
index_id: &str,
union_index_meta: IndexMeta,
split_directories: Vec<Box<dyn Directory>>,
delete_tasks: Vec<DeleteTask>,
Expand All @@ -233,6 +235,11 @@ async fn merge_split_directories(
Box::new(MmapDirectory::open(output_path)?),
ctx.progress().clone(),
ctx.kill_switch().clone(),
Some(
INDEXER_METRICS
.merge_write_num_bytes_total
.with_label_values(&[index_id]),
),
);
let mut directory_stack: Vec<Box<dyn Directory>> = vec![
output_directory.box_clone(),
Expand Down Expand Up @@ -355,6 +362,7 @@ impl MergeExecutor {
// TODO it would be nice if tantivy could let us run the merge in the current thread.
fail_point!("before-merge-split");
let controlled_directory = merge_split_directories(
&self.pipeline_id.index_id,
union_index_meta,
split_directories,
Vec::new(),
Expand Down Expand Up @@ -418,6 +426,7 @@ impl MergeExecutor {

let (union_index_meta, split_directories) = open_split_directories(&tantivy_dirs)?;
let controlled_directory = merge_split_directories(
&self.pipeline_id.index_id,
union_index_meta,
split_directories,
delete_tasks,
Expand Down
33 changes: 16 additions & 17 deletions quickwit/quickwit-indexing/src/controlled_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::{fmt, io};

use arc_swap::ArcSwap;
use quickwit_actors::{KillSwitch, Progress, ProtectedZoneGuard};
use quickwit_common::metrics::IntCounter;
use tantivy::directory::error::{DeleteError, OpenReadError, OpenWriteError};
use tantivy::directory::{
AntiCallToken, FileHandle, TerminatingWrite, WatchCallback, WatchHandle, WritePtr,
Expand Down Expand Up @@ -51,6 +52,7 @@ impl ControlledDirectory {
directory: Box<dyn Directory>,
progress: Progress,
kill_switch: KillSwitch,
written_num_bytes_counter: Option<IntCounter>,
) -> ControlledDirectory {
ControlledDirectory {
inner: Inner {
Expand All @@ -59,6 +61,7 @@ impl ControlledDirectory {
kill_switch,
}))),
underlying: directory.into(),
written_num_bytes_counter,
},
}
}
Expand Down Expand Up @@ -105,11 +108,13 @@ impl Controls {
struct Inner {
controls: Arc<ArcSwap<Controls>>,
underlying: Arc<dyn Directory>,
written_num_bytes_counter: Option<IntCounter>,
}

struct ControlledWrite {
controls: Arc<ArcSwap<Controls>>,
underlying_wrt: Box<dyn TerminatingWrite>,
written_num_bytes_counter: Option<IntCounter>,
}

impl ControlledWrite {
Expand All @@ -121,6 +126,9 @@ impl ControlledWrite {
impl io::Write for ControlledWrite {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let _guard = self.check_if_alive()?;
if let Some(written_num_bytes_counter) = self.written_num_bytes_counter.as_ref() {
written_num_bytes_counter.inc_by(buf.len() as u64);
};
self.underlying_wrt.write(buf)
}

Expand All @@ -131,21 +139,6 @@ impl io::Write for ControlledWrite {
let _guard = self.check_if_alive();
self.underlying_wrt.flush()
}

fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems that we don't need that. write_vectored, write_all and write_fmt will end up calling write

let _guard = self.check_if_alive()?;
self.underlying_wrt.write_vectored(bufs)
}

fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
let _guard = self.check_if_alive()?;
self.underlying_wrt.write_all(buf)
}

fn write_fmt(&mut self, fmt: fmt::Arguments<'_>) -> io::Result<()> {
let _guard = self.check_if_alive()?;
self.underlying_wrt.write_fmt(fmt)
}
}

impl Directory for ControlledDirectory {
Expand Down Expand Up @@ -185,6 +178,7 @@ impl Directory for ControlledDirectory {
let controlled_wrt = ControlledWrite {
controls,
underlying_wrt,
written_num_bytes_counter: self.inner.written_num_bytes_counter.clone(),
};
Ok(BufWriter::with_capacity(
BUFFER_NUM_BYTES,
Expand Down Expand Up @@ -234,8 +228,12 @@ mod tests {
fn test_records_progress_on_write() -> anyhow::Result<()> {
let directory = RamDirectory::default();
let progress = Progress::default();
let controlled_directory =
ControlledDirectory::new(Box::new(directory), progress.clone(), KillSwitch::default());
let controlled_directory = ControlledDirectory::new(
Box::new(directory),
progress.clone(),
KillSwitch::default(),
None,
);
assert!(progress.registered_activity_since_last_call());
assert!(!progress.registered_activity_since_last_call());
let mut wrt = controlled_directory.open_write(Path::new("test"))?;
Expand Down Expand Up @@ -264,6 +262,7 @@ mod tests {
Box::new(directory),
Progress::default(),
kill_switch.clone(),
None,
);
let mut wrt = controlled_directory.open_write(Path::new("test"))?;
// We use a large buffer to force the buf writer to flush at least once.
Expand Down
14 changes: 14 additions & 0 deletions quickwit/quickwit-indexing/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub struct IndexerMetrics {
pub parsing_errors_num_bytes_total: IntCounterVec,
pub missing_field_num_bytes_total: IntCounterVec,
pub valid_num_bytes_total: IntCounterVec,
pub merge_write_num_bytes_total: IntCounterVec,
pub index_write_num_bytes_total: IntCounterVec,
pub concurrent_upload_available_permits: IntGauge,
}

Expand Down Expand Up @@ -71,6 +73,18 @@ impl Default for IndexerMetrics {
"quickwit_indexing",
&["index"],
),
merge_write_num_bytes_total: new_counter_vec(
"merge_write_num_bytes_total",
"Sum of bytes written by the merge executor.",
"quickwit_indexing",
&["index"],
),
index_write_num_bytes_total: new_counter_vec(
"index_write_num_bytes_total",
"Sum of bytes written by the indexer.",
"quickwit_indexing",
&["index"],
),
concurrent_upload_available_permits: new_gauge(
"concurrent_upload_available_permits",
"Number of concurrent upload available permits.",
Expand Down
13 changes: 11 additions & 2 deletions quickwit/quickwit-indexing/src/models/indexed_split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use tantivy::IndexBuilder;
use tracing::{instrument, Span};

use crate::controlled_directory::ControlledDirectory;
use crate::metrics::INDEXER_METRICS;
use crate::models::{IndexingPipelineId, PublishLock, ScratchDirectory, SplitAttrs};
use crate::new_split_id;

Expand Down Expand Up @@ -91,8 +92,16 @@ impl IndexedSplitBuilder {
scratch_directory.named_temp_child(split_scratch_directory_prefix)?;
let mmap_directory = MmapDirectory::open(split_scratch_directory.path())?;
let box_mmap_directory = Box::new(mmap_directory);
let controlled_directory =
ControlledDirectory::new(box_mmap_directory, progress, kill_switch);
let controlled_directory = ControlledDirectory::new(
box_mmap_directory,
progress,
kill_switch,
Some(
INDEXER_METRICS
.index_write_num_bytes_total
.with_label_values(&[&pipeline_id.index_id]),
),
);
let index_writer =
index_builder.single_segment_index_writer(controlled_directory.clone(), 10_000_000)?;
Ok(Self {
Expand Down