From 0a1e00cb4e9d4330aa924bd71c3e0ffb235979e6 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 10 Feb 2026 10:55:49 -0800 Subject: [PATCH 1/4] fix: don't drop field metadata on merge insert path (#5927) There are various ways to write data and several of them are failing for JSON data at the moment because it requires a conversion from logical to physical representation. I'd like to rework this more generally but I want to get the current implementation working first. This fixes one of the merge_insert paths which is currently failing because the field metadata is lost as part of the operation (and so the data appears to be a normal string column and is not converted) --- python/python/tests/test_json.py | 205 ++++++++++++++++-- .../dataset/write/merge_insert/exec/write.rs | 6 +- 2 files changed, 192 insertions(+), 19 deletions(-) diff --git a/python/python/tests/test_json.py b/python/python/tests/test_json.py index d68e0ca539a..44dd22be5a9 100644 --- a/python/python/tests/test_json.py +++ b/python/python/tests/test_json.py @@ -4,11 +4,21 @@ import json import tempfile from pathlib import Path +from typing import Union import lance import pyarrow as pa +def check_json_type(ds: Union[lance.LanceDataset, pa.Table], col_name: str): + # TODO: In the future it should be possible to verify + # the logical type of a column. + + schema = ds.schema + field = schema.field(col_name) + assert field.type == pa.json_() + + def test_json_basic_write_read(): """Test basic JSON type write and read functionality.""" @@ -44,23 +54,13 @@ def test_json_basic_write_read(): logical_schema = dataset.schema assert len(logical_schema) == 2 assert logical_schema.field("id").type == pa.int32() - logical_field = logical_schema.field("data") - assert ( - str(logical_field.type) == "extension" - or logical_field.type == pa.utf8() - ) + check_json_type(dataset, "data") # Read data back result_table = dataset.to_table() # Check that data is returned as Arrow JSON for Python - result_field = result_table.schema.field("data") - # PyArrow extension types print as extension but - # the storage type is utf8 - assert ( - str(result_field.type) == "extension" - or result_field.type == pa.utf8() - ) + check_json_type(result_table, "data") # Verify data assert result_table.num_rows == 5 @@ -467,8 +467,7 @@ def test_json_filter_append_missing_json_cast(tmp_path: Path): lance.write_dataset(initial_table, dataset_path) dataset = lance.dataset(dataset_path) schema = dataset.schema - field = schema.field("article_metadata") - assert str(field.type) == "extension" or field.type == pa.utf8() + check_json_type(dataset, "article_metadata") append_table = pa.table( { @@ -513,6 +512,68 @@ def test_json_filter_append_missing_json_cast(tmp_path: Path): ] +def test_json_with_compaction(tmp_path: Path): + """Test that JSON data survives compaction across fragments.""" + + dataset_path = tmp_path / "json_compaction.lance" + + # Write first fragment + table1 = pa.table( + { + "id": pa.array([1, 2, 3], type=pa.int32()), + "data": pa.array( + [ + json.dumps({"name": "Alice", "score": 10}), + json.dumps({"name": "Bob", "score": 20}), + json.dumps({"name": "Charlie", "score": 30}), + ], + type=pa.json_(), + ), + } + ) + lance.write_dataset(table1, dataset_path) + + # Write second fragment + table2 = pa.table( + { + "id": pa.array([4, 5], type=pa.int32()), + "data": pa.array( + [ + json.dumps({"name": "David", "score": 40}), + json.dumps({"name": "Eve", "score": 50}), + ], + type=pa.json_(), + ), + } + ) + lance.write_dataset(table2, dataset_path, mode="append") + + dataset = lance.dataset(dataset_path) + assert len(dataset.get_fragments()) == 2 + + # Run compaction + dataset.optimize.compact_files() + dataset = lance.dataset(dataset_path) + assert len(dataset.get_fragments()) == 1 + + # Verify data is intact + result = dataset.to_table() + assert result.num_rows == 5 + assert result.column("id").to_pylist() == [1, 2, 3, 4, 5] + + # Verify JSON type is preserved + check_json_type(dataset, "data") + + # Verify JSON functions still work after compaction + result = dataset.to_table(filter="json_get_string(data, 'name') = 'Alice'") + assert result.num_rows == 1 + assert result["id"][0].as_py() == 1 + + result = dataset.to_table(filter="json_get_int(data, 'score') > 25") + assert result.num_rows == 3 + assert result["id"].to_pylist() == [3, 4, 5] + + def test_json_limit_offset_batch_transfer_preserves_extension_metadata(tmp_path: Path): """Ensure JSON extension metadata survives limit/offset scans. @@ -567,3 +628,119 @@ def test_json_limit_offset_batch_transfer_preserves_extension_metadata(tmp_path: # Ensure JSON functions still recognize the column as JSON. assert dest.to_table(filter="json_get(meta, 'i') IS NOT NULL").num_rows == num_rows + + +def test_json_append(tmp_path: Path): + """Test appending JSON data to an existing dataset.""" + + dataset_path = tmp_path / "json_append.lance" + + # Write initial data + table1 = pa.table( + { + "id": pa.array([1, 2], type=pa.int32()), + "data": pa.array( + [ + json.dumps({"color": "red", "count": 1}), + json.dumps({"color": "blue", "count": 2}), + ], + type=pa.json_(), + ), + } + ) + lance.write_dataset(table1, dataset_path) + + # Append more data + table2 = pa.table( + { + "id": pa.array([3, 4, 5], type=pa.int32()), + "data": pa.array( + [ + json.dumps({"color": "green", "count": 3}), + json.dumps({"color": "yellow", "count": 4}), + None, + ], + type=pa.json_(), + ), + } + ) + lance.write_dataset(table2, dataset_path, mode="append") + + dataset = lance.dataset(dataset_path) + assert dataset.count_rows() == 5 + + # Verify JSON type is preserved + check_json_type(dataset, "data") + + # Verify all data is readable + result = dataset.to_table() + assert result.column("id").to_pylist() == [1, 2, 3, 4, 5] + + # Verify null handling + data_col = result.column("data") + assert data_col.null_count == 1 + assert data_col.is_null().to_pylist() == [False, False, False, False, True] + + # Verify JSON functions work across both fragments + result = dataset.to_table(filter="json_get_string(data, 'color') = 'green'") + assert result.num_rows == 1 + assert result["id"][0].as_py() == 3 + + result = dataset.to_table(filter="json_get_int(data, 'count') >= 2") + assert result.num_rows == 3 + assert result["id"].to_pylist() == [2, 3, 4] + + +def test_json_merge_insert(tmp_path: Path): + """Test merge_insert with JSON data.""" + + dataset_path = tmp_path / "json_merge_insert.lance" + + # Create initial dataset + table = pa.table( + { + "id": pa.array([1, 2, 3], type=pa.int32()), + "data": pa.array( + [ + json.dumps({"name": "Alice", "score": 10}), + json.dumps({"name": "Bob", "score": 20}), + json.dumps({"name": "Charlie", "score": 30}), + ], + type=pa.json_(), + ), + } + ) + lance.write_dataset(table, dataset_path) + + # Merge insert: update id=2, insert id=4 + new_data = pa.table( + { + "id": pa.array([2, 4], type=pa.int32()), + "data": pa.array( + [ + json.dumps({"name": "Bob", "score": 99}), + json.dumps({"name": "David", "score": 40}), + ], + type=pa.json_(), + ), + } + ) + + dataset = lance.dataset(dataset_path) + dataset.merge_insert( + "id" + ).when_matched_update_all().when_not_matched_insert_all().execute(new_data) + dataset = lance.dataset(dataset_path) + + # Verify row count + assert dataset.count_rows() == 4 + + # Verify JSON type preserved + check_json_type(dataset, "data") + + # Verify data is readable + result = dataset.to_table() + assert sorted(result.column("id").to_pylist()) == [1, 2, 3, 4] + + result = dataset.to_table(filter="json_get_int(data, 'score') >= 35") + assert result.num_rows == 2 diff --git a/rust/lance/src/dataset/write/merge_insert/exec/write.rs b/rust/lance/src/dataset/write/merge_insert/exec/write.rs index 87714e6d46d..45b915fd353 100644 --- a/rust/lance/src/dataset/write/merge_insert/exec/write.rs +++ b/rust/lance/src/dataset/write/merge_insert/exec/write.rs @@ -444,11 +444,7 @@ impl FullSchemaMergeInsertExec { .iter() .map(|&idx| { let field = input_schema.field(idx); - Arc::new(arrow_schema::Field::new( - field.name(), - field.data_type().clone(), - field.is_nullable(), - )) + Arc::new(field.clone()) }) .collect(); let output_schema = Arc::new(Schema::new(output_fields)); From e5dad6a137d37d54dc235c61d2711b85ad67aca7 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 10 Feb 2026 19:17:59 -0800 Subject: [PATCH 2/4] fix: apply SchemaAdapter in Updater (#5928) This will ensure JSON columns are properly converted in paths like add_columns --- python/python/tests/test_json.py | 41 +++++++++++++++++++++++++++++++ rust/lance/src/dataset/updater.rs | 16 ++++++++++++ rust/lance/src/dataset/utils.rs | 8 ++++++ 3 files changed, 65 insertions(+) diff --git a/python/python/tests/test_json.py b/python/python/tests/test_json.py index 44dd22be5a9..0cbc918cc18 100644 --- a/python/python/tests/test_json.py +++ b/python/python/tests/test_json.py @@ -691,6 +691,47 @@ def test_json_append(tmp_path: Path): assert result["id"].to_pylist() == [2, 3, 4] +def test_json_add_columns(tmp_path: Path): + """Test adding a JSON column to an existing dataset via add_columns.""" + + dataset_path = tmp_path / "json_add_col.lance" + + # Create a dataset without a JSON column + table = pa.table( + { + "id": pa.array([1, 2, 3], type=pa.int32()), + "name": pa.array(["Alice", "Bob", "Charlie"], type=pa.string()), + } + ) + dataset = lance.write_dataset(table, dataset_path) + + # Add a JSON column using a record batch reader + names = table.column("name").to_pylist() + json_values = [json.dumps({"greeting": f"hello {n}"}) for n in names] + new_col = pa.record_batch([pa.array(json_values, type=pa.json_())], ["metadata"]) + reader_schema = pa.schema([pa.field("metadata", pa.json_())]) + + dataset.add_columns(iter([new_col]), reader_schema=reader_schema) + dataset = lance.dataset(dataset_path) + + # Verify the new column exists and has the right type + assert dataset.schema.names == ["id", "name", "metadata"] + check_json_type(dataset, "metadata") + + # Verify data round-trips + result = dataset.to_table() + assert result.num_rows == 3 + metadata_values = result.column("metadata").to_pylist() + for name, val in zip(names, metadata_values): + assert json.loads(val) == {"greeting": f"hello {name}"} + + result = dataset.to_table( + filter="json_get_string(metadata, 'greeting') = 'hello Alice'" + ) + assert result.num_rows == 1 + assert result["id"][0].as_py() == 1 + + def test_json_merge_insert(tmp_path: Path): """Test merge_insert with JSON data.""" diff --git a/rust/lance/src/dataset/updater.rs b/rust/lance/src/dataset/updater.rs index 7197f877943..bdcbc73cbfe 100644 --- a/rust/lance/src/dataset/updater.rs +++ b/rust/lance/src/dataset/updater.rs @@ -14,6 +14,7 @@ use super::fragment::FragmentReader; use super::scanner::get_default_batch_size; use super::write::{open_writer, GenericWriter}; use super::Dataset; +use crate::dataset::utils::SchemaAdapter; use crate::dataset::FileFragment; /// Update or insert a new column. @@ -43,6 +44,9 @@ pub struct Updater { /// The schema the new files will be written in. This only contains new columns. write_schema: Option, + /// The adapter to convert the logical data to physical data. + schema_adapter: Option, + finished: bool, deletion_restorer: DeletionRestorer, @@ -89,6 +93,9 @@ impl Updater { writer: None, write_schema, final_schema, + // The schema adapter needs the data schema, not the logical schema, so it can't be + // created until after the first batch is read. + schema_adapter: None, finished: false, deletion_restorer: DeletionRestorer::new(deletion_vector, legacy_batch_size), }) @@ -196,6 +203,15 @@ impl Updater { ); } + let schema_adapter = if let Some(schema_adapter) = self.schema_adapter.as_ref() { + schema_adapter + } else { + self.schema_adapter = Some(SchemaAdapter::new(batch.schema())); + self.schema_adapter.as_ref().unwrap() + }; + + let batch = schema_adapter.to_physical_batch(batch)?; + let writer = self.writer.as_mut().unwrap(); writer.write(&[batch]).await?; diff --git a/rust/lance/src/dataset/utils.rs b/rust/lance/src/dataset/utils.rs index 56792a9317d..5a459e3032e 100644 --- a/rust/lance/src/dataset/utils.rs +++ b/rust/lance/src/dataset/utils.rs @@ -163,6 +163,14 @@ impl SchemaAdapter { schema.fields().iter().any(|field| is_json_field(field)) } + pub fn to_physical_batch(&self, batch: RecordBatch) -> Result { + if self.requires_physical_conversion() { + Ok(convert_json_columns(&batch)?) + } else { + Ok(batch) + } + } + /// Convert a logical stream into a physical stream. pub fn to_physical_stream( &self, From 5fe9a696c70b6ee2c9e4988ed24fbd69000d5d93 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 11 Feb 2026 14:12:11 -0800 Subject: [PATCH 3/4] fix: split index_statistics to reduce rustc query depth (#5894) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cherry-pick from main to fix CI recursion limit error. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../lance-namespace-impls/src/dir/manifest.rs | 19 +- rust/lance/src/index.rs | 373 ++++++++++-------- 2 files changed, 223 insertions(+), 169 deletions(-) diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index 49d19712e26..af0e4d9bb4b 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -11,7 +11,7 @@ use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use arrow_ipc::reader::StreamReader; use async_trait::async_trait; use bytes::Bytes; -use futures::stream::StreamExt; +use futures::{stream::StreamExt, FutureExt}; use lance::dataset::optimize::{compact_files, CompactionOptions}; use lance::dataset::{builder::DatasetBuilder, WriteParams}; use lance::session::Session; @@ -1081,7 +1081,7 @@ impl LanceNamespace for ManifestNamespace { } let object_id = Self::str_object_id(table_id); - let table_info = self.query_manifest_for_table(&object_id).await?; + let table_info = self.query_manifest_for_table(&object_id).boxed().await?; // Extract table name and namespace from table_id let table_name = table_id.last().cloned().unwrap_or_default(); @@ -1298,12 +1298,12 @@ impl LanceNamespace for ManifestNamespace { let object_id = Self::build_object_id(&namespace, &table_name); // Query manifest for table location - let table_info = self.query_manifest_for_table(&object_id).await?; + let table_info = self.query_manifest_for_table(&object_id).boxed().await?; match table_info { Some(info) => { // Delete from manifest first - self.delete_from_manifest(&object_id).await?; + self.delete_from_manifest(&object_id).boxed().await?; // Delete physical data directory using the dir_name from manifest let table_path = self.base_path.child(info.location.as_str()); @@ -1312,6 +1312,7 @@ impl LanceNamespace for ManifestNamespace { // Remove the table directory self.object_store .remove_dir_all(table_path) + .boxed() .await .map_err(|e| Error::Namespace { source: format!("Failed to delete table directory: {}", e).into(), @@ -1489,7 +1490,7 @@ impl LanceNamespace for ManifestNamespace { let object_id = namespace_id.join(DELIMITER); // Check if namespace exists - if !self.manifest_contains_object(&object_id).await? { + if !self.manifest_contains_object(&object_id).boxed().await? { return Err(Error::Namespace { source: format!("Namespace '{}' not found", object_id).into(), location: location!(), @@ -1499,7 +1500,7 @@ impl LanceNamespace for ManifestNamespace { // Check for child namespaces let prefix = format!("{}{}", object_id, DELIMITER); let filter = format!("starts_with(object_id, '{}')", prefix); - let mut scanner = self.manifest_scanner().await?; + let mut scanner = self.manifest_scanner().boxed().await?; scanner.filter(&filter).map_err(|e| Error::IO { source: box_error(std::io::Error::other(format!("Failed to filter: {}", e))), location: location!(), @@ -1509,7 +1510,7 @@ impl LanceNamespace for ManifestNamespace { location: location!(), })?; scanner.with_row_id(); - let count = scanner.count_rows().await.map_err(|e| Error::IO { + let count = scanner.count_rows().boxed().await.map_err(|e| Error::IO { source: box_error(std::io::Error::other(format!( "Failed to count rows: {}", e @@ -1528,7 +1529,7 @@ impl LanceNamespace for ManifestNamespace { }); } - self.delete_from_manifest(&object_id).await?; + self.delete_from_manifest(&object_id).boxed().await?; Ok(DropNamespaceResponse::default()) } @@ -1863,7 +1864,7 @@ impl LanceNamespace for ManifestNamespace { let table_uri = match table_info { Some(info) => { // Delete from manifest only (leave physical data intact) - self.delete_from_manifest(&object_id).await?; + self.delete_from_manifest(&object_id).boxed().await?; Self::construct_full_uri(&self.root, &info.location)? } None => { diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index a2b02dd5cf4..d74829d15f4 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -11,7 +11,7 @@ use arrow_schema::{DataType, Schema}; use async_trait::async_trait; use datafusion::execution::SendableRecordBatchStream; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; -use futures::stream; +use futures::{stream, FutureExt}; use itertools::Itertools; use lance_core::cache::{CacheKey, UnsizedCacheKey}; use lance_core::datatypes::Field; @@ -921,170 +921,16 @@ impl DatasetIndexExt for Dataset { } if index_name == FRAG_REUSE_INDEX_NAME { - let index = self - .open_frag_reuse_index(&NoOpMetricsCollector) - .await? - .expect("FragmentReuse index does not exist"); - return serde_json::to_string(&index.statistics()?).map_err(|e| Error::Index { - message: format!("Failed to serialize index statistics: {}", e), - location: location!(), - }); + return index_statistics_frag_reuse(self).boxed().await; } if index_name == MEM_WAL_INDEX_NAME { - let index = self - .open_mem_wal_index(&NoOpMetricsCollector) - .await? - .expect("MemWal index does not exist"); - return serde_json::to_string(&index.statistics()?).map_err(|e| Error::Index { - message: format!("Failed to serialize index statistics: {}", e), - location: location!(), - }); - } - - let field_id = metadatas[0].fields[0]; - let field_path = self.schema().field_path(field_id)?; - - let mut indices_stats = Vec::with_capacity(metadatas.len()); - let mut index_uri: Option = None; - let mut index_typename: Option = None; - - for meta in metadatas.iter() { - let index_store = Arc::new(LanceIndexStore::from_dataset_for_existing(self, meta)?); - let index_details = scalar::fetch_index_details(self, &field_path, meta).await?; - if index_uri.is_none() { - index_uri = Some(index_details.type_url.clone()); - } - let index_details_wrapper = scalar::IndexDetails(index_details.clone()); - - if let Ok(plugin) = index_details_wrapper.get_plugin() { - if index_typename.is_none() { - index_typename = Some(plugin.name().to_string()); - } - - if let Some(stats) = plugin - .load_statistics(index_store.clone(), index_details.as_ref()) - .await? - { - indices_stats.push(stats); - continue; - } - } - - let index = self - .open_generic_index(&field_path, &meta.uuid.to_string(), &NoOpMetricsCollector) - .await?; - - if index_typename.is_none() { - // Fall back to a friendly name from the type URL if the plugin is unknown - let uri = index_uri - .as_deref() - .unwrap_or_else(|| index_details.type_url.as_str()); - index_typename = Some(type_name_from_uri(uri)); - } - - indices_stats.push(index.statistics()?); - } - - let index_uri = index_uri.unwrap_or_else(|| "unknown".to_string()); - let index_type_hint = indices_stats - .first() - .and_then(|stats| stats.get("index_type")) - .and_then(|v| v.as_str()); - let index_type = legacy_type_name(&index_uri, index_type_hint); - - let indexed_fragments_per_delta = self.indexed_fragments(index_name).await?; - - let res = indexed_fragments_per_delta - .iter() - .map(|frags| { - let mut sum = 0; - for frag in frags.iter() { - sum += frag.num_rows().ok_or_else(|| Error::Internal { - message: "Fragment should have row counts, please upgrade lance and \ - trigger a single write to fix this" - .to_string(), - location: location!(), - })?; - } - Ok(sum) - }) - .collect::>>(); - - async fn migrate_and_recompute(ds: &Dataset, index_name: &str) -> Result { - let mut ds = ds.clone(); - log::warn!( - "Detecting out-dated fragment metadata, migrating dataset. \ - To disable migration, set LANCE_AUTO_MIGRATION=false" - ); - ds.delete("false").await.map_err(|err| { - Error::Execution { - message: format!("Failed to migrate dataset while calculating index statistics. \ - To disable migration, set LANCE_AUTO_MIGRATION=false. Original error: {}", err), - location: location!(), - } - })?; - ds.index_statistics(index_name).await + return index_statistics_mem_wal(self).boxed().await; } - let num_indexed_rows_per_delta = match res { - Ok(rows) => rows, - Err(Error::Internal { message, .. }) - if auto_migrate_corruption() && message.contains("trigger a single write") => - { - return migrate_and_recompute(self, index_name).await; - } - Err(e) => return Err(e), - }; - - let mut fragment_ids = HashSet::new(); - for frags in indexed_fragments_per_delta.iter() { - for frag in frags.iter() { - if !fragment_ids.insert(frag.id) { - if auto_migrate_corruption() { - return migrate_and_recompute(self, index_name).await; - } else { - return Err(Error::Internal { - message: - "Overlap in indexed fragments. Please upgrade to lance >= 0.23.0 \ - and trigger a single write to fix this" - .to_string(), - location: location!(), - }); - } - } - } - } - let num_indexed_fragments = fragment_ids.len(); - - let num_unindexed_fragments = self.fragments().len() - num_indexed_fragments; - let num_indexed_rows: usize = num_indexed_rows_per_delta.iter().cloned().sum(); - let num_unindexed_rows = self.count_rows(None).await? - num_indexed_rows; - - // Calculate updated_at as max(created_at) from all index metadata - let updated_at = metadatas - .iter() - .filter_map(|m| m.created_at) - .max() - .map(|dt| dt.timestamp_millis() as u64); - - let stats = json!({ - "index_type": index_type, - "name": index_name, - "num_indices": metadatas.len(), - "indices": indices_stats, - "num_indexed_fragments": num_indexed_fragments, - "num_indexed_rows": num_indexed_rows, - "num_unindexed_fragments": num_unindexed_fragments, - "num_unindexed_rows": num_unindexed_rows, - "num_indexed_rows_per_delta": num_indexed_rows_per_delta, - "updated_at_timestamp_ms": updated_at, - }); - - serde_json::to_string(&stats).map_err(|e| Error::Index { - message: format!("Failed to serialize index statistics: {}", e), - location: location!(), - }) + index_statistics_scalar(self, index_name, metadatas) + .boxed() + .await } async fn read_index_partition( @@ -1132,6 +978,213 @@ impl DatasetIndexExt for Dataset { } } +fn sum_indexed_rows_per_delta(indexed_fragments_per_delta: &[Vec]) -> Result> { + let mut rows_per_delta = Vec::with_capacity(indexed_fragments_per_delta.len()); + for frags in indexed_fragments_per_delta { + let mut sum = 0usize; + for frag in frags { + sum += frag.num_rows().ok_or_else(|| Error::Internal { + message: "Fragment should have row counts, please upgrade lance and \ + trigger a single write to fix this" + .to_string(), + location: location!(), + })?; + } + rows_per_delta.push(sum); + } + Ok(rows_per_delta) +} + +fn unique_indexed_fragment_count(indexed_fragments_per_delta: &[Vec]) -> Option { + let mut fragment_ids = HashSet::new(); + for frags in indexed_fragments_per_delta { + for frag in frags { + if !fragment_ids.insert(frag.id) { + return None; + } + } + } + Some(fragment_ids.len()) +} + +fn serialize_index_statistics(stats: &serde_json::Value) -> Result { + serde_json::to_string(stats).map_err(|e| Error::Index { + message: format!("Failed to serialize index statistics: {}", e), + location: location!(), + }) +} + +async fn migrate_and_recompute_index_statistics(ds: &Dataset, index_name: &str) -> Result { + let mut ds = ds.clone(); + log::warn!( + "Detecting out-dated fragment metadata, migrating dataset. \ + To disable migration, set LANCE_AUTO_MIGRATION=false" + ); + ds.delete("false").await.map_err(|err| Error::Execution { + message: format!( + "Failed to migrate dataset while calculating index statistics. \ + To disable migration, set LANCE_AUTO_MIGRATION=false. Original error: {}", + err + ), + location: location!(), + })?; + ds.index_statistics(index_name).await +} + +async fn index_statistics_frag_reuse(ds: &Dataset) -> Result { + let index = ds + .open_frag_reuse_index(&NoOpMetricsCollector) + .await? + .expect("FragmentReuse index does not exist"); + serialize_index_statistics(&index.statistics()?) +} + +async fn index_statistics_mem_wal(ds: &Dataset) -> Result { + let index = ds + .open_mem_wal_index(&NoOpMetricsCollector) + .await? + .expect("MemWal index does not exist"); + serialize_index_statistics(&index.statistics()?) +} + +async fn index_statistics_scalar( + ds: &Dataset, + index_name: &str, + metadatas: Vec, +) -> Result { + let field_id = metadatas[0].fields[0]; + let field_path = ds.schema().field_path(field_id)?; + + let (indices_stats, index_uri, num_indices, updated_at) = + collect_regular_indices_statistics(ds, metadatas, &field_path).await?; + + let index_type_hint = indices_stats + .first() + .and_then(|stats| stats.get("index_type")) + .and_then(|v| v.as_str()); + let index_type = legacy_type_name(&index_uri, index_type_hint); + + let Some(( + num_indexed_rows_per_delta, + num_indexed_fragments, + num_unindexed_fragments, + num_indexed_rows, + num_unindexed_rows, + )) = gather_fragment_statistics(ds, index_name).await? + else { + return migrate_and_recompute_index_statistics(ds, index_name).await; + }; + + let stats = json!({ + "index_type": index_type, + "name": index_name, + "num_indices": num_indices, + "indices": indices_stats, + "num_indexed_fragments": num_indexed_fragments, + "num_indexed_rows": num_indexed_rows, + "num_unindexed_fragments": num_unindexed_fragments, + "num_unindexed_rows": num_unindexed_rows, + "num_indexed_rows_per_delta": num_indexed_rows_per_delta, + "updated_at_timestamp_ms": updated_at, + }); + + serialize_index_statistics(&stats) +} + +async fn collect_regular_indices_statistics( + ds: &Dataset, + metadatas: Vec, + field_path: &str, +) -> Result<(Vec, String, usize, Option)> { + let num_indices = metadatas.len(); + let updated_at = metadatas + .iter() + .filter_map(|m| m.created_at) + .max() + .map(|dt| dt.timestamp_millis() as u64); + + let mut indices_stats = Vec::with_capacity(num_indices); + let mut index_uri: Option = None; + + for meta in metadatas.iter() { + let index_store = Arc::new(LanceIndexStore::from_dataset_for_existing(ds, meta)?); + let index_details = scalar::fetch_index_details(ds, field_path, meta).await?; + if index_uri.is_none() { + index_uri = Some(index_details.type_url.clone()); + } + + let index_details_wrapper = scalar::IndexDetails(index_details.clone()); + if let Ok(plugin) = index_details_wrapper.get_plugin() { + if let Some(stats) = plugin + .load_statistics(index_store.clone(), index_details.as_ref()) + .await? + { + indices_stats.push(stats); + continue; + } + } + + let index = ds + .open_generic_index(field_path, &meta.uuid.to_string(), &NoOpMetricsCollector) + .await?; + + indices_stats.push(index.statistics()?); + } + + Ok(( + indices_stats, + index_uri.unwrap_or_else(|| "unknown".to_string()), + num_indices, + updated_at, + )) +} + +async fn gather_fragment_statistics( + ds: &Dataset, + index_name: &str, +) -> Result, usize, usize, usize, usize)>> { + let indexed_fragments_per_delta = ds.indexed_fragments(index_name).await?; + + let num_indexed_rows_per_delta = match sum_indexed_rows_per_delta(&indexed_fragments_per_delta) + { + Ok(rows) => rows, + Err(Error::Internal { message, .. }) + if auto_migrate_corruption() && message.contains("trigger a single write") => + { + return Ok(None); + } + Err(e) => return Err(e), + }; + + let Some(num_indexed_fragments) = unique_indexed_fragment_count(&indexed_fragments_per_delta) + else { + if auto_migrate_corruption() { + return Ok(None); + } + return Err(Error::Internal { + message: "Overlap in indexed fragments. Please upgrade to lance >= 0.23.0 \ + and trigger a single write to fix this" + .to_string(), + location: location!(), + }); + }; + + let num_unindexed_fragments = ds.fragments().len() - num_indexed_fragments; + let num_indexed_rows: usize = num_indexed_rows_per_delta.iter().sum(); + + drop(indexed_fragments_per_delta); + let total_rows = ds.count_rows(None).await?; + let num_unindexed_rows = total_rows - num_indexed_rows; + + Ok(Some(( + num_indexed_rows_per_delta, + num_indexed_fragments, + num_unindexed_fragments, + num_indexed_rows, + num_unindexed_rows, + ))) +} + pub(crate) fn retain_supported_indices(indices: &mut Vec) { indices.retain(|idx| { let max_supported_version = idx From 367c7e9116a43c23d4c934f98f44a8a058edf340 Mon Sep 17 00:00:00 2001 From: Lance Community Date: Wed, 11 Feb 2026 14:37:05 -0800 Subject: [PATCH 4/4] ci: bump rust toolchain to 1.91.0 (#5937) Link: https://github.com/lance-format/lance/actions/runs/21917803271 Summary of failure: - build-no-lock failed because rustc 1.90.0 is too old for updated aws-smithy dependencies (requires 1.91). Fixes applied: - Bumped the pinned Rust toolchain to 1.91.0 to satisfy dependency MSRV in no-lock builds. --------- Co-authored-by: Jack Ye --- .github/workflows/rust.yml | 1 + rust-toolchain.toml | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 01a9b8363e2..fea99d53ce3 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -12,6 +12,7 @@ on: - rust/** - protos/** - .github/workflows/rust.yml + - rust-toolchain.toml - Cargo.toml - Cargo.lock - deny.toml diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 62b4dcbb1fa..089a799280d 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,5 +1,5 @@ # We keep this pinned to keep clippy and rustfmt in sync between local and CI. # Feel free to upgrade to bring in new lints. [toolchain] -channel = "1.90.0" +channel = "1.91.0" components = ["rustfmt", "clippy", "rust-analyzer"]