From 649e0c8aeffe8e4408c808f1a18426bd0c978d4e Mon Sep 17 00:00:00 2001 From: "majin.nathan" Date: Mon, 30 Jun 2025 12:08:41 +0800 Subject: [PATCH 1/3] feat(java): support replace schema and field metadata --- java/core/lance-jni/src/blocking_dataset.rs | 71 +++++++++++++++++++ java/core/lance-jni/src/error.rs | 1 + .../main/java/com/lancedb/lance/Dataset.java | 32 +++++++++ .../java/com/lancedb/lance/DatasetTest.java | 71 +++++++++++++++++++ rust/lance-core/src/error.rs | 5 ++ rust/lance-table/src/format/manifest.rs | 12 +++- rust/lance/src/dataset/transaction.rs | 2 +- 7 files changed, 192 insertions(+), 2 deletions(-) diff --git a/java/core/lance-jni/src/blocking_dataset.rs b/java/core/lance-jni/src/blocking_dataset.rs index b91477497d1..a20d9655477 100644 --- a/java/core/lance-jni/src/blocking_dataset.rs +++ b/java/core/lance-jni/src/blocking_dataset.rs @@ -239,6 +239,19 @@ impl BlockingDataset { Ok(()) } + pub fn replace_schema_metadata(&mut self, metadata: HashMap) -> Result<()> { + RT.block_on(self.inner.replace_schema_metadata(metadata))?; + Ok(()) + } + + pub fn replace_field_metadata( + &mut self, + metadata_map: HashMap>, + ) -> Result<()> { + RT.block_on(self.inner.replace_field_metadata(metadata_map))?; + Ok(()) + } + pub fn close(&self) {} } @@ -1603,3 +1616,61 @@ fn inner_get_version_by_tag( { unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }? }; dataset_guard.get_version(tag.as_str()) } + +#[no_mangle] +pub extern "system" fn Java_com_lancedb_lance_Dataset_nativeReplaceSchemaMetadata( + mut env: JNIEnv, + java_dataset: JObject, + jschema_metadata: JObject, +) { + ok_or_throw_without_return!( + env, + inner_replace_schema_metadata(&mut env, java_dataset, jschema_metadata) + ) +} + +fn inner_replace_schema_metadata( + env: &mut JNIEnv, + java_dataset: JObject, + jschema_metadata: JObject, +) -> Result<()> { + let jmap = JMap::from_env(env, &jschema_metadata)?; + let schema_metadata = to_rust_map(env, &jmap)?; + let mut dataset_guard = + { unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }? }; + dataset_guard.replace_schema_metadata(schema_metadata) +} + +#[no_mangle] +pub extern "system" fn Java_com_lancedb_lance_Dataset_nativeReplaceFieldMetadata( + mut env: JNIEnv, + java_dataset: JObject, + jfield_metadata_map: JObject, +) { + ok_or_throw_without_return!( + env, + inner_replace_field_metadata(&mut env, java_dataset, jfield_metadata_map) + ) +} + +fn inner_replace_field_metadata( + env: &mut JNIEnv, + java_dataset: JObject, + jfield_metadata_map: JObject, +) -> Result<()> { + let jmap = JMap::from_env(env, &jfield_metadata_map)?; + let mut field_metadata_map = HashMap::new(); + let mut iter = jmap.iter(env)?; + env.with_local_frame(16, |env| { + while let Some((key, value)) = iter.next(env)? { + let field_id = env.call_method(&key, "intValue", "()I", &[])?.i()? as u32; + let inner_map = JMap::from_env(env, &value)?; + let value_map = to_rust_map(env, &inner_map)?; + field_metadata_map.insert(field_id, value_map); + } + Ok::<(), Error>(()) + })?; + let mut dataset_guard = + { unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }? }; + dataset_guard.replace_field_metadata(field_metadata_map) +} diff --git a/java/core/lance-jni/src/error.rs b/java/core/lance-jni/src/error.rs index 05454c6111b..b71de060239 100644 --- a/java/core/lance-jni/src/error.rs +++ b/java/core/lance-jni/src/error.rs @@ -107,6 +107,7 @@ impl From for Error { LanceError::DatasetNotFound { .. } | LanceError::DatasetAlreadyExists { .. } | LanceError::CommitConflict { .. } + | LanceError::FieldNotFound { .. } | LanceError::InvalidInput { .. } => Self::input_error(err.to_string()), LanceError::IO { .. } => Self::io_error(err.to_string()), LanceError::NotSupported { .. } => Self::unsupported_error(err.to_string()), diff --git a/java/core/src/main/java/com/lancedb/lance/Dataset.java b/java/core/src/main/java/com/lancedb/lance/Dataset.java index 191025c9756..14c51bc78f3 100644 --- a/java/core/src/main/java/com/lancedb/lance/Dataset.java +++ b/java/core/src/main/java/com/lancedb/lance/Dataset.java @@ -787,6 +787,38 @@ public Tags tags() { return new Tags(); } + /** + * Replace the schema metadata of the dataset. + * + * @param metadata the new table metadata + */ + public void replaceSchemaMetadata(Map metadata) { + try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) { + Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); + nativeReplaceSchemaMetadata(metadata); + } + } + + private native void nativeReplaceSchemaMetadata(Map metadata); + + /** + * Replace target field metadata of the dataset. This method won't affect fields not in the map + * + * @param fieldMetadataMap field id to metadata map + */ + public void replaceFieldMetadata(Map> fieldMetadataMap) { + try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) { + Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); + for (Integer fieldId : fieldMetadataMap.keySet()) { + Preconditions.checkArgument(fieldId >= 0, "Field id must be greater than 0"); + } + nativeReplaceFieldMetadata(fieldMetadataMap); + } + } + + private native void nativeReplaceFieldMetadata( + Map> fieldMetadataMap); + /** Tag operations of the dataset. */ public class Tags { diff --git a/java/core/src/test/java/com/lancedb/lance/DatasetTest.java b/java/core/src/test/java/com/lancedb/lance/DatasetTest.java index 17eb6a2ec01..bd7b3784ce9 100644 --- a/java/core/src/test/java/com/lancedb/lance/DatasetTest.java +++ b/java/core/src/test/java/com/lancedb/lance/DatasetTest.java @@ -15,6 +15,7 @@ import com.lancedb.lance.ipc.LanceScanner; import com.lancedb.lance.schema.ColumnAlteration; +import com.lancedb.lance.schema.LanceField; import com.lancedb.lance.schema.SqlExpressions; import org.apache.arrow.c.ArrowArrayStream; @@ -847,4 +848,74 @@ void testGetLanceSchema(@TempDir Path tempDir) { assertEquals(testDataset.getSchema(), dataset.getLanceSchema().asArrowSchema()); } } + + @Test + void testReplaceSchemaMetadata(@TempDir Path tempDir) { + String testMethodName = new Object() {}.getClass().getEnclosingMethod().getName(); + String datasetPath = tempDir.resolve(testMethodName).toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + dataset = testDataset.createEmptyDataset(); + assertEquals(1, dataset.version()); + Map replaceMetadata = new HashMap<>(); + replaceMetadata.put("key1", "value1"); + replaceMetadata.put("key2", "value2"); + dataset.replaceSchemaMetadata(replaceMetadata); + assertEquals(2, dataset.version()); + Map currentMetadata = dataset.getSchema().getCustomMetadata(); + for (String configKey : currentMetadata.keySet()) { + assertEquals(currentMetadata.get(configKey), replaceMetadata.get(configKey)); + } + assertEquals(replaceMetadata.size(), currentMetadata.size()); + + Map replaceConfig2 = new HashMap<>(); + replaceConfig2.put("key1", "value3"); + dataset.replaceSchemaMetadata(replaceConfig2); + currentMetadata = dataset.getSchema().getCustomMetadata(); + assertEquals(3, dataset.version()); + assertEquals(1, currentMetadata.size()); + assertEquals("value3", currentMetadata.get("key1")); + } + } + + @Test + void testReplaceFieldConfig(@TempDir Path tempDir) { + String testMethodName = new Object() {}.getClass().getEnclosingMethod().getName(); + String datasetPath = tempDir.resolve(testMethodName).toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + dataset = testDataset.createEmptyDataset(); + assertEquals(1, dataset.version()); + LanceField field = dataset.getLanceSchema().fields().get(0); + Map replaceMetadata = new HashMap<>(); + replaceMetadata.put("key1", "value1"); + replaceMetadata.put("key2", "value2"); + dataset.replaceFieldMetadata(Collections.singletonMap(field.getId(), replaceMetadata)); + assertEquals(2, dataset.version()); + Map currentMetadata = dataset.getSchema().getFields().get(0).getMetadata(); + for (String configKey : currentMetadata.keySet()) { + assertEquals(currentMetadata.get(configKey), replaceMetadata.get(configKey)); + } + assertEquals(replaceMetadata.size(), currentMetadata.size()); + + Map replaceConfig2 = new HashMap<>(); + replaceConfig2.put("key1", "value3"); + dataset.replaceFieldMetadata(Collections.singletonMap(field.getId(), replaceConfig2)); + currentMetadata = dataset.getSchema().getFields().get(0).getMetadata(); + assertEquals(3, dataset.version()); + assertEquals(1, currentMetadata.size()); + assertEquals("value3", currentMetadata.get("key1")); + + assertThrows( + IllegalArgumentException.class, + () -> + dataset.replaceFieldMetadata( + Collections.singletonMap(Integer.MAX_VALUE, replaceConfig2))); + assertThrows( + IllegalArgumentException.class, + () -> dataset.replaceFieldMetadata(Collections.singletonMap(-1, replaceConfig2))); + } + } } diff --git a/rust/lance-core/src/error.rs b/rust/lance-core/src/error.rs index 6eed53b010e..27f5e7a66ab 100644 --- a/rust/lance-core/src/error.rs +++ b/rust/lance-core/src/error.rs @@ -69,6 +69,11 @@ pub enum Error { Schema { message: String, location: Location }, #[snafu(display("Not found: {uri}, {location}"))] NotFound { uri: String, location: Location }, + #[snafu(display("Field not found: {field_desc}, {location}"))] + FieldNotFound { + field_desc: String, + location: Location, + }, #[snafu(display("LanceError(IO): {source}, {location}"))] IO { source: BoxedError, diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs index 941b9e57991..e1f884026b6 100644 --- a/rust/lance-table/src/format/manifest.rs +++ b/rust/lance-table/src/format/manifest.rs @@ -214,9 +214,19 @@ impl Manifest { /// Replaces the metadata of the field with the given id with the given key-value pairs. /// /// If the field does not exist in the schema, this is a no-op. - pub fn replace_field_metadata(&mut self, field_id: i32, new_metadata: HashMap) { + pub fn replace_field_metadata( + &mut self, + field_id: i32, + new_metadata: HashMap, + ) -> Result<()> { if let Some(field) = self.schema.field_by_id_mut(field_id) { field.metadata = new_metadata; + Ok(()) + } else { + Err(Error::FieldNotFound { + field_desc: format!("ID<{}>", field_id), + location: location!(), + }) } } diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 52f2475d0bb..3390af637be 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -1553,7 +1553,7 @@ impl Transaction { } if let Some(field_metadata) = field_metadata { for (field_id, metadata) in field_metadata { - manifest.replace_field_metadata(*field_id as i32, metadata.clone()); + manifest.replace_field_metadata(*field_id as i32, metadata.clone())?; } } } From ead404840cf0379ea219dd66ae62b86d8541e540 Mon Sep 17 00:00:00 2001 From: majin1102 Date: Sat, 19 Jul 2025 17:32:46 +0800 Subject: [PATCH 2/3] optimize error message --- java/core/lance-jni/src/error.rs | 2 +- rust/lance-core/src/error.rs | 4 ++-- rust/lance-table/src/format/manifest.rs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/java/core/lance-jni/src/error.rs b/java/core/lance-jni/src/error.rs index b71de060239..e854a9d124a 100644 --- a/java/core/lance-jni/src/error.rs +++ b/java/core/lance-jni/src/error.rs @@ -107,7 +107,7 @@ impl From for Error { LanceError::DatasetNotFound { .. } | LanceError::DatasetAlreadyExists { .. } | LanceError::CommitConflict { .. } - | LanceError::FieldNotFound { .. } + | LanceError::FieldNotExists { .. } | LanceError::InvalidInput { .. } => Self::input_error(err.to_string()), LanceError::IO { .. } => Self::io_error(err.to_string()), LanceError::NotSupported { .. } => Self::unsupported_error(err.to_string()), diff --git a/rust/lance-core/src/error.rs b/rust/lance-core/src/error.rs index 27f5e7a66ab..efa7f90bcf6 100644 --- a/rust/lance-core/src/error.rs +++ b/rust/lance-core/src/error.rs @@ -69,8 +69,8 @@ pub enum Error { Schema { message: String, location: Location }, #[snafu(display("Not found: {uri}, {location}"))] NotFound { uri: String, location: Location }, - #[snafu(display("Field not found: {field_desc}, {location}"))] - FieldNotFound { + #[snafu(display("Field does not exists: {field_desc}, {location}"))] + FieldNotExists { field_desc: String, location: Location, }, diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs index e1f884026b6..e8bf0b609c5 100644 --- a/rust/lance-table/src/format/manifest.rs +++ b/rust/lance-table/src/format/manifest.rs @@ -223,7 +223,7 @@ impl Manifest { field.metadata = new_metadata; Ok(()) } else { - Err(Error::FieldNotFound { + Err(Error::FieldNotExists { field_desc: format!("ID<{}>", field_id), location: location!(), }) From 99b6b5990b514be7949bd9840a99b03f449a151c Mon Sep 17 00:00:00 2001 From: majin1102 Date: Mon, 21 Jul 2025 16:07:39 +0800 Subject: [PATCH 3/3] optimize error message --- java/core/lance-jni/src/error.rs | 1 - rust/lance-core/src/error.rs | 5 ----- rust/lance-table/src/format/manifest.rs | 11 +++++++---- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/java/core/lance-jni/src/error.rs b/java/core/lance-jni/src/error.rs index e854a9d124a..05454c6111b 100644 --- a/java/core/lance-jni/src/error.rs +++ b/java/core/lance-jni/src/error.rs @@ -107,7 +107,6 @@ impl From for Error { LanceError::DatasetNotFound { .. } | LanceError::DatasetAlreadyExists { .. } | LanceError::CommitConflict { .. } - | LanceError::FieldNotExists { .. } | LanceError::InvalidInput { .. } => Self::input_error(err.to_string()), LanceError::IO { .. } => Self::io_error(err.to_string()), LanceError::NotSupported { .. } => Self::unsupported_error(err.to_string()), diff --git a/rust/lance-core/src/error.rs b/rust/lance-core/src/error.rs index efa7f90bcf6..6eed53b010e 100644 --- a/rust/lance-core/src/error.rs +++ b/rust/lance-core/src/error.rs @@ -69,11 +69,6 @@ pub enum Error { Schema { message: String, location: Location }, #[snafu(display("Not found: {uri}, {location}"))] NotFound { uri: String, location: Location }, - #[snafu(display("Field does not exists: {field_desc}, {location}"))] - FieldNotExists { - field_desc: String, - location: Location, - }, #[snafu(display("LanceError(IO): {source}, {location}"))] IO { source: BoxedError, diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs index e8bf0b609c5..bb60dd6831c 100644 --- a/rust/lance-table/src/format/manifest.rs +++ b/rust/lance-table/src/format/manifest.rs @@ -223,10 +223,13 @@ impl Manifest { field.metadata = new_metadata; Ok(()) } else { - Err(Error::FieldNotExists { - field_desc: format!("ID<{}>", field_id), - location: location!(), - }) + Err(Error::invalid_input( + format!( + "Field with id {} does not exist for replace_field_metadata", + field_id + ), + location!(), + )) } }