9 changes: 8 additions & 1 deletion quickwit/quickwit-config/src/index_config.rs
@@ -156,7 +156,12 @@ pub struct IndexingSettings {
/// `split_num_docs_target` are considered mature and never merged.
#[serde(default = "IndexingSettings::default_split_num_docs_target")]
pub split_num_docs_target: usize,

/// Sets the maximum write IO throughput, in bytes per second.
/// On hardware where IO is limited, this parameter can help limit
/// the impact of merges on indexing.
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub max_merge_write_throughput: Option<Byte>,
#[serde(default)]
pub merge_policy: MergePolicyConfig,
#[serde(default)]
@@ -218,6 +223,7 @@ impl From<IndexingSettingsLegacy> for IndexingSettings {
split_num_docs_target: settings.split_num_docs_target,
merge_policy,
resources: settings.resources,
max_merge_write_throughput: None,
}
}
}
@@ -309,6 +315,7 @@ impl Default for IndexingSettings {
split_num_docs_target: Self::default_split_num_docs_target(),
merge_policy: MergePolicyConfig::default(),
resources: IndexingResources::default(),
max_merge_write_throughput: None,
}
}
}
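The new `max_merge_write_throughput` field is optional and is only written back when set. Below is a minimal sketch (not part of this PR) of how the field behaves under serde, assuming `byte_unit::Byte` is compiled with its serde feature and that the settings are parsed from YAML with serde_yaml; the struct and example values are illustrative only.

use byte_unit::Byte;
use serde::{Deserialize, Serialize};

#[derive(Debug, Default, Serialize, Deserialize)]
struct IndexingSettingsSketch {
    // Omitted in the config => None; None is skipped when serializing back out.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    max_merge_write_throughput: Option<Byte>,
}

fn main() {
    // Hypothetical YAML snippet a user might put in an index config.
    let with_limit: IndexingSettingsSketch =
        serde_yaml::from_str("max_merge_write_throughput: 80MB").unwrap();
    assert!(with_limit.max_merge_write_throughput.is_some());

    // Leaving the key out keeps the previous behavior: no merge write throttling.
    let without_limit: IndexingSettingsSketch = serde_yaml::from_str("{}").unwrap();
    assert!(without_limit.max_merge_write_throughput.is_none());

    // Because of skip_serializing_if, an unset limit does not clutter serialized settings.
    let serialized = serde_yaml::to_string(&without_limit).unwrap();
    assert!(!serialized.contains("max_merge_write_throughput"));
}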
23 changes: 15 additions & 8 deletions quickwit/quickwit-indexing/src/actors/index_serializer.rs
@@ -73,14 +73,21 @@ impl Handler<IndexedSplitBatchBuilder> for IndexSerializer {
batch_builder: IndexedSplitBatchBuilder,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
let splits: Vec<IndexedSplit> = {
let _protect = ctx.protect_zone();
batch_builder
.splits
.into_iter()
.map(|split_builder| split_builder.finalize())
.collect::<Result<_, _>>()?
};
let mut splits: Vec<IndexedSplit> = Vec::with_capacity(batch_builder.splits.len());
for split_builder in batch_builder.splits {
// TODO Consider & test removing this protect guard.
//
// In theory the controlled directory should be sufficient.
let _protect_guard = ctx.protect_zone();
if let Some(controlled_directory) = &split_builder.controlled_directory_opt {
controlled_directory.set_progress_and_kill_switch(
ctx.progress().clone(),
ctx.kill_switch().clone(),
);
}
let split = split_builder.finalize()?;
splits.push(split);
}
let indexed_split_batch = IndexedSplitBatch {
batch_parent_span: batch_builder.batch_parent_span,
splits,
15 changes: 15 additions & 0 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
@@ -276,9 +276,24 @@ impl IndexingPipeline {
.set_kill_switch(self.kill_switch.clone())
.spawn(merge_packager);

let io_throttler = if let Some(max_num_bytes_per_secs) =
self.params.indexing_settings.max_merge_write_throughput
{
crate::throttle::create_throttle(
max_num_bytes_per_secs.get_bytes(),
Duration::from_millis(500),
crate::metrics::INDEXER_METRICS
.merge_write_num_bytes_total
.clone(),
)
} else {
crate::throttle::no_throttling()
};

let merge_executor = MergeExecutor::new(
self.params.pipeline_id.clone(),
self.params.metastore.clone(),
io_throttler,
merge_packager_mailbox,
);
let (merge_executor_mailbox, merge_executor_handler) = ctx
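The crate::throttle module itself is not part of this diff, so the exact semantics of create_throttle(bytes_per_sec, period, counter) are not visible here. As a rough mental model only (an assumption, not the actual implementation), a byte-rate throttle with a 500 ms refill period can be sketched as follows; the real throttle presumably also increments the merge_write_num_bytes_total counter it is handed.

use std::sync::Mutex;
use std::thread;
use std::time::{Duration, Instant};

/// Sketch of a byte-rate limiter: grants up to `bytes_per_sec * refill_period` bytes per
/// period and makes callers sleep until the next period once that quota is spent.
/// This is NOT the actual `crate::throttle::Throttle`; it only illustrates the idea.
pub struct ThrottleSketch {
    bytes_per_period: u64,
    refill_period: Duration,
    state: Mutex<PeriodState>,
}

struct PeriodState {
    period_start: Instant,
    bytes_consumed: u64,
}

impl ThrottleSketch {
    pub fn new(bytes_per_sec: u64, refill_period: Duration) -> Self {
        let bytes_per_period =
            (bytes_per_sec as f64 * refill_period.as_secs_f64()).max(1.0) as u64;
        ThrottleSketch {
            bytes_per_period,
            refill_period,
            state: Mutex::new(PeriodState {
                period_start: Instant::now(),
                bytes_consumed: 0,
            }),
        }
    }

    /// Accounts for `num_bytes` of IO, sleeping whenever the current period's quota is spent.
    pub fn consume(&self, mut num_bytes: u64) {
        while num_bytes > 0 {
            let mut state = self.state.lock().unwrap();
            if state.period_start.elapsed() >= self.refill_period {
                // Start a new period: reset the quota.
                state.period_start = Instant::now();
                state.bytes_consumed = 0;
            }
            let available = self.bytes_per_period.saturating_sub(state.bytes_consumed);
            if available > 0 {
                let granted = available.min(num_bytes);
                state.bytes_consumed += granted;
                num_bytes -= granted;
            } else {
                let wait = self.refill_period.saturating_sub(state.period_start.elapsed());
                drop(state); // release the lock before sleeping
                thread::sleep(wait);
            }
        }
    }
}

With a limit of, say, 80 MB/s and the 500 ms period used above, each window grants a 40 MB quota; a merge that writes faster than that simply sleeps until the next window.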
204 changes: 111 additions & 93 deletions quickwit/quickwit-indexing/src/actors/merge_executor.rs
@@ -48,11 +48,13 @@ use crate::models::{
IndexedSplit, IndexedSplitBatch, IndexingPipelineId, MergeScratch, PublishLock,
ScratchDirectory, SplitAttrs,
};
use crate::throttle::Throttle;

#[derive(Clone)]
pub struct MergeExecutor {
pipeline_id: IndexingPipelineId,
metastore: Arc<dyn Metastore>,
io_throttle: Arc<Throttle>,
merge_packager_mailbox: Mailbox<Packager>,
}

@@ -219,78 +221,6 @@ pub fn combine_partition_ids(splits: &[SplitMetadata]) -> u64 {
combine_partition_ids_aux(splits.iter().map(|split| split.partition_id))
}

async fn merge_split_directories(
union_index_meta: IndexMeta,
split_directories: Vec<Box<dyn Directory>>,
delete_tasks: Vec<DeleteTask>,
doc_mapper_opt: Option<Arc<dyn DocMapper>>,
output_path: &Path,
ctx: &ActorContext<MergeExecutor>,
) -> anyhow::Result<ControlledDirectory> {
let shadowing_meta_json_directory = create_shadowing_meta_json_directory(union_index_meta)?;
// This directory is here to receive the merged split, as well as the final meta.json file.
let output_directory = ControlledDirectory::new(
Box::new(MmapDirectory::open(output_path)?),
ctx.progress().clone(),
ctx.kill_switch().clone(),
);
let mut directory_stack: Vec<Box<dyn Directory>> = vec![
output_directory.box_clone(),
Box::new(shadowing_meta_json_directory),
];
directory_stack.extend(split_directories.into_iter());
let union_directory = UnionDirectory::union_of(directory_stack);
let union_index = open_index(union_directory)?;

ctx.record_progress();
let _protect_guard = ctx.protect_zone();

let mut index_writer = union_index.writer_with_num_threads(1, 3_000_000)?;
let num_delete_tasks = delete_tasks.len();
if num_delete_tasks > 0 {
let doc_mapper = doc_mapper_opt
.ok_or_else(|| anyhow!("Doc mapper must be present if there are delete tasks."))?;
for delete_task in delete_tasks {
let delete_query = delete_task
.delete_query
.expect("A delete task must have a delete query.");
let search_request = SearchRequest {
index_id: delete_query.index_id,
query: delete_query.query,
start_timestamp: delete_query.start_timestamp,
end_timestamp: delete_query.end_timestamp,
search_fields: delete_query.search_fields,
..Default::default()
};
debug!(
"Delete all documents matched by query `{:?}`",
search_request
);
let query = doc_mapper.query(union_index.schema(), &search_request)?;
index_writer.delete_query(query)?;
}
debug!("commit-delete-operations");
index_writer.commit()?;
}

let segment_ids: Vec<SegmentId> = union_index
.searchable_segment_metas()?
.into_iter()
.map(|segment_meta| segment_meta.id())
.collect();

// A merge is useless if there is no delete and only one segment.
if num_delete_tasks == 0 && segment_ids.len() <= 1 {
return Ok(output_directory);
}

debug!(segment_ids=?segment_ids,"merging-segments");
// TODO it would be nice if tantivy could let us run the merge in the current thread.
index_writer.merge(&segment_ids).wait()?;

Ok(output_directory)
}

pub fn merge_split_attrs(
merge_split_id: String,
pipeline_id: &IndexingPipelineId,
@@ -334,11 +264,13 @@ impl MergeExecutor {
pub fn new(
pipeline_id: IndexingPipelineId,
metastore: Arc<dyn Metastore>,
io_throttle: Arc<Throttle>,
merge_packager_mailbox: Mailbox<Packager>,
) -> Self {
MergeExecutor {
pipeline_id,
metastore,
io_throttle,
merge_packager_mailbox,
}
}
@@ -354,15 +286,16 @@ impl MergeExecutor {
let (union_index_meta, split_directories) = open_split_directories(&tantivy_dirs)?;
// TODO it would be nice if tantivy could let us run the merge in the current thread.
fail_point!("before-merge-split");
let controlled_directory = merge_split_directories(
union_index_meta,
split_directories,
Vec::new(),
None,
merge_scratch_directory.path(),
ctx,
)
.await?;
let controlled_directory = self
.merge_split_directories(
union_index_meta,
split_directories,
Vec::new(),
None,
merge_scratch_directory.path(),
ctx,
)
.await?;
fail_point!("after-merge-split");

// This will have the side effect of deleting the directory containing the downloaded
@@ -417,15 +350,16 @@ impl MergeExecutor {
);

let (union_index_meta, split_directories) = open_split_directories(&tantivy_dirs)?;
let controlled_directory = merge_split_directories(
union_index_meta,
split_directories,
delete_tasks,
Some(doc_mapper.clone()),
merge_scratch_directory.path(),
ctx,
)
.await?;
let controlled_directory = self
.merge_split_directories(
union_index_meta,
split_directories,
delete_tasks,
Some(doc_mapper.clone()),
merge_scratch_directory.path(),
ctx,
)
.await?;

// This will have the side effect of deleting the directory containing the downloaded split.
let mut merged_index = Index::open(controlled_directory.clone())?;
@@ -492,6 +426,80 @@ impl MergeExecutor {
};
Ok(Some(indexed_split))
}

async fn merge_split_directories(
&self,
union_index_meta: IndexMeta,
split_directories: Vec<Box<dyn Directory>>,
delete_tasks: Vec<DeleteTask>,
doc_mapper_opt: Option<Arc<dyn DocMapper>>,
output_path: &Path,
ctx: &ActorContext<MergeExecutor>,
) -> anyhow::Result<ControlledDirectory> {
let shadowing_meta_json_directory = create_shadowing_meta_json_directory(union_index_meta)?;
// This directory is here to receive the merged split, as well as the final meta.json file.
let output_directory = ControlledDirectory::new(
Box::new(MmapDirectory::open(output_path)?),
ctx.progress().clone(),
ctx.kill_switch().clone(),
self.io_throttle.clone(),
);
let mut directory_stack: Vec<Box<dyn Directory>> = vec![
output_directory.box_clone(),
Box::new(shadowing_meta_json_directory),
];
directory_stack.extend(split_directories.into_iter());
let union_directory = UnionDirectory::union_of(directory_stack);
let union_index = open_index(union_directory)?;

ctx.record_progress();
let _protect_guard = ctx.protect_zone();

let mut index_writer = union_index.writer_with_num_threads(1, 3_000_000)?;
let num_delete_tasks = delete_tasks.len();
if num_delete_tasks > 0 {
let doc_mapper = doc_mapper_opt
.ok_or_else(|| anyhow!("Doc mapper must be present if there are delete tasks."))?;
for delete_task in delete_tasks {
let delete_query = delete_task
.delete_query
.expect("A delete task must have a delete query.");
let search_request = SearchRequest {
index_id: delete_query.index_id,
query: delete_query.query,
start_timestamp: delete_query.start_timestamp,
end_timestamp: delete_query.end_timestamp,
search_fields: delete_query.search_fields,
..Default::default()
};
debug!(
"Delete all documents matched by query `{:?}`",
search_request
);
let query = doc_mapper.query(union_index.schema(), &search_request)?;
index_writer.delete_query(query)?;
}
debug!("commit-delete-operations");
index_writer.commit()?;
}

let segment_ids: Vec<SegmentId> = union_index
.searchable_segment_metas()?
.into_iter()
.map(|segment_meta| segment_meta.id())
.collect();

// A merge is useless if there is no delete and only one segment.
if num_delete_tasks == 0 && segment_ids.len() <= 1 {
return Ok(output_directory);
}

debug!(segment_ids=?segment_ids,"merging-segments");
// TODO it would be nice if tantivy could let us run the merge in the current thread.
index_writer.merge(&segment_ids).wait()?;

Ok(output_directory)
}
}

fn open_index<T: Into<Box<dyn Directory>>>(directory: T) -> tantivy::Result<Index> {
@@ -510,6 +518,7 @@ mod tests {
use super::*;
use crate::merge_policy::MergeOperation;
use crate::models::{IndexingPipelineId, ScratchDirectory};
use crate::throttle::no_throttling;
use crate::{get_tantivy_directory_from_split_bundle, new_split_id, TestSandbox};

#[tokio::test]
@@ -574,7 +583,12 @@ mod tests {
downloaded_splits_directory,
};
let (merge_packager_mailbox, merge_packager_inbox) = create_test_mailbox();
let merge_executor = MergeExecutor::new(pipeline_id, metastore, merge_packager_mailbox);
let merge_executor = MergeExecutor::new(
pipeline_id,
metastore,
no_throttling(),
merge_packager_mailbox,
);
let universe = Universe::new();
let (merge_executor_mailbox, merge_executor_handle) =
universe.spawn_builder().spawn(merge_executor);
@@ -709,8 +723,12 @@ mod tests {
downloaded_splits_directory,
};
let (merge_packager_mailbox, merge_packager_inbox) = create_test_mailbox();
let delete_task_executor =
MergeExecutor::new(pipeline_id, metastore, merge_packager_mailbox);
let delete_task_executor = MergeExecutor::new(
pipeline_id,
metastore,
no_throttling(),
merge_packager_mailbox,
);
let universe = Universe::new();
let (delete_task_executor_mailbox, delete_task_executor_handle) =
universe.spawn_builder().spawn(delete_task_executor);
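ControlledDirectory now receives the throttle (note the extra self.io_throttle.clone() argument passed to ControlledDirectory::new above), but its implementation is outside this diff. One plausible way such a directory can enforce the limit, sketched here purely as an illustration, is to route its writes through an io::Write adapter that charges every buffer against the shared throttle before forwarding it; the ByteThrottle trait below is a hypothetical stand-in for the real Throttle API.

use std::io::{self, Write};
use std::sync::Arc;

/// Hypothetical stand-in for the real `crate::throttle::Throttle` API.
pub trait ByteThrottle: Send + Sync {
    /// Accounts for `num_bytes` of IO, blocking if the rate budget is exhausted.
    fn consume(&self, num_bytes: u64);
}

/// Sketch: an `io::Write` adapter that throttles everything written through it.
pub struct ThrottledWriter<W: Write> {
    inner: W,
    throttle: Arc<dyn ByteThrottle>,
}

impl<W: Write> ThrottledWriter<W> {
    pub fn new(inner: W, throttle: Arc<dyn ByteThrottle>) -> Self {
        ThrottledWriter { inner, throttle }
    }
}

impl<W: Write> Write for ThrottledWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Charge the bytes we are about to attempt; the throttle may block here.
        self.throttle.consume(buf.len() as u64);
        self.inner.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}

In this sketch the charge happens before the write is issued, so a long merge pauses whenever its running byte total gets ahead of the configured budget, while the indexing write path is left untouched.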
8 changes: 0 additions & 8 deletions quickwit/quickwit-indexing/src/actors/packager.rs
@@ -134,14 +134,6 @@ impl Handler<IndexedSplitBatch> for Packager {
split_ids=?split_ids,
"start-packaging-splits"
);
for split in &batch.splits {
if let Some(controlled_directory) = &split.controlled_directory_opt {
controlled_directory.set_progress_and_kill_switch(
ctx.progress().clone(),
ctx.kill_switch().clone(),
);
}
}
fail_point!("packager:before");
let mut packaged_splits = Vec::new();
for split in batch.splits {