From 5376cc2beb25771c9058c1d2a968b40151c8c357 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Thu, 22 Jan 2026 19:50:56 +0800 Subject: [PATCH 1/9] need more test --- rust/lance-core/src/datatypes.rs | 13 ---- rust/lance/src/dataset/fragment.rs | 4 +- rust/lance/src/dataset/hash_joiner.rs | 99 +++++++++++++++------------ 3 files changed, 58 insertions(+), 58 deletions(-) diff --git a/rust/lance-core/src/datatypes.rs b/rust/lance-core/src/datatypes.rs index dd56e610f52..704c1c4dbe6 100644 --- a/rust/lance-core/src/datatypes.rs +++ b/rust/lance-core/src/datatypes.rs @@ -427,19 +427,6 @@ impl PartialEq for Dictionary { } } -/// Returns true if Lance supports writing this datatype with nulls. -pub fn lance_supports_nulls(datatype: &DataType) -> bool { - matches!( - datatype, - DataType::Utf8 - | DataType::LargeUtf8 - | DataType::Binary - | DataType::List(_) - | DataType::FixedSizeBinary(_) - | DataType::FixedSizeList(_, _) - ) -} - /// Physical storage mode for blob v2 descriptors (one byte, stored in the packed struct column). #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[repr(u8)] diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 8428cf619b4..d359e84906b 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -1590,7 +1590,9 @@ impl FileFragment { let mut updater = self.updater(Some(&[join_column]), None, None).await?; while let Some(batch) = updater.next().await? { - let batch = joiner.collect(batch[join_column].clone()).await?; + let batch = joiner + .collect(&self.dataset, batch[join_column].clone()) + .await?; updater.update(batch).await?; } diff --git a/rust/lance/src/dataset/hash_joiner.rs b/rust/lance/src/dataset/hash_joiner.rs index e9f8c14d9bb..b07eb6c01ad 100644 --- a/rust/lance/src/dataset/hash_joiner.rs +++ b/rust/lance/src/dataset/hash_joiner.rs @@ -5,6 +5,7 @@ use std::sync::Arc; +use crate::{Dataset, Error, Result}; use arrow_array::ArrayRef; use arrow_array::{new_null_array, Array, RecordBatch, RecordBatchReader}; use arrow_row::{OwnedRow, RowConverter, Rows, SortField}; @@ -16,9 +17,6 @@ use lance_core::utils::tokio::get_num_compute_intensive_cpus; use snafu::location; use tokio::task; -use crate::datatypes::lance_supports_nulls; -use crate::{Dataset, Error, Result}; - /// `HashJoiner` does hash join on two datasets. pub struct HashJoiner { index_map: ReadOnlyView, @@ -133,7 +131,11 @@ impl HashJoiner { /// Collecting the data using the index column from left table. /// /// Will run in parallel over columns using all available cores. - pub(super) async fn collect(&self, index_column: ArrayRef) -> Result { + pub(super) async fn collect( + &self, + dataset: &Dataset, + index_column: ArrayRef, + ) -> Result { if index_column.data_type() != &self.index_type { return Err(Error::invalid_input( format!( @@ -180,29 +182,18 @@ impl HashJoiner { async move { let task_result = task::spawn_blocking(move || { let array_refs = arrays.iter().map(|x| x.as_ref()).collect::>(); - interleave(array_refs.as_ref(), indices.as_ref()) - .map_err(|err| Error::invalid_input( - format!("HashJoiner: {}", err), - location!(), - )) + interleave(array_refs.as_ref(), indices.as_ref()).map_err(|err| { + Error::invalid_input(format!("HashJoiner: {}", err), location!()) + }) }) .await; match task_result { Ok(Ok(array)) => { - if array.null_count() > 0 && !lance_supports_nulls(array.data_type()) { - return Err(Error::invalid_input(format!( - "Found rows on LHS that do not match any rows on RHS. Lance would need to write \ - nulls on the RHS, but Lance does not yet support nulls for type {:?}.", - array.data_type() - ), location!())); - } + Self::check_lance_support_null(&array, dataset)?; Ok(array) - }, + } Ok(Err(err)) => Err(err), - Err(err) => Err(Error::io( - format!("HashJoiner: {}", err), - location!(), - )), + Err(err) => Err(Error::io(format!("HashJoiner: {}", err), location!())), } } }) @@ -213,6 +204,27 @@ impl HashJoiner { Ok(RecordBatch::try_new(self.batches[0].schema(), columns)?) } + pub fn check_lance_support_null(array: &ArrayRef, dataset: &Dataset) -> Result<()> { + if array.null_count() > 0 && !dataset.lance_supports_nulls(array.data_type()) { + return Err(Error::invalid_input( + format!( + "Join produced null values for type: {:?}, but storing \ + nulls for this data type is not supported by the \ + dataset's current Lance file format version: {:?}. This \ + can be caused by an explicit null in the new data.", + array.data_type(), + dataset + .manifest() + .data_storage_format + .lance_file_version() + .unwrap() + ), + location!(), + )); + } + Ok(()) + } + /// Collecting the data using the index column from left table, /// invalid join column values in left table will be filled with origin values in left table /// @@ -271,25 +283,7 @@ impl HashJoiner { .await; match task_result { Ok(Ok(array)) => { - if array.null_count() > 0 - && !dataset.lance_supports_nulls(array.data_type()) - { - return Err(Error::invalid_input( - format!( - "Join produced null values for type: {:?}, but storing \ - nulls for this data type is not supported by the \ - dataset's current Lance file format version: {:?}. This \ - can be caused by an explicit null in the new data.", - array.data_type(), - dataset - .manifest() - .data_storage_format - .lance_file_version() - .unwrap() - ), - location!(), - )); - } + Self::check_lance_support_null(&array, dataset)?; Ok(array) } Ok(Err(err)) => Err(err), @@ -311,9 +305,22 @@ impl HashJoiner { mod tests { use super::*; - use arrow_array::{Int32Array, RecordBatchIterator, StringArray, UInt32Array}; use arrow_schema::{DataType, Field, Schema}; + use lance_core::utils::tempfile::TempDir; + + async fn create_dataset() -> Dataset { + let uri = TempDir::default().path_str(); + let schema = Arc::new(Schema::new(vec![ + Field::new("i", DataType::Int32, true), + ])); + let batches = RecordBatchIterator::new(std::iter::empty().map(Ok), schema.clone()); + Dataset::write(batches, &uri, None) + .await + .unwrap(); + + Dataset::open(&uri).await.unwrap() + } #[tokio::test] async fn test_joiner_collect() { @@ -343,6 +350,8 @@ mod tests { )); let joiner = HashJoiner::try_new(batches, "i").await.unwrap(); + let dataset = create_dataset().await; + let indices = Arc::new(Int32Array::from_iter(&[ Some(15), None, @@ -353,7 +362,7 @@ mod tests { Some(22), Some(11111), // not found ])); - let results = joiner.collect(indices).await.unwrap(); + let results = joiner.collect(&dataset, indices).await.unwrap(); assert_eq!( results.column_by_name("s").unwrap().as_ref(), @@ -394,13 +403,15 @@ mod tests { let joiner = HashJoiner::try_new(batches, "i").await.unwrap(); + let dataset = create_dataset().await; + // Wrong type: was Int32, passing UInt32. let indices = Arc::new(UInt32Array::from_iter(&[Some(15)])); - let result = joiner.collect(indices).await; + let result = joiner.collect(&dataset, indices).await; assert!(result.is_err()); assert!(result .unwrap_err() .to_string() .contains("Index column type mismatch: expected Int32, got UInt32")); } -} +} \ No newline at end of file From 390ed31193eddd4e88caa14412a77d888d6cd897 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Thu, 22 Jan 2026 20:14:03 +0800 Subject: [PATCH 2/9] add more test --- python/python/tests/test_dataset.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 9cf7c824a60..ed0b436b9ba 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -1624,6 +1624,28 @@ def copy_row_id(batch: pa.RecordBatch) -> pa.RecordBatch: ) +def test_merge_columns_float32_with_nulls(tmp_path: Path): + table = pa.table({"a": range(100)}) + base_dir = tmp_path / "merge_columns_float32_with_nulls" + dataset = lance.write_dataset(table, base_dir) + + fragment = dataset.get_fragments()[0] + input_data = [float(i) for i in range(10)] + update = pa.table( + { + "a": range(10), + "b": pa.array(input_data, type=pa.float32()), + } + ) + merged, schema = fragment.merge(update, left_on="a", right_on="a") + merge = lance.LanceOperation.Merge([merged], schema) + dataset = lance.LanceDataset.commit(base_dir, merge, read_version=dataset.version) + + expected_b = pa.array(input_data + [None] * 90, type=pa.float32()) + expected = pa.table({"a": range(100), "b": expected_b}) + assert dataset.to_table() == expected + + def test_merge_separate_dataset(tmp_path: Path): base_ds = lance.write_dataset( pa.table({"a": range(10), "b": range(10)}), tmp_path / "base" From 4c2b2c0d27cf7eadc52fc8f32411ea02360f6e1f Mon Sep 17 00:00:00 2001 From: YueZhang Date: Thu, 22 Jan 2026 20:20:43 +0800 Subject: [PATCH 3/9] fix style --- rust/lance/src/dataset/hash_joiner.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/rust/lance/src/dataset/hash_joiner.rs b/rust/lance/src/dataset/hash_joiner.rs index b07eb6c01ad..7952c41d78f 100644 --- a/rust/lance/src/dataset/hash_joiner.rs +++ b/rust/lance/src/dataset/hash_joiner.rs @@ -311,13 +311,9 @@ mod tests { async fn create_dataset() -> Dataset { let uri = TempDir::default().path_str(); - let schema = Arc::new(Schema::new(vec![ - Field::new("i", DataType::Int32, true), - ])); + let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)])); let batches = RecordBatchIterator::new(std::iter::empty().map(Ok), schema.clone()); - Dataset::write(batches, &uri, None) - .await - .unwrap(); + Dataset::write(batches, &uri, None).await.unwrap(); Dataset::open(&uri).await.unwrap() } @@ -414,4 +410,4 @@ mod tests { .to_string() .contains("Index column type mismatch: expected Int32, got UInt32")); } -} \ No newline at end of file +} From ea99c4dc91d7d8cdc452d5657a642bc220f320cc Mon Sep 17 00:00:00 2001 From: YueZhang Date: Thu, 22 Jan 2026 20:37:27 +0800 Subject: [PATCH 4/9] fix ut --- python/python/tests/test_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index ed0b436b9ba..18809e544fd 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -1624,7 +1624,7 @@ def copy_row_id(batch: pa.RecordBatch) -> pa.RecordBatch: ) -def test_merge_columns_float32_with_nulls(tmp_path: Path): +def test_merge_columns_with_nulls(tmp_path: Path): table = pa.table({"a": range(100)}) base_dir = tmp_path / "merge_columns_float32_with_nulls" dataset = lance.write_dataset(table, base_dir) From f25bac7fd9a79412827b3fb52160256b2a4329b8 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Thu, 22 Jan 2026 21:00:08 +0800 Subject: [PATCH 5/9] rename ut --- python/python/tests/test_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 18809e544fd..155d2763c7a 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -1624,7 +1624,7 @@ def copy_row_id(batch: pa.RecordBatch) -> pa.RecordBatch: ) -def test_merge_columns_with_nulls(tmp_path: Path): +def test_add_columns_with_nulls(tmp_path: Path): table = pa.table({"a": range(100)}) base_dir = tmp_path / "merge_columns_float32_with_nulls" dataset = lance.write_dataset(table, base_dir) From 3d9f1a97e4026b4fcf690fede75fb7f42db52e55 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Fri, 23 Jan 2026 13:34:44 +0800 Subject: [PATCH 6/9] fix ut --- python/python/tests/test_dataset.py | 143 +++++++++++++--------------- 1 file changed, 68 insertions(+), 75 deletions(-) diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 155d2763c7a..b66558fc1e2 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -491,12 +491,12 @@ def test_tag(tmp_path: Path): # test tag update with pytest.raises( - ValueError, match="Version not found error: version main:3 does not exist" + ValueError, match="Version not found error: version main:3 does not exist" ): ds.tags.update("tag1", 3) with pytest.raises( - ValueError, match="Ref not found error: tag tag3 does not exist" + ValueError, match="Ref not found error: tag tag3 does not exist" ): ds.tags.update("tag3", 1) @@ -802,8 +802,8 @@ def test_limit_offset(tmp_path: Path, data_storage_version: str): dataset.delete("a % 2 = 0") filt_table = table.filter((pa.compute.bit_wise_and(pa.compute.field("a"), 1)) != 0) assert ( - dataset.to_table(offset=10).combine_chunks() - == filt_table.slice(10).combine_chunks() + dataset.to_table(offset=10).combine_chunks() + == filt_table.slice(10).combine_chunks() ) dataset = dataset.checkout_version(full_ds_version) @@ -848,7 +848,7 @@ def test_tilde_paths(tmp_path: Path): table = pa.Table.from_pydict({"a": range(100), "b": range(100)}) with mock.patch.dict( - os.environ, {"HOME": str(tmp_path), "USERPROFILE": str(tmp_path)} + os.environ, {"HOME": str(tmp_path), "USERPROFILE": str(tmp_path)} ): # NOTE: the resolution logic is a bit finicky # link 1 - https://docs.rs/dirs/4.0.0/dirs/fn.home_dir.html @@ -1024,12 +1024,12 @@ def test_count_rows_via_scanner(tmp_path: Path): assert ds.scanner(filter="a < 50", columns=[], with_row_id=True).count_rows() == 50 with pytest.raises( - ValueError, match="should not be called on a plan selecting columns" + ValueError, match="should not be called on a plan selecting columns" ): ds.scanner(filter="a < 50", columns=["a"], with_row_id=True).count_rows() with pytest.raises( - ValueError, match="should not be called on a plan selecting columns" + ValueError, match="should not be called on a plan selecting columns" ): ds.scanner(with_row_id=True).count_rows() @@ -1408,7 +1408,7 @@ def test_strict_overwrite(tmp_path: Path): base_dir, operation, read_version=dataset_v1.version, max_retries=0 ) with pytest.raises( - OSError, match=f"Commit conflict for version {dataset_v1.version + 1}" + OSError, match=f"Commit conflict for version {dataset_v1.version + 1}" ): lance.LanceDataset.commit( base_dir, operation, read_version=dataset_v1.version, max_retries=0 @@ -1624,28 +1624,6 @@ def copy_row_id(batch: pa.RecordBatch) -> pa.RecordBatch: ) -def test_add_columns_with_nulls(tmp_path: Path): - table = pa.table({"a": range(100)}) - base_dir = tmp_path / "merge_columns_float32_with_nulls" - dataset = lance.write_dataset(table, base_dir) - - fragment = dataset.get_fragments()[0] - input_data = [float(i) for i in range(10)] - update = pa.table( - { - "a": range(10), - "b": pa.array(input_data, type=pa.float32()), - } - ) - merged, schema = fragment.merge(update, left_on="a", right_on="a") - merge = lance.LanceOperation.Merge([merged], schema) - dataset = lance.LanceDataset.commit(base_dir, merge, read_version=dataset.version) - - expected_b = pa.array(input_data + [None] * 90, type=pa.float32()) - expected = pa.table({"a": range(100), "b": expected_b}) - assert dataset.to_table() == expected - - def test_merge_separate_dataset(tmp_path: Path): base_ds = lance.write_dataset( pa.table({"a": range(10), "b": range(10)}), tmp_path / "base" @@ -1806,28 +1784,43 @@ def test_load_scanner_from_fragments(tmp_path: Path): assert scanner.to_table().num_rows == 2 * 100 -def test_merge_data(tmp_path: Path): +def test_merge_data_legacy(tmp_path: Path): tab = pa.table({"a": range(100), "b": range(100)}) - lance.write_dataset(tab, tmp_path / "dataset", mode="append") - + lance.write_dataset(tab, tmp_path / "dataset", mode="append", data_storage_version="legacy") dataset = lance.dataset(tmp_path / "dataset") - # rejects partial data for non-nullable types new_tab = pa.table({"a": range(40), "c": range(40)}) - # TODO: this should be ValueError with pytest.raises( - OSError, match=".+Lance does not yet support nulls for type Int64." + OSError, match=r"Join produced null values for type: Int64" ): dataset.merge(new_tab, "a") + +def test_merge_data(tmp_path: Path): + tab = pa.table({"a": range(100)}) + lance.write_dataset(tab, tmp_path / "dataset", mode="append") + + dataset = lance.dataset(tmp_path / "dataset") + + # accepts partial data for nullable types + new_tab = pa.table({"a": range(40), "b": range(40)}) + dataset.merge(new_tab, "a") + assert dataset.version == 2 + assert dataset.to_table() == pa.table( + { + "a": range(100), + "b": pa.array(list(range(40)) + [None] * 60), + } + ) + # accepts a full merge new_tab = pa.table({"a": range(100), "c": range(100)}) dataset.merge(new_tab, "a") - assert dataset.version == 2 + assert dataset.version == 3 assert dataset.to_table() == pa.table( { "a": range(100), - "b": range(100), + "b": pa.array(list(range(40)) + [None] * 60), "c": range(100), } ) @@ -1835,11 +1828,11 @@ def test_merge_data(tmp_path: Path): # accepts a partial for string new_tab = pa.table({"a2": range(5), "d": ["a", "b", "c", "d", "e"]}) dataset.merge(new_tab, left_on="a", right_on="a2") - assert dataset.version == 3 + assert dataset.version == 4 expected = pa.table( { "a": range(100), - "b": range(100), + "b": pa.array(list(range(40)) + [None] * 60), "c": range(100), "d": ["a", "b", "c", "d", "e"] + [None] * 95, } @@ -1910,10 +1903,10 @@ def test_delete_data(tmp_path: Path): def check_merge_stats(merge_dict, expected): assert ( - merge_dict["num_inserted_rows"], - merge_dict["num_updated_rows"], - merge_dict["num_deleted_rows"], - ) == expected + merge_dict["num_inserted_rows"], + merge_dict["num_updated_rows"], + merge_dict["num_deleted_rows"], + ) == expected def test_merge_insert(tmp_path: Path): @@ -2069,7 +2062,7 @@ def test_merge_insert_subcols(tmp_path: Path): assert len(fragments[0].data_files()) == 2 assert ( - fragments[0].data_files()[0].path == original_fragments[0].data_files()[0].path + fragments[0].data_files()[0].path == original_fragments[0].data_files()[0].path ) assert len(fragments[1].data_files()) == 1 assert str(fragments[1].data_files()[0]) == str( @@ -2157,10 +2150,10 @@ def test_flat_vector_search_with_delete(tmp_path: Path): dataset = lance.write_dataset(table, tmp_path / "dataset", mode="create") dataset.delete("id = 0") assert ( - dataset.scanner(nearest={"column": "vector", "k": 10, "q": [1.0, 1.0]}) - .to_table() - .num_rows - == 9 + dataset.scanner(nearest={"column": "vector", "k": 10, "q": [1.0, 1.0]}) + .to_table() + .num_rows + == 9 ) @@ -2742,12 +2735,12 @@ def test_add_null_columns_with_conflict_names(tmp_path: Path): assert len(fragments[0].data_files()) == 1 with pytest.raises( - Exception, match=".*Type conflicts between id\\(Int64\\) and id\\(Float32\\).*" + Exception, match=".*Type conflicts between id\\(Int64\\) and id\\(Float32\\).*" ): ds.add_columns(pa.field("id", pa.float32())) with pytest.raises( - Exception, match=".*Type conflicts between id\\(Int64\\) and id\\(Float32\\).*" + Exception, match=".*Type conflicts between id\\(Int64\\) and id\\(Float32\\).*" ): ds.add_columns([pa.field("id", pa.float32()), pa.field("good", pa.int32())]) @@ -3462,17 +3455,17 @@ def test_scan_deleted_rows(tmp_path: Path): ) assert ( - ds.scanner(with_row_id=True, include_deleted_rows=True, filter="a < 32") - .to_table() - .num_rows - == 7 + ds.scanner(with_row_id=True, include_deleted_rows=True, filter="a < 32") + .to_table() + .num_rows + == 7 ) assert ( - ds.scanner(include_deleted_rows=True, with_row_id=True, filter="b < 30") - .to_table() - .num_rows - == 5 + ds.scanner(include_deleted_rows=True, with_row_id=True, filter="b < 30") + .to_table() + .num_rows + == 5 ) with pytest.raises(ValueError, match="with_row_id is false"): @@ -4023,8 +4016,8 @@ def test_late_materialization_param(tmp_path: Path): assert list(dataset.to_batches(filter=filt, late_materialization=False)) == expected assert list(dataset.to_batches(filter=filt, late_materialization=True)) == expected assert ( - list(dataset.to_batches(filter=filt, late_materialization=["values"])) - == expected + list(dataset.to_batches(filter=filt, late_materialization=["values"])) + == expected ) @@ -4034,10 +4027,10 @@ def test_late_materialization_batch_size(tmp_path: Path): table, tmp_path, data_storage_version="stable", max_rows_per_file=10000 ) for batch in dataset.to_batches( - columns=["values"], - filter="filter % 2 == 0", - batch_size=32, - late_materialization=True, + columns=["values"], + filter="filter % 2 == 0", + batch_size=32, + late_materialization=True, ): assert batch.num_rows == 32 @@ -4406,7 +4399,7 @@ def test_file_reader_options(tmp_path: Path): dataset_both = lance.dataset( tmp_path / "test", read_params={"cache_repetition_index": True, "validate_on_decode": False}, - ) + ) result4 = dataset_both.to_table() assert result4.num_rows == 10000 @@ -4414,7 +4407,7 @@ def test_file_reader_options(tmp_path: Path): dataset_inherit = lance.dataset( tmp_path / "test", read_params={"cache_repetition_index": True, "validate_on_decode": True}, - ) + ) scanner = dataset_inherit.scanner() result5 = scanner.to_table() assert result5.num_rows == 10000 @@ -4511,8 +4504,8 @@ def test_commit_message_and_get_properties(tmp_path): transactions = dataset.get_transactions() assert len(transactions) == 1 assert ( - transactions[0].transaction_properties.get(LANCE_COMMIT_MESSAGE_KEY) - == "first commit" + transactions[0].transaction_properties.get(LANCE_COMMIT_MESSAGE_KEY) + == "first commit" ) # 2. Test case: Commit without a message lance.write_dataset(table, tmp_path, mode="append") @@ -4523,13 +4516,13 @@ def test_commit_message_and_get_properties(tmp_path): # The latest transaction has no message, # so the key should be missing or properties is None assert ( - transactions[0].transaction_properties == {} - or LANCE_COMMIT_MESSAGE_KEY not in transactions[0].transaction_properties + transactions[0].transaction_properties == {} + or LANCE_COMMIT_MESSAGE_KEY not in transactions[0].transaction_properties ) # The first transaction should still have the message assert ( - transactions[1].transaction_properties.get(LANCE_COMMIT_MESSAGE_KEY) - == "first commit" + transactions[1].transaction_properties.get(LANCE_COMMIT_MESSAGE_KEY) + == "first commit" ) # 3. Test case: Transaction with no properties at all # A delete operation creates a new version that may have no properties. @@ -4551,8 +4544,8 @@ def test_commit_message_and_get_properties(tmp_path): transactions = dataset.get_transactions() assert len(transactions) == 4 assert ( - transactions[0].transaction_properties.get(LANCE_COMMIT_MESSAGE_KEY) - == "Use Dataset.commit" + transactions[0].transaction_properties.get(LANCE_COMMIT_MESSAGE_KEY) + == "Use Dataset.commit" ) From 25f8211bc3b3eba8e5a153257ea231d97c79879b Mon Sep 17 00:00:00 2001 From: YueZhang Date: Fri, 23 Jan 2026 13:38:18 +0800 Subject: [PATCH 7/9] fix ut --- python/python/tests/test_dataset.py | 90 +++++++++++++++-------------- 1 file changed, 46 insertions(+), 44 deletions(-) diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index b66558fc1e2..63154ee72f3 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -491,12 +491,12 @@ def test_tag(tmp_path: Path): # test tag update with pytest.raises( - ValueError, match="Version not found error: version main:3 does not exist" + ValueError, match="Version not found error: version main:3 does not exist" ): ds.tags.update("tag1", 3) with pytest.raises( - ValueError, match="Ref not found error: tag tag3 does not exist" + ValueError, match="Ref not found error: tag tag3 does not exist" ): ds.tags.update("tag3", 1) @@ -802,8 +802,8 @@ def test_limit_offset(tmp_path: Path, data_storage_version: str): dataset.delete("a % 2 = 0") filt_table = table.filter((pa.compute.bit_wise_and(pa.compute.field("a"), 1)) != 0) assert ( - dataset.to_table(offset=10).combine_chunks() - == filt_table.slice(10).combine_chunks() + dataset.to_table(offset=10).combine_chunks() + == filt_table.slice(10).combine_chunks() ) dataset = dataset.checkout_version(full_ds_version) @@ -848,7 +848,7 @@ def test_tilde_paths(tmp_path: Path): table = pa.Table.from_pydict({"a": range(100), "b": range(100)}) with mock.patch.dict( - os.environ, {"HOME": str(tmp_path), "USERPROFILE": str(tmp_path)} + os.environ, {"HOME": str(tmp_path), "USERPROFILE": str(tmp_path)} ): # NOTE: the resolution logic is a bit finicky # link 1 - https://docs.rs/dirs/4.0.0/dirs/fn.home_dir.html @@ -1024,12 +1024,12 @@ def test_count_rows_via_scanner(tmp_path: Path): assert ds.scanner(filter="a < 50", columns=[], with_row_id=True).count_rows() == 50 with pytest.raises( - ValueError, match="should not be called on a plan selecting columns" + ValueError, match="should not be called on a plan selecting columns" ): ds.scanner(filter="a < 50", columns=["a"], with_row_id=True).count_rows() with pytest.raises( - ValueError, match="should not be called on a plan selecting columns" + ValueError, match="should not be called on a plan selecting columns" ): ds.scanner(with_row_id=True).count_rows() @@ -1408,7 +1408,7 @@ def test_strict_overwrite(tmp_path: Path): base_dir, operation, read_version=dataset_v1.version, max_retries=0 ) with pytest.raises( - OSError, match=f"Commit conflict for version {dataset_v1.version + 1}" + OSError, match=f"Commit conflict for version {dataset_v1.version + 1}" ): lance.LanceDataset.commit( base_dir, operation, read_version=dataset_v1.version, max_retries=0 @@ -1786,8 +1786,10 @@ def test_load_scanner_from_fragments(tmp_path: Path): def test_merge_data_legacy(tmp_path: Path): tab = pa.table({"a": range(100), "b": range(100)}) - lance.write_dataset(tab, tmp_path / "dataset", mode="append", data_storage_version="legacy") + lance.write_dataset(tab, tmp_path / "dataset", mode="append") + dataset = lance.dataset(tmp_path / "dataset") + # rejects partial data for non-nullable types new_tab = pa.table({"a": range(40), "c": range(40)}) with pytest.raises( @@ -1903,10 +1905,10 @@ def test_delete_data(tmp_path: Path): def check_merge_stats(merge_dict, expected): assert ( - merge_dict["num_inserted_rows"], - merge_dict["num_updated_rows"], - merge_dict["num_deleted_rows"], - ) == expected + merge_dict["num_inserted_rows"], + merge_dict["num_updated_rows"], + merge_dict["num_deleted_rows"], + ) == expected def test_merge_insert(tmp_path: Path): @@ -2062,7 +2064,7 @@ def test_merge_insert_subcols(tmp_path: Path): assert len(fragments[0].data_files()) == 2 assert ( - fragments[0].data_files()[0].path == original_fragments[0].data_files()[0].path + fragments[0].data_files()[0].path == original_fragments[0].data_files()[0].path ) assert len(fragments[1].data_files()) == 1 assert str(fragments[1].data_files()[0]) == str( @@ -2150,10 +2152,10 @@ def test_flat_vector_search_with_delete(tmp_path: Path): dataset = lance.write_dataset(table, tmp_path / "dataset", mode="create") dataset.delete("id = 0") assert ( - dataset.scanner(nearest={"column": "vector", "k": 10, "q": [1.0, 1.0]}) - .to_table() - .num_rows - == 9 + dataset.scanner(nearest={"column": "vector", "k": 10, "q": [1.0, 1.0]}) + .to_table() + .num_rows + == 9 ) @@ -2735,12 +2737,12 @@ def test_add_null_columns_with_conflict_names(tmp_path: Path): assert len(fragments[0].data_files()) == 1 with pytest.raises( - Exception, match=".*Type conflicts between id\\(Int64\\) and id\\(Float32\\).*" + Exception, match=".*Type conflicts between id\\(Int64\\) and id\\(Float32\\).*" ): ds.add_columns(pa.field("id", pa.float32())) with pytest.raises( - Exception, match=".*Type conflicts between id\\(Int64\\) and id\\(Float32\\).*" + Exception, match=".*Type conflicts between id\\(Int64\\) and id\\(Float32\\).*" ): ds.add_columns([pa.field("id", pa.float32()), pa.field("good", pa.int32())]) @@ -3455,17 +3457,17 @@ def test_scan_deleted_rows(tmp_path: Path): ) assert ( - ds.scanner(with_row_id=True, include_deleted_rows=True, filter="a < 32") - .to_table() - .num_rows - == 7 + ds.scanner(with_row_id=True, include_deleted_rows=True, filter="a < 32") + .to_table() + .num_rows + == 7 ) assert ( - ds.scanner(include_deleted_rows=True, with_row_id=True, filter="b < 30") - .to_table() - .num_rows - == 5 + ds.scanner(include_deleted_rows=True, with_row_id=True, filter="b < 30") + .to_table() + .num_rows + == 5 ) with pytest.raises(ValueError, match="with_row_id is false"): @@ -4016,8 +4018,8 @@ def test_late_materialization_param(tmp_path: Path): assert list(dataset.to_batches(filter=filt, late_materialization=False)) == expected assert list(dataset.to_batches(filter=filt, late_materialization=True)) == expected assert ( - list(dataset.to_batches(filter=filt, late_materialization=["values"])) - == expected + list(dataset.to_batches(filter=filt, late_materialization=["values"])) + == expected ) @@ -4027,10 +4029,10 @@ def test_late_materialization_batch_size(tmp_path: Path): table, tmp_path, data_storage_version="stable", max_rows_per_file=10000 ) for batch in dataset.to_batches( - columns=["values"], - filter="filter % 2 == 0", - batch_size=32, - late_materialization=True, + columns=["values"], + filter="filter % 2 == 0", + batch_size=32, + late_materialization=True, ): assert batch.num_rows == 32 @@ -4399,7 +4401,7 @@ def test_file_reader_options(tmp_path: Path): dataset_both = lance.dataset( tmp_path / "test", read_params={"cache_repetition_index": True, "validate_on_decode": False}, - ) + ) result4 = dataset_both.to_table() assert result4.num_rows == 10000 @@ -4407,7 +4409,7 @@ def test_file_reader_options(tmp_path: Path): dataset_inherit = lance.dataset( tmp_path / "test", read_params={"cache_repetition_index": True, "validate_on_decode": True}, - ) + ) scanner = dataset_inherit.scanner() result5 = scanner.to_table() assert result5.num_rows == 10000 @@ -4504,8 +4506,8 @@ def test_commit_message_and_get_properties(tmp_path): transactions = dataset.get_transactions() assert len(transactions) == 1 assert ( - transactions[0].transaction_properties.get(LANCE_COMMIT_MESSAGE_KEY) - == "first commit" + transactions[0].transaction_properties.get(LANCE_COMMIT_MESSAGE_KEY) + == "first commit" ) # 2. Test case: Commit without a message lance.write_dataset(table, tmp_path, mode="append") @@ -4516,13 +4518,13 @@ def test_commit_message_and_get_properties(tmp_path): # The latest transaction has no message, # so the key should be missing or properties is None assert ( - transactions[0].transaction_properties == {} - or LANCE_COMMIT_MESSAGE_KEY not in transactions[0].transaction_properties + transactions[0].transaction_properties == {} + or LANCE_COMMIT_MESSAGE_KEY not in transactions[0].transaction_properties ) # The first transaction should still have the message assert ( - transactions[1].transaction_properties.get(LANCE_COMMIT_MESSAGE_KEY) - == "first commit" + transactions[1].transaction_properties.get(LANCE_COMMIT_MESSAGE_KEY) + == "first commit" ) # 3. Test case: Transaction with no properties at all # A delete operation creates a new version that may have no properties. @@ -4544,8 +4546,8 @@ def test_commit_message_and_get_properties(tmp_path): transactions = dataset.get_transactions() assert len(transactions) == 4 assert ( - transactions[0].transaction_properties.get(LANCE_COMMIT_MESSAGE_KEY) - == "Use Dataset.commit" + transactions[0].transaction_properties.get(LANCE_COMMIT_MESSAGE_KEY) + == "Use Dataset.commit" ) From 6c527d1bcb4bcd3ae3033db2f62be3338f4215bd Mon Sep 17 00:00:00 2001 From: YueZhang Date: Fri, 23 Jan 2026 14:08:10 +0800 Subject: [PATCH 8/9] fix ut --- python/python/tests/test_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 63154ee72f3..eb2e165e045 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -1786,7 +1786,7 @@ def test_load_scanner_from_fragments(tmp_path: Path): def test_merge_data_legacy(tmp_path: Path): tab = pa.table({"a": range(100), "b": range(100)}) - lance.write_dataset(tab, tmp_path / "dataset", mode="append") + lance.write_dataset(tab, tmp_path / "dataset", mode="append", data_storage_version="legacy") dataset = lance.dataset(tmp_path / "dataset") From e72cfc319f7cf1511ff971a922fc201efe4e75f3 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Fri, 23 Jan 2026 14:22:08 +0800 Subject: [PATCH 9/9] fix ut --- python/python/tests/test_dataset.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index eb2e165e045..1d7e7e2ef26 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -1786,15 +1786,15 @@ def test_load_scanner_from_fragments(tmp_path: Path): def test_merge_data_legacy(tmp_path: Path): tab = pa.table({"a": range(100), "b": range(100)}) - lance.write_dataset(tab, tmp_path / "dataset", mode="append", data_storage_version="legacy") + lance.write_dataset( + tab, tmp_path / "dataset", mode="append", data_storage_version="legacy" + ) dataset = lance.dataset(tmp_path / "dataset") # rejects partial data for non-nullable types new_tab = pa.table({"a": range(40), "c": range(40)}) - with pytest.raises( - OSError, match=r"Join produced null values for type: Int64" - ): + with pytest.raises(OSError, match=r"Join produced null values for type: Int64"): dataset.merge(new_tab, "a")