Strategized Compaction Plan #5186

@zhangyue19921010

Description

Background

Currently, Lance allows users to manually trigger a standalone job to perform compaction, which achieves the following:

  1. Merging small files.
  2. Materializing deletions.

However, in practice, there are some limitations:

  1. The planning phase only supports two simple grouping strategies (shown below), and adding new grouping logic is tightly coupled with the existing code:
    • Grouping based on target_rows_per_fragment.
    • Grouping based on whether a fragment contains deleted rows.
  2. Compaction always runs as a full compaction, with no bound on the total I/O per run. This puts heavy load on the compaction job and makes its runtime hard to control.

Design

New CompactionPlanner Trait

#[async_trait::async_trait]
pub trait CompactionPlanner: Send + Sync {
    // Returns all of the dataset's fragments by default.
    fn get_fragments(&self, dataset: &Dataset, _options: &CompactionOptions) -> Vec<FileFragment> {
        // Fragments must be returned in sorted order (by id),
        // and fragment ids must be unique.
        dataset.get_fragments()
    }
        dataset.get_fragments()
    }

    // No filtering by default.
    async fn filter_fragments(
        &self,
        _dataset: &Dataset,
        fragments: Vec<FileFragment>,
        _options: &CompactionOptions,
    ) -> Result<Vec<FileFragment>> {
        Ok(fragments)
    }

    async fn plan(&self, dataset: &Dataset, options: &CompactionOptions) -> Result<CompactionPlan>;
}

A new CompactionPlanner trait is introduced, with plan as its core method. Users can implement the trait to define custom compaction strategies, which adds flexibility and decouples each strategy's logic from the others.
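To illustrate how the trait's default methods compose with a strategy-specific implementation, here is a minimal, dependency-free sketch of the same pattern. The types (`Fragment`, `Planner`, `DeletionOnlyPlanner`) are simplified stand-ins for illustration only; the real trait is async and operates on Lance's `Dataset` and `FileFragment`:

```rust
// Simplified stand-in for Lance's FileFragment, for illustration only.
#[derive(Debug, Clone)]
struct Fragment {
    id: u64,
    has_deletions: bool,
}

// Mirrors the shape of the proposed CompactionPlanner trait
// (synchronous here to keep the sketch self-contained).
trait Planner {
    // Default: take every fragment, sorted by id.
    fn get_fragments(&self, fragments: &[Fragment]) -> Vec<Fragment> {
        let mut out = fragments.to_vec();
        out.sort_by_key(|f| f.id);
        out
    }

    // Default: no filtering.
    fn filter_fragments(&self, fragments: Vec<Fragment>) -> Vec<Fragment> {
        fragments
    }

    // In the real trait, plan is the required method; here it simply
    // chains the two hooks to show how they compose.
    fn plan(&self, fragments: &[Fragment]) -> Vec<Fragment> {
        self.filter_fragments(self.get_fragments(fragments))
    }
}

// A custom strategy: only consider fragments that carry deletions.
struct DeletionOnlyPlanner;

impl Planner for DeletionOnlyPlanner {
    fn filter_fragments(&self, fragments: Vec<Fragment>) -> Vec<Fragment> {
        fragments.into_iter().filter(|f| f.has_deletions).collect()
    }
}

fn main() {
    let fragments = vec![
        Fragment { id: 2, has_deletions: true },
        Fragment { id: 1, has_deletions: false },
        Fragment { id: 3, has_deletions: true },
    ];
    let planned = DeletionOnlyPlanner.plan(&fragments);
    let ids: Vec<u64> = planned.iter().map(|f| f.id).collect();
    println!("{:?}", ids); // prints [2, 3]
}
```

Because the grouping hooks have defaults, a new strategy only has to override the step it cares about, which is the decoupling this proposal is after.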

Default Implementation

#[derive(Debug, Clone, Default)]
pub struct DefaultCompactionPlanner;

#[async_trait::async_trait]
impl CompactionPlanner for DefaultCompactionPlanner {
    async fn plan(
        &self,
        dataset: &Dataset,
        options: &CompactionOptions,
    ) -> Result<CompactionPlan> {
        // ..... Same as the current implementation
    }
}

A default implementation is provided so that the behavior remains identical to the existing compaction logic.

Landing Steps

  1. Complete the foundational work for the Strategized Compaction Plan and ship a default implementation that preserves the current behavior exactly.
  2. Introduce new compaction plan strategies from different perspectives, for example:
    • IO-bounded Compaction Plan: Limits the total I/O of each compaction to control the load of a single merge job.
    • Deletion-Only Compaction Plan: Only merges fragments with deleted rows.
    • Incremental Compaction Plan: Only performs compaction on newly changed fragments.
    • etc.
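As one concrete example of the strategies above, the selection step of an IO-bounded plan could cap the total bytes a single compaction run reads. The sketch below is self-contained and hypothetical (`select_within_io_budget`, the sizes, and the budget are all illustrative; a real planner would read sizes from fragment metadata):

```rust
// Greedily select fragments for compaction until an I/O budget is hit.
// Sizes are in bytes; smaller fragments are taken first so that one
// run merges as many small files as possible within the budget.
fn select_within_io_budget(mut sizes: Vec<(u64, u64)>, budget_bytes: u64) -> Vec<u64> {
    // Each entry is (fragment_id, size_bytes).
    sizes.sort_by_key(|&(_, size)| size);
    let mut selected = Vec::new();
    let mut total = 0u64;
    for (id, size) in sizes {
        if total + size > budget_bytes {
            break;
        }
        total += size;
        selected.push(id);
    }
    selected
}

fn main() {
    // Hypothetical fragments: (id, size in bytes), with an 800-byte budget.
    let fragments = vec![(1, 400), (2, 100), (3, 300), (4, 900)];
    let picked = select_within_io_budget(fragments, 800);
    println!("{:?}", picked); // prints [2, 3, 1]
}
```

Bounding each run this way keeps the load of a single merge job predictable; remaining fragments are simply picked up by a later run.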

Core API Compatibility

The two existing entry points are retained; each now delegates to DefaultCompactionPlanner, so existing callers keep API compatibility and see no behavior change.

compact_files

pub async fn compact_files(
    dataset: &mut Dataset,
    options: CompactionOptions,
    remap_options: Option<Arc<dyn IndexRemapperOptions>>, // These will be deprecated later
) -> Result<CompactionMetrics> {
    let planner = DefaultCompactionPlanner;
    compact_files_with_planner(dataset, options, remap_options, &planner).await
}

plan_compaction

pub async fn plan_compaction(
    dataset: &Dataset,
    options: &CompactionOptions,
) -> Result<CompactionPlan> {
    let planner = DefaultCompactionPlanner;
    planner.plan(dataset, options).await
}
