diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index 3eb424230e8..9e22c414d48 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -1279,35 +1279,34 @@ impl MiniBlockScheduler { let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() { let num_dictionary_items = layout.num_dictionary_items; - match dictionary_encoding.compression.as_ref().unwrap() { - Compression::Variable(_) => Some(MiniBlockSchedulerDictionary { - dictionary_decompressor: decompressors - .create_block_decompressor(dictionary_encoding)? - .into(), - dictionary_buf_position_and_size: buffer_offsets_and_sizes[2], - dictionary_data_alignment: 4, - num_dictionary_items, - }), - Compression::Flat(_) => Some(MiniBlockSchedulerDictionary { - dictionary_decompressor: decompressors - .create_block_decompressor(dictionary_encoding)? - .into(), - dictionary_buf_position_and_size: buffer_offsets_and_sizes[2], - dictionary_data_alignment: 16, - num_dictionary_items, - }), - Compression::General(_) => Some(MiniBlockSchedulerDictionary { - dictionary_decompressor: decompressors - .create_block_decompressor(dictionary_encoding)? + let dictionary_decompressor = decompressors + .create_block_decompressor(dictionary_encoding)? 
+ .into(); + let dictionary_data_alignment = match dictionary_encoding.compression.as_ref().unwrap() + { + Compression::Variable(_) => 4, + Compression::Flat(_) => 16, + Compression::General(_) => 1, + Compression::InlineBitpacking(_) | Compression::OutOfLineBitpacking(_) => { + crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT + } + _ => { + return Err(Error::InvalidInput { + source: format!( + "Unsupported mini-block dictionary encoding: {:?}", + dictionary_encoding.compression.as_ref().unwrap() + ) .into(), - dictionary_buf_position_and_size: buffer_offsets_and_sizes[2], - dictionary_data_alignment: 1, - num_dictionary_items, - }), - _ => unreachable!( - "Mini-block dictionary encoding must use Variable, Flat, or General compression" - ), - } + location: location!(), + }) + } + }; + Some(MiniBlockSchedulerDictionary { + dictionary_decompressor, + dictionary_buf_position_and_size: buffer_offsets_and_sizes[2], + dictionary_data_alignment, + num_dictionary_items, + }) } else { None }; @@ -4903,6 +4902,7 @@ mod tests { FullZipScheduler, MiniBlockRepIndex, PerValueDecompressor, PreambleAction, StructuralPageScheduler, }; + use crate::compression::DefaultDecompressionStrategy; use crate::constants::{STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK}; use crate::data::BlockInfo; use crate::decoder::PageEncoding; @@ -4911,6 +4911,7 @@ mod tests { }; use crate::format::pb21; use crate::format::pb21::compressive_encoding::Compression; + use crate::format::ProtobufUtils21; use crate::testing::{check_round_trip_encoding_of_data, TestCases}; use crate::version::LanceFileVersion; use arrow_array::{ArrayRef, Int8Array, StringArray, UInt64Array}; @@ -6070,6 +6071,44 @@ mod tests { check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await; } + #[test] + fn test_miniblock_dictionary_out_of_line_bitpacking_decode() { + let rows = 10_000; + let unique_values = 2_000; + + let dictionary_encoding = + ProtobufUtils21::out_of_line_bitpacking(64, 
ProtobufUtils21::flat(11, None)); + let layout = pb21::MiniBlockLayout { + rep_compression: None, + def_compression: None, + value_compression: Some(ProtobufUtils21::flat(64, None)), + dictionary: Some(dictionary_encoding), + num_dictionary_items: unique_values, + layers: vec![pb21::RepDefLayer::RepdefAllValidItem as i32], + num_buffers: 1, + repetition_index_depth: 0, + num_items: rows, + has_large_chunk: false, + }; + + let buffer_offsets_and_sizes = vec![(0, 0), (0, 0), (0, 0)]; + let scheduler = super::MiniBlockScheduler::try_new( + &buffer_offsets_and_sizes, + /*priority=*/ 0, + /*items_in_page=*/ rows, + &layout, + &DefaultDecompressionStrategy::default(), + ) + .unwrap(); + + let dictionary = scheduler.dictionary.unwrap(); + assert_eq!(dictionary.num_dictionary_items, unique_values); + assert_eq!( + dictionary.dictionary_data_alignment, + crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT + ); + } + // Dictionary encoding decision tests /// Helper to create FixedWidth test data block with exact cardinality stat injected /// to ensure consistent test behavior (avoids HLL estimation error) diff --git a/rust/lance/src/dataset/tests/dataset_scanner.rs b/rust/lance/src/dataset/tests/dataset_scanner.rs index 9fce5f6d2ca..3ebaf6da8f7 100644 --- a/rust/lance/src/dataset/tests/dataset_scanner.rs +++ b/rust/lance/src/dataset/tests/dataset_scanner.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use std::collections::HashMap; use std::sync::Arc; use std::vec; @@ -9,19 +10,28 @@ use lance_arrow::json::{is_arrow_json_field, json_field, JsonArray}; use lance_arrow::FixedSizeListArrayExt; use arrow::compute::concat_batches; +use arrow_array::UInt64Array; use arrow_array::{Array, FixedSizeListArray}; use arrow_array::{Float32Array, Int32Array, RecordBatch, RecordBatchIterator, StringArray}; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef}; use futures::TryStreamExt; use 
lance_arrow::SchemaExt; +use lance_core::cache::LanceCache; +use lance_encoding::decoder::DecoderPlugins; +use lance_file::reader::{describe_encoding, FileReader, FileReaderOptions}; +use lance_file::version::LanceFileVersion; use lance_index::scalar::inverted::{ query::PhraseQuery, tokenizer::InvertedIndexParams, SCORE_FIELD, }; use lance_index::scalar::FullTextSearchQuery; use lance_index::{vector::DIST_COL, DatasetIndexExt, IndexType}; +use lance_io::scheduler::{ScanScheduler, SchedulerConfig}; +use lance_io::utils::CachedFileSize; use lance_linalg::distance::MetricType; +use uuid::Uuid; use crate::dataset::scanner::{DatasetRecordBatchStream, QueryFilter}; +use crate::dataset::write::WriteParams; use crate::Dataset; use lance_index::scalar::inverted::query::FtsQuery; use lance_index::vector::ivf::IvfBuildParams; @@ -366,6 +376,93 @@ async fn test_scan_limit_offset_preserves_json_extension_metadata() { assert_eq!(batch_no_offset.schema(), batch_with_offset.schema()); } +#[tokio::test] +async fn test_scan_miniblock_dictionary_out_of_line_bitpacking_does_not_panic() { + let rows: usize = 10_000; + let unique_values: usize = 2_000; + let batch_size: usize = 8_192; + + let mut field_meta = HashMap::new(); + field_meta.insert( + "lance-encoding:structural-encoding".to_string(), + "miniblock".to_string(), + ); + field_meta.insert( + "lance-encoding:dict-size-ratio".to_string(), + "0.99".to_string(), + ); + + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "d", + DataType::UInt64, + false, + ) + .with_metadata(field_meta)])); + + let values = (0..rows) + .map(|i| (i % unique_values) as u64) + .collect::<Vec<u64>>(); + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(UInt64Array::from(values))]).unwrap(); + + let uri = format!("memory://{}", Uuid::new_v4()); + let reader = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema.clone()); + + let write_params = WriteParams { + data_storage_version: Some(LanceFileVersion::V2_2), 
..WriteParams::default() + }; + let dataset = Dataset::write(reader, &uri, Some(write_params)) + .await + .unwrap(); + + let field_id = dataset.schema().field("d").unwrap().id as u32; + let fragment = dataset.get_fragment(0).unwrap(); + let data_file = fragment.data_file_for_field(field_id).unwrap(); + let field_pos = data_file + .fields + .iter() + .position(|id| *id == field_id as i32) + .unwrap(); + let column_idx = data_file.column_indices[field_pos] as usize; + + let file_path = dataset.data_dir().child(data_file.path.as_str()); + let scheduler = ScanScheduler::new( + dataset.object_store.clone(), + SchedulerConfig::max_bandwidth(&dataset.object_store), + ); + let file_scheduler = scheduler + .open_file(&file_path, &CachedFileSize::unknown()) + .await + .unwrap(); + + let cache = LanceCache::with_capacity(8 * 1024 * 1024); + let file_reader = FileReader::try_open( + file_scheduler, + None, + Arc::<DecoderPlugins>::default(), + &cache, + FileReaderOptions::default(), + ) + .await + .unwrap(); + + let col_meta = &file_reader.metadata().column_metadatas[column_idx]; + let encoding = describe_encoding(col_meta.pages.first().unwrap()); + assert!( + encoding.contains("OutOfLineBitpacking") && encoding.contains("dictionary"), + "Expected a mini-block dictionary page with out-of-line bitpacking, got: {encoding}" + ); + + let mut scanner = dataset.scan(); + scanner.batch_size(batch_size); + scanner.project(&["d"]).unwrap(); + + let mut stream = scanner.try_into_stream().await.unwrap(); + let batch = stream.try_next().await.unwrap().unwrap(); + assert_eq!(batch.num_columns(), 1); +} + async fn prepare_query_filter_dataset() -> Dataset { let schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("id", DataType::Int32, false),