diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index 8d98310194b..d733cad64c4 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -206,9 +206,161 @@ impl AddAssign for CompactionMetrics { } } +/// Trait for implementing custom compaction planning strategies. +/// +/// This trait allows users to define their own compaction strategies by implementing +/// the `plan` method. The default implementation is provided by [`DefaultCompactionPlanner`]. +#[async_trait::async_trait] +pub trait CompactionPlanner: Send + Sync { + /// Build compaction plan. + /// + /// This method analyzes the dataset's fragments and generates a [`CompactionPlan`] + /// containing a list of compaction tasks to execute. + /// + /// # Arguments + /// + /// * `dataset` - Reference to the dataset to be compacted + async fn plan(&self, dataset: &Dataset) -> Result; +} + +/// Formulate a plan to compact the files in a dataset +/// +/// The compaction plan will contain a list of tasks to execute. Each task +/// will contain approximately `target_rows_per_fragment` rows and will be +/// rewriting fragments that are adjacent in the dataset's fragment list. Some +/// tasks may contain a single fragment when that fragment has deletions that +/// are being materialized and doesn't have any neighbors that need to be +/// compacted. +#[derive(Debug, Clone, Default)] +pub struct DefaultCompactionPlanner { + options: CompactionOptions, +} + +impl DefaultCompactionPlanner { + pub fn new(mut options: CompactionOptions) -> Self { + options.validate(); + Self { options } + } +} + +#[async_trait::async_trait] +impl CompactionPlanner for DefaultCompactionPlanner { + async fn plan(&self, dataset: &Dataset) -> Result { + // get_fragments should be returning fragments in sorted order (by id) + // and fragment ids should be unique + let fragments = dataset.get_fragments(); + + debug_assert!( + fragments.windows(2).all(|w| w[0].id() < w[1].id()), + "fragments in manifest are not sorted" + ); + let mut fragment_metrics = futures::stream::iter(fragments) + .map(|fragment| async move { + match collect_metrics(&fragment).await { + Ok(metrics) => Ok((fragment.metadata, metrics)), + Err(e) => Err(e), + } + }) + .buffered(dataset.object_store().io_parallelism()); + + let index_fragmaps = load_index_fragmaps(dataset).await?; + let indices_containing_frag = |frag_id: u32| { + index_fragmaps + .iter() + .enumerate() + .filter(|(_, bitmap)| bitmap.contains(frag_id)) + .map(|(pos, _)| pos) + .collect::>() + }; + + let mut candidate_bins: Vec = Vec::new(); + let mut current_bin: Option = None; + let mut i = 0; + + while let Some(res) = fragment_metrics.next().await { + let (fragment, metrics) = res?; + + let candidacy = if self.options.materialize_deletions + && metrics.deletion_percentage() > self.options.materialize_deletions_threshold + { + Some(CompactionCandidacy::CompactItself) + } else if metrics.physical_rows < self.options.target_rows_per_fragment { + // Only want to compact if their are neighbors to compact such that + // we can get a larger fragment. + Some(CompactionCandidacy::CompactWithNeighbors) + } else { + // Not a candidate + None + }; + + let indices = indices_containing_frag(fragment.id as u32); + + match (candidacy, &mut current_bin) { + (None, None) => {} // keep searching + (Some(candidacy), None) => { + // Start a new bin + current_bin = Some(CandidateBin { + fragments: vec![fragment], + pos_range: i..(i + 1), + candidacy: vec![candidacy], + row_counts: vec![metrics.num_rows()], + indices, + }); + } + (Some(candidacy), Some(bin)) => { + // We cannot mix "indexed" and "non-indexed" fragments and so we only consider + // the existing bin if it contains the same indices + if bin.indices == indices { + // Add to current bin + bin.fragments.push(fragment); + bin.pos_range.end += 1; + bin.candidacy.push(candidacy); + bin.row_counts.push(metrics.num_rows()); + } else { + // Index set is different. Complete previous bin and start new one + candidate_bins.push(current_bin.take().unwrap()); + current_bin = Some(CandidateBin { + fragments: vec![fragment], + pos_range: i..(i + 1), + candidacy: vec![candidacy], + row_counts: vec![metrics.num_rows()], + indices, + }); + } + } + (None, Some(_)) => { + // Bin is complete + candidate_bins.push(current_bin.take().unwrap()); + } + } + + i += 1; + } + + // Flush the last bin + if let Some(bin) = current_bin { + candidate_bins.push(bin); + } + + let final_bins = candidate_bins + .into_iter() + .filter(|bin| !bin.is_noop()) + .flat_map(|bin| bin.split_for_size(self.options.target_rows_per_fragment)) + .map(|bin| TaskData { + fragments: bin.fragments, + }); + + let mut compaction_plan = + CompactionPlan::new(dataset.manifest.version, self.options.clone()); + compaction_plan.extend_tasks(final_bins); + + Ok(compaction_plan) + } +} + /// Compacts the files in the dataset without reordering them. /// -/// This does a few things: +/// By default, this does a few things: /// * Removes deleted rows from fragments. /// * Removes dropped columns from fragments. /// * Merges fragments that are too small. @@ -218,13 +370,20 @@ impl AddAssign for CompactionMetrics { /// If no compaction is needed, this method will not make a new version of the table. pub async fn compact_files( dataset: &mut Dataset, - mut options: CompactionOptions, + options: CompactionOptions, remap_options: Option>, // These will be deprecated later ) -> Result { info!(target: TRACE_DATASET_EVENTS, event=DATASET_COMPACTING_EVENT, uri = &dataset.uri); - options.validate(); + let planner = DefaultCompactionPlanner::new(options); + compact_files_with_planner(dataset, remap_options, &planner).await +} - let compaction_plan: CompactionPlan = plan_compaction(dataset, &options).await?; +pub async fn compact_files_with_planner( + dataset: &mut Dataset, + remap_options: Option>, // These will be deprecated later + planner: &dyn CompactionPlanner, +) -> Result { + let compaction_plan: CompactionPlan = planner.plan(dataset).await?; // If nothing to compact, don't make a commit. if compaction_plan.tasks().is_empty() { @@ -234,16 +393,23 @@ pub async fn compact_files( let dataset_ref = &dataset.clone(); let result_stream = futures::stream::iter(compaction_plan.tasks.into_iter()) - .map(|task| rewrite_files(Cow::Borrowed(dataset_ref), task, &options)) + .map(|task| rewrite_files(Cow::Borrowed(dataset_ref), task, &compaction_plan.options)) .buffer_unordered( - options + compaction_plan + .options .num_threads .unwrap_or_else(get_num_compute_intensive_cpus), ); let completed_tasks: Vec = result_stream.try_collect().await?; let remap_options = remap_options.unwrap_or(Arc::new(DatasetIndexRemapperOptions::default())); - let metrics = commit_compaction(dataset, completed_tasks, remap_options, &options).await?; + let metrics = commit_compaction( + dataset, + completed_tasks, + remap_options, + &compaction_plan.options, + ) + .await?; Ok(metrics) } @@ -458,125 +624,12 @@ async fn load_index_fragmaps(dataset: &Dataset) -> Result> { Ok(index_fragmaps) } -/// Formulate a plan to compact the files in a dataset -/// -/// The compaction plan will contain a list of tasks to execute. Each task -/// will contain approximately `target_rows_per_fragment` rows and will be -/// rewriting fragments that are adjacent in the dataset's fragment list. Some -/// tasks may contain a single fragment when that fragment has deletions that -/// are being materialized and doesn't have any neighbors that need to be -/// compacted. pub async fn plan_compaction( dataset: &Dataset, options: &CompactionOptions, ) -> Result { - // get_fragments should be returning fragments in sorted order (by id) - // and fragment ids should be unique - let fragments = dataset.get_fragments(); - debug_assert!( - fragments.windows(2).all(|w| w[0].id() < w[1].id()), - "fragments in manifest are not sorted" - ); - let mut fragment_metrics = futures::stream::iter(fragments) - .map(|fragment| async move { - match collect_metrics(&fragment).await { - Ok(metrics) => Ok((fragment.metadata, metrics)), - Err(e) => Err(e), - } - }) - .buffered(dataset.object_store().io_parallelism()); - - let index_fragmaps = load_index_fragmaps(dataset).await?; - let indices_containing_frag = |frag_id: u32| { - index_fragmaps - .iter() - .enumerate() - .filter(|(_, bitmap)| bitmap.contains(frag_id)) - .map(|(pos, _)| pos) - .collect::>() - }; - - let mut candidate_bins: Vec = Vec::new(); - let mut current_bin: Option = None; - let mut i = 0; - - while let Some(res) = fragment_metrics.next().await { - let (fragment, metrics) = res?; - - let candidacy = if options.materialize_deletions - && metrics.deletion_percentage() > options.materialize_deletions_threshold - { - Some(CompactionCandidacy::CompactItself) - } else if metrics.physical_rows < options.target_rows_per_fragment { - // Only want to compact if their are neighbors to compact such that - // we can get a larger fragment. - Some(CompactionCandidacy::CompactWithNeighbors) - } else { - // Not a candidate - None - }; - - let indices = indices_containing_frag(fragment.id as u32); - - match (candidacy, &mut current_bin) { - (None, None) => {} // keep searching - (Some(candidacy), None) => { - // Start a new bin - current_bin = Some(CandidateBin { - fragments: vec![fragment], - pos_range: i..(i + 1), - candidacy: vec![candidacy], - row_counts: vec![metrics.num_rows()], - indices, - }); - } - (Some(candidacy), Some(bin)) => { - // We cannot mix "indexed" and "non-indexed" fragments and so we only consider - // the existing bin if it contains the same indices - if bin.indices == indices { - // Add to current bin - bin.fragments.push(fragment); - bin.pos_range.end += 1; - bin.candidacy.push(candidacy); - bin.row_counts.push(metrics.num_rows()); - } else { - // Index set is different. Complete previous bin and start new one - candidate_bins.push(current_bin.take().unwrap()); - current_bin = Some(CandidateBin { - fragments: vec![fragment], - pos_range: i..(i + 1), - candidacy: vec![candidacy], - row_counts: vec![metrics.num_rows()], - indices, - }); - } - } - (None, Some(_)) => { - // Bin is complete - candidate_bins.push(current_bin.take().unwrap()); - } - } - - i += 1; - } - - // Flush the last bin - if let Some(bin) = current_bin { - candidate_bins.push(bin); - } - - let final_bins = candidate_bins - .into_iter() - .filter(|bin| !bin.is_noop()) - .flat_map(|bin| bin.split_for_size(options.target_rows_per_fragment)) - .map(|bin| TaskData { - fragments: bin.fragments, - }); - - let mut compaction_plan = CompactionPlan::new(dataset.manifest.version, options.clone()); - compaction_plan.extend_tasks(final_bins); - - Ok(compaction_plan) + let planner = DefaultCompactionPlanner::new(options.clone()); + planner.plan(dataset).await } /// The result of a single compaction task. @@ -3580,4 +3633,41 @@ mod tests { plan ); } + + #[tokio::test] + async fn test_default_compaction_planner() { + let test_dir = TempStrDir::default(); + let test_uri = &test_dir; + + let data = sample_data(); + let schema = data.schema(); + + // Create dataset with multiple small fragments + let reader = RecordBatchIterator::new(vec![Ok(data.clone())], schema.clone()); + let write_params = WriteParams { + max_rows_per_file: 2000, + ..Default::default() + }; + let dataset = Dataset::write(reader, test_uri, Some(write_params)) + .await + .unwrap(); + + assert_eq!(dataset.get_fragments().len(), 5); + + // Test default planner + let options = CompactionOptions { + target_rows_per_fragment: 5000, + materialize_deletions_threshold: 2.0, + ..Default::default() + }; + + let planner = DefaultCompactionPlanner::new(options); + let plan = planner.plan(&dataset).await.unwrap(); + + // Should create tasks to compact small fragments + assert!(!plan.tasks.is_empty()); + assert_eq!(plan.read_version, dataset.manifest.version); + // make sure options.validate() worked + assert!(!plan.options.materialize_deletions); + } }