Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 206 additions & 2 deletions rust/lance-table/src/format/manifest.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::ops::Range;
use std::sync::Arc;

Expand Down Expand Up @@ -124,6 +124,49 @@ fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
.collect()
}

#[derive(Default)]
pub struct ManifestSummary {
pub total_fragments: u64,
pub total_data_files: u64,
pub total_files_size: u64,
pub total_deletion_files: u64,
pub total_data_file_rows: u64,
pub total_deletion_file_rows: u64,
pub total_rows: u64,
}

impl From<ManifestSummary> for BTreeMap<String, String> {
fn from(summary: ManifestSummary) -> Self {
let mut stats_map = Self::new();
stats_map.insert(
"total_fragments".to_string(),
summary.total_fragments.to_string(),
);
stats_map.insert(
"total_data_files".to_string(),
summary.total_data_files.to_string(),
);
stats_map.insert(
"total_files_size".to_string(),
summary.total_files_size.to_string(),
);
stats_map.insert(
"total_deletion_files".to_string(),
summary.total_deletion_files.to_string(),
);
stats_map.insert(
"total_data_file_rows".to_string(),
summary.total_data_file_rows.to_string(),
);
stats_map.insert(
"total_deletion_file_rows".to_string(),
summary.total_deletion_file_rows.to_string(),
);
stats_map.insert("total_rows".to_string(), summary.total_rows.to_string());
stats_map
}
}

impl Manifest {
pub fn new(
schema: Schema,
Expand Down Expand Up @@ -477,6 +520,52 @@ impl Manifest {
pub fn should_use_legacy_format(&self) -> bool {
self.data_storage_format.version == LEGACY_FORMAT_VERSION
}

/// Get the summary information of a manifest.
///
/// This function calculates various statistics about the manifest, including:
/// - total_files_size: Total size of all data files in bytes
/// - total_fragments: Total number of fragments in the dataset
/// - total_data_files: Total number of data files across all fragments
/// - total_deletion_files: Total number of deletion files
/// - total_data_file_rows: Total number of rows in data files
/// - total_deletion_file_rows: Total number of deleted rows in deletion files
/// - total_rows: Total number of rows in the dataset
pub fn summary(&self) -> ManifestSummary {
// Calculate total fragments
let mut summary =
self.fragments
.iter()
.fold(ManifestSummary::default(), |mut summary, f| {
// Count data files in the current fragment
summary.total_data_files += f.files.len() as u64;
// Sum the number of rows for the current fragment (if available)
if let Some(num_rows) = f.num_rows() {
summary.total_rows += num_rows as u64;
}
// Sum file sizes for all data files in the current fragment (if available)
for data_file in &f.files {
if let Some(size_bytes) = data_file.file_size_bytes.get() {
summary.total_files_size += size_bytes.get();
}
}
// Check and count if the current fragment has a deletion file
if f.deletion_file.is_some() {
summary.total_deletion_files += 1;
}
// Sum the number of deleted rows from the deletion file (if available)
if let Some(deletion_file) = &f.deletion_file {
if let Some(num_deleted) = deletion_file.num_deleted_rows {
summary.total_deletion_file_rows += num_deleted as u64;
}
}
summary
});
summary.total_fragments = self.fragments.len() as u64;
summary.total_data_file_rows = summary.total_rows + summary.total_deletion_file_rows;

summary
}
}

#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
Expand Down Expand Up @@ -833,7 +922,8 @@ impl SelfDescribingFileReader for FileReader {

#[cfg(test)]
mod tests {
use crate::format::DataFile;
use crate::format::{DataFile, DeletionFile, DeletionFileType};
use std::num::NonZero;

use super::*;

Expand Down Expand Up @@ -990,4 +1080,118 @@ mod tests {
manifest.config_mut().remove("other-key");
assert_eq!(manifest.config, config);
}

#[test]
fn test_manifest_summary() {
// Step 1: test empty manifest summary
let arrow_schema = ArrowSchema::new(vec![
ArrowField::new("id", arrow_schema::DataType::Int64, false),
ArrowField::new("name", arrow_schema::DataType::Utf8, true),
]);
let schema = Schema::try_from(&arrow_schema).unwrap();

let empty_manifest = Manifest::new(
schema.clone(),
Arc::new(vec![]),
DataStorageFormat::default(),
None,
HashMap::new(),
);

let empty_summary = empty_manifest.summary();
assert_eq!(empty_summary.total_rows, 0);
assert_eq!(empty_summary.total_files_size, 0);
assert_eq!(empty_summary.total_fragments, 0);
assert_eq!(empty_summary.total_data_files, 0);
assert_eq!(empty_summary.total_deletion_file_rows, 0);
assert_eq!(empty_summary.total_data_file_rows, 0);
assert_eq!(empty_summary.total_deletion_files, 0);

// Step 2: write empty files and verify summary
let empty_fragments = vec![
Fragment::with_file_legacy(0, "empty_file1.lance", &schema, Some(0)),
Fragment::with_file_legacy(1, "empty_file2.lance", &schema, Some(0)),
];

let empty_files_manifest = Manifest::new(
schema.clone(),
Arc::new(empty_fragments),
DataStorageFormat::default(),
None,
HashMap::new(),
);

let empty_files_summary = empty_files_manifest.summary();
assert_eq!(empty_files_summary.total_rows, 0);
assert_eq!(empty_files_summary.total_files_size, 0);
assert_eq!(empty_files_summary.total_fragments, 2);
assert_eq!(empty_files_summary.total_data_files, 2);
assert_eq!(empty_files_summary.total_deletion_file_rows, 0);
assert_eq!(empty_files_summary.total_data_file_rows, 0);
assert_eq!(empty_files_summary.total_deletion_files, 0);

// Step 3: write real data and verify summary
let real_fragments = vec![
Fragment::with_file_legacy(0, "data_file1.lance", &schema, Some(100)),
Fragment::with_file_legacy(1, "data_file2.lance", &schema, Some(250)),
Fragment::with_file_legacy(2, "data_file3.lance", &schema, Some(75)),
];

let real_data_manifest = Manifest::new(
schema.clone(),
Arc::new(real_fragments),
DataStorageFormat::default(),
None,
HashMap::new(),
);

let real_data_summary = real_data_manifest.summary();
assert_eq!(real_data_summary.total_rows, 425); // 100 + 250 + 75
assert_eq!(real_data_summary.total_files_size, 0); // Zero for unknown
assert_eq!(real_data_summary.total_fragments, 3);
assert_eq!(real_data_summary.total_data_files, 3);
assert_eq!(real_data_summary.total_deletion_file_rows, 0);
assert_eq!(real_data_summary.total_data_file_rows, 425);
assert_eq!(real_data_summary.total_deletion_files, 0);

let file_version = LanceFileVersion::default();
// Step 4: write deletion files and verify summary
let mut fragment_with_deletion = Fragment::new(0)
.with_file(
"data_with_deletion.lance",
vec![0, 1],
vec![0, 1],
&file_version,
NonZero::new(1000),
)
.with_physical_rows(50);
fragment_with_deletion.deletion_file = Some(DeletionFile {
read_version: 123,
id: 456,
file_type: DeletionFileType::Array,
num_deleted_rows: Some(10),
base_id: None,
});

let manifest_with_deletion = Manifest::new(
schema,
Arc::new(vec![fragment_with_deletion]),
DataStorageFormat::default(),
None,
HashMap::new(),
);

let deletion_summary = manifest_with_deletion.summary();
assert_eq!(deletion_summary.total_rows, 40); // 50 - 10
assert_eq!(deletion_summary.total_files_size, 1000);
assert_eq!(deletion_summary.total_fragments, 1);
assert_eq!(deletion_summary.total_data_files, 1);
assert_eq!(deletion_summary.total_deletion_file_rows, 10);
assert_eq!(deletion_summary.total_data_file_rows, 50);
assert_eq!(deletion_summary.total_deletion_files, 1);

//Just verify the transformation is OK
let stats_map: BTreeMap<String, String> = deletion_summary.into();
assert_eq!(stats_map.len(), 7)
}
}
2 changes: 1 addition & 1 deletion rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl From<&Manifest> for Version {
Self {
version: m.version,
timestamp: m.timestamp(),
metadata: BTreeMap::default(),
metadata: m.summary().into(),
}
}
}
Expand Down
Loading