diff --git a/java/lance-jni/src/fragment.rs b/java/lance-jni/src/fragment.rs index 98be13a3e44..5b8cc2e9f36 100644 --- a/java/lance-jni/src/fragment.rs +++ b/java/lance-jni/src/fragment.rs @@ -37,7 +37,7 @@ pub(crate) struct FragmentMergeResult { #[derive(Debug, Clone)] pub(crate) struct FragmentUpdateResult { updated_fragment: Fragment, - fields_modified: Vec, + bitmap_prune_field_ids: Vec, } ////////////////// @@ -412,11 +412,11 @@ fn inner_update_column<'local>( let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?; let left_on_str: String = left_on.extract(env)?; let right_on_str: String = right_on.extract(env)?; - let (updated_fragment, fields_modified) = + let (updated_fragment, bitmap_prune_field_ids) = RT.block_on(fragment.update_columns(reader, &left_on_str, &right_on_str))?; let result = FragmentUpdateResult { updated_fragment, - fields_modified, + bitmap_prune_field_ids, }; result.into_java(env) } @@ -456,13 +456,14 @@ impl IntoJava for &FragmentMergeResult { impl IntoJava for &FragmentUpdateResult { fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> Result> { let java_updated_fragment = self.updated_fragment.into_java(env)?; - let java_fields_modified = JLance(self.fields_modified.clone()).into_java(env)?; + let java_bitmap_prune_field_ids = + JLance(self.bitmap_prune_field_ids.clone()).into_java(env)?; Ok(env.new_object( FRAGMENT_UPDATE_RESULT_CLASS, FRAGMENT_UPDATE_RESULT_CONSTRUCTOR_SIG, &[ JValueGen::Object(&java_updated_fragment), - JValueGen::Object(&java_fields_modified), + JValueGen::Object(&java_bitmap_prune_field_ids), ], )?) } diff --git a/java/lance-jni/src/transaction.rs b/java/lance-jni/src/transaction.rs index f9ce719f369..32b81f7205f 100644 --- a/java/lance-jni/src/transaction.rs +++ b/java/lance-jni/src/transaction.rs @@ -495,9 +495,9 @@ fn convert_to_java_operation_inner<'local>( removed_fragment_ids, updated_fragments, new_fragments, - fields_modified, + bitmap_prune_field_ids, mem_wal_to_merge: _, - fields_for_preserving_frag_bitmap, + bitmap_preserve_field_ids, update_mode, } => { let removed_ids: Vec> = removed_fragment_ids @@ -507,9 +507,9 @@ fn convert_to_java_operation_inner<'local>( let removed_fragment_ids_obj = export_vec(env, &removed_ids)?; let updated_fragments_obj = export_vec(env, &updated_fragments)?; let new_fragments_obj = export_vec(env, &new_fragments)?; - let fields_modified = JLance(fields_modified.clone()).into_java(env)?; - let fields_for_preserving_frag_bitmap = - JLance(fields_for_preserving_frag_bitmap.clone()).into_java(env)?; + let bitmap_prune_field_ids = JLance(bitmap_prune_field_ids.clone()).into_java(env)?; + let bitmap_preserve_field_ids = + JLance(bitmap_preserve_field_ids.clone()).into_java(env)?; let update_mode = match update_mode { Some(update_mode) => update_mode.into_java(env), None => Ok(JObject::null()), @@ -529,8 +529,8 @@ fn convert_to_java_operation_inner<'local>( JValue::Object(&removed_fragment_ids_obj), JValue::Object(&updated_fragments_obj), JValue::Object(&new_fragments_obj), - JValueGen::Object(&fields_modified), - JValueGen::Object(&fields_for_preserving_frag_bitmap), + JValueGen::Object(&bitmap_prune_field_ids), + JValueGen::Object(&bitmap_preserve_field_ids), JValue::Object(&update_mode_optional), ], )?) @@ -939,16 +939,17 @@ fn convert_to_rust_operation( fragment.extract_object(env) })?; - let fields_modified = env - .call_method(java_operation, "fieldsModified", "()[J", &[])? + let bitmap_prune_field_ids = env + .call_method(java_operation, "bitmapPruneFieldIds", "()[J", &[])? .l()?; - let fields_modified = JLongArray::from(fields_modified).extract_object(env)?; + let bitmap_prune_field_ids = + JLongArray::from(bitmap_prune_field_ids).extract_object(env)?; - let fields_for_preserving_frag_bitmap = env - .call_method(java_operation, "fieldsForPreservingFragBitmap", "()[J", &[])? + let bitmap_preserve_field_ids = env + .call_method(java_operation, "bitmapPreserveExcludeFieldIds", "()[J", &[])? .l()?; - let fields_for_preserving_frag_bitmap = - JLongArray::from(fields_for_preserving_frag_bitmap).extract_object(env)?; + let bitmap_preserve_field_ids = + JLongArray::from(bitmap_preserve_field_ids).extract_object(env)?; let update_mode: Option = env.get_optional_from_method(java_operation, "updateMode", |env, update_mode| { @@ -959,9 +960,9 @@ fn convert_to_rust_operation( removed_fragment_ids, updated_fragments, new_fragments, - fields_modified, + bitmap_prune_field_ids, mem_wal_to_merge: None, - fields_for_preserving_frag_bitmap, + bitmap_preserve_field_ids, update_mode, } } diff --git a/java/src/main/java/com/lancedb/lance/fragment/FragmentUpdateResult.java b/java/src/main/java/com/lancedb/lance/fragment/FragmentUpdateResult.java index 1769c145cd9..56111bbc7a3 100644 --- a/java/src/main/java/com/lancedb/lance/fragment/FragmentUpdateResult.java +++ b/java/src/main/java/com/lancedb/lance/fragment/FragmentUpdateResult.java @@ -24,26 +24,26 @@ */ public class FragmentUpdateResult { private final FragmentMetadata updatedFragment; - private final long[] fieldsModified; + private final long[] bitmapPruneFieldIds; - public FragmentUpdateResult(FragmentMetadata updatedFragment, long[] updatedFieldIds) { + public FragmentUpdateResult(FragmentMetadata updatedFragment, long[] bitmapPruneFieldIds) { this.updatedFragment = updatedFragment; - this.fieldsModified = updatedFieldIds; + this.bitmapPruneFieldIds = bitmapPruneFieldIds; } public FragmentMetadata getUpdatedFragment() { return updatedFragment; } - public long[] getFieldsModified() { - return fieldsModified; + public long[] getBitmapPruneFieldIds() { + return bitmapPruneFieldIds; } @Override public String toString() { return MoreObjects.toStringHelper(this) .add("fragmentMetadata", updatedFragment) - .add("updatedFieldIds", fieldsModified) + .add("bitmapPruneFieldIds", bitmapPruneFieldIds) .toString(); } } diff --git a/java/src/main/java/com/lancedb/lance/operation/Update.java b/java/src/main/java/com/lancedb/lance/operation/Update.java index d0d7c3871d1..5f2995aad7a 100644 --- a/java/src/main/java/com/lancedb/lance/operation/Update.java +++ b/java/src/main/java/com/lancedb/lance/operation/Update.java @@ -27,22 +27,22 @@ public class Update implements Operation { private final List removedFragmentIds; private final List updatedFragments; private final List newFragments; - private final long[] fieldsModified; - private final long[] fieldsForPreservingFragBitmap; + private final long[] bitmapPruneFieldIds; + private final long[] bitmapPreserveExcludeFieldIds; private final Optional updateMode; private Update( List removedFragmentIds, List updatedFragments, List newFragments, - long[] fieldsModified, - long[] fieldsForPreservingFragBitmap, + long[] bitmapPruneFieldIds, + long[] bitmapPreserveExcludeFieldIds, Optional updateMode) { this.removedFragmentIds = removedFragmentIds; this.updatedFragments = updatedFragments; this.newFragments = newFragments; - this.fieldsModified = fieldsModified; - this.fieldsForPreservingFragBitmap = fieldsForPreservingFragBitmap; + this.bitmapPruneFieldIds = bitmapPruneFieldIds; + this.bitmapPreserveExcludeFieldIds = bitmapPreserveExcludeFieldIds; this.updateMode = updateMode; } @@ -62,12 +62,12 @@ public List newFragments() { return newFragments; } - public long[] fieldsModified() { - return fieldsModified; + public long[] bitmapPruneFieldIds() { + return bitmapPruneFieldIds; } - public long[] fieldsForPreservingFragBitmap() { - return fieldsForPreservingFragBitmap; + public long[] bitmapPreserveExcludeFieldIds() { + return bitmapPreserveExcludeFieldIds; } public Optional updateMode() { @@ -84,8 +84,8 @@ public String toString() { .add("removedFragmentIds", removedFragmentIds) .add("updatedFragments", updatedFragments) .add("newFragments", newFragments) - .add("fieldsModified", fieldsModified) - .add("fieldsForPreservingFragBitmap", fieldsForPreservingFragBitmap) + .add("bitmapPruneFieldIds", bitmapPruneFieldIds) + .add("bitmapPreserveExcludeFieldIds", bitmapPreserveExcludeFieldIds) .add("updateMode", updateMode) .toString(); } @@ -98,8 +98,8 @@ public boolean equals(Object o) { return Objects.equals(removedFragmentIds, that.removedFragmentIds) && Objects.equals(updatedFragments, that.updatedFragments) && Objects.equals(newFragments, that.newFragments) - && Arrays.equals(fieldsModified, that.fieldsModified) - && Arrays.equals(fieldsForPreservingFragBitmap, that.fieldsForPreservingFragBitmap) + && Arrays.equals(bitmapPruneFieldIds, that.bitmapPruneFieldIds) + && Arrays.equals(bitmapPreserveExcludeFieldIds, that.bitmapPreserveExcludeFieldIds) && Objects.equals(updateMode, that.updateMode); } @@ -112,8 +112,8 @@ public static class Builder { private List removedFragmentIds = Collections.emptyList(); private List updatedFragments = Collections.emptyList(); private List newFragments = Collections.emptyList(); - private long[] fieldsModified = new long[0]; - private long[] fieldsForPreservingFragBitmap = new long[0]; + private long[] bitmapPruneFieldIds = new long[0]; + private long[] bitmapPreserveExcludeFieldIds = new long[0]; private Optional updateMode = Optional.empty(); private Builder() {} @@ -133,13 +133,13 @@ public Builder newFragments(List newFragments) { return this; } - public Builder fieldsModified(long[] fieldsModified) { - this.fieldsModified = fieldsModified; + public Builder bitmapPruneFieldIds(long[] bitmapPruneFieldIds) { + this.bitmapPruneFieldIds = bitmapPruneFieldIds; return this; } - public Builder fieldsForPreservingFragBitmap(long[] fieldsForPreservingFragBitmap) { - this.fieldsForPreservingFragBitmap = fieldsForPreservingFragBitmap; + public Builder bitmapPreserveExcludeFieldIds(long[] bitmapPreserveExcludeFieldIds) { + this.bitmapPreserveExcludeFieldIds = bitmapPreserveExcludeFieldIds; return this; } @@ -153,8 +153,8 @@ public Update build() { removedFragmentIds, updatedFragments, newFragments, - fieldsModified, - fieldsForPreservingFragBitmap, + bitmapPruneFieldIds, + bitmapPreserveExcludeFieldIds, updateMode); } } diff --git a/java/src/test/java/com/lancedb/lance/operation/UpdateTest.java b/java/src/test/java/com/lancedb/lance/operation/UpdateTest.java index 63cb58ebaa0..da5ea8f9b6f 100644 --- a/java/src/test/java/com/lancedb/lance/operation/UpdateTest.java +++ b/java/src/test/java/com/lancedb/lance/operation/UpdateTest.java @@ -152,7 +152,7 @@ void testUpdateColumns(@TempDir Path tempDir) throws Exception { Update.builder() .updatedFragments( Collections.singletonList(updateResult.getUpdatedFragment())) - .fieldsModified(updateResult.getFieldsModified()) + .bitmapPruneFieldIds(updateResult.getBitmapPruneFieldIds()) .build()) .build(); try (Dataset dataset = updateTransaction.commit()) { diff --git a/protos/transaction.proto b/protos/transaction.proto index 186847d52b5..d2c39f9bd00 100644 --- a/protos/transaction.proto +++ b/protos/transaction.proto @@ -193,12 +193,13 @@ message Transaction { repeated DataFragment updated_fragments = 2; // The new fragments where updated rows have been moved to. repeated DataFragment new_fragments = 3; - // The ids of the fields that have been modified. + /// Field IDs that drive index fragment bitmap pruning repeated uint32 fields_modified = 4; /// The MemWAL (pre-image) that should be marked as merged after this transaction MemWalIndexDetails.MemWal mem_wal_to_merge = 5; - /// The fields that used to judge whether to preserve the new frag's id into - /// the frag bitmap of the specified indices. + /// Field IDs used to decide whether to preserve new fragment IDs in the + /// fragment bitmap of specified indices. Indices that do not cover these + /// fields may preserve the new fragment IDs when applicable. repeated uint32 fields_for_preserving_frag_bitmap = 6; // The mode of update UpdateMode update_mode = 7; diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 5b6a4064d29..595a828bd24 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3923,13 +3923,16 @@ class Update(BaseOperation): The fragments that have been updated with new deletion vectors. new_fragments: list[FragmentMetadata] The fragments that contain the new rows. - fields_modified: list[int] - If any fields are modified in updated_fragments, then they must be - listed here so those fragments can be removed from indices covering - those fields. - fields_for_preserving_frag_bitmap: list[int] - The fields that used to judge whether to preserve the new frag's id into - the frag bitmap of the specified indices. + bitmap_prune_field_ids: list[int] + Field IDs that drive index fragment bitmap pruning. If any fields are + modified in updated_fragments, + then they must be listed here so those fragments can be removed from + indices that cover any of these fields. + bitmap_preserve_field_ids: list[int] + Field IDs used to decide whether to preserve new fragment IDs in an index's + fragment bitmap. + Indices that do not cover these fields may preserve the new fragment IDs + when applicable. """ removed_fragment_ids: List[int] = dataclasses.field(default_factory=list) @@ -3937,10 +3940,8 @@ class Update(BaseOperation): default_factory=list ) new_fragments: List[FragmentMetadata] = dataclasses.field(default_factory=list) - fields_modified: List[int] = dataclasses.field(default_factory=list) - fields_for_preserving_frag_bitmap: List[int] = dataclasses.field( - default_factory=list - ) + bitmap_prune_field_ids: List[int] = dataclasses.field(default_factory=list) + bitmap_preserve_field_ids: List[int] = dataclasses.field(default_factory=list) update_mode: str = "" def __post_init__(self): diff --git a/python/python/lance/fragment.py b/python/python/lance/fragment.py index d08167fcb52..48f8b833c55 100644 --- a/python/python/lance/fragment.py +++ b/python/python/lance/fragment.py @@ -680,7 +680,7 @@ def update_columns( ... }) >>> # Update the fragment >>> fragment = dataset.get_fragment(0) - >>> updated_fragment, fields_modified = fragment.update_columns( + >>> updated_fragment, bitmap_prune_field_ids = fragment.update_columns( ... update_data, ... left_on="id", ... right_on="id" @@ -689,7 +689,7 @@ def update_columns( >>> from lance import LanceOperation >>> op = LanceOperation.Update( ... updated_fragments=[updated_fragment], - ... fields_modified=fields_modified, + ... bitmap_prune_field_ids=bitmap_prune_field_ids, ... ) >>> dataset = lance.LanceDataset.commit( ... "dataset", diff --git a/python/python/tests/test_fragment.py b/python/python/tests/test_fragment.py index 40e2a69df22..fef7c1c7eff 100644 --- a/python/python/tests/test_fragment.py +++ b/python/python/tests/test_fragment.py @@ -540,17 +540,17 @@ def test_fragment_update_columns_basic(tmp_path): # Get the fragment and update columns fragment = dataset.get_fragment(0) - updated_fragment, fields_modified = fragment.update_columns(update_data) + updated_fragment, bitmap_prune_field_ids = fragment.update_columns(update_data) - # Verify fields_modified is returned - assert isinstance(fields_modified, list) - assert len(fields_modified) > 0 + # Verify bitmap_prune_field_ids is returned + assert isinstance(bitmap_prune_field_ids, list) + assert len(bitmap_prune_field_ids) > 0 # Commit the changes using Update operation op = LanceOperation.Update( updated_fragments=[updated_fragment], - fields_modified=fields_modified, + bitmap_prune_field_ids=bitmap_prune_field_ids, ) updated_dataset = lance.LanceDataset.commit( str(dataset_uri), op, read_version=dataset.version @@ -588,7 +588,7 @@ def test_fragment_update_columns_with_custom_join_key(tmp_path): # Get the fragment and update columns fragment = dataset.get_fragment(0) - updated_fragment, fields_modified = fragment.update_columns( + updated_fragment, bitmap_prune_field_ids = fragment.update_columns( update_data, left_on="id", right_on="id" ) @@ -596,7 +596,7 @@ def test_fragment_update_columns_with_custom_join_key(tmp_path): op = LanceOperation.Update( updated_fragments=[updated_fragment], - fields_modified=fields_modified, + bitmap_prune_field_ids=bitmap_prune_field_ids, ) updated_dataset = lance.LanceDataset.commit( str(dataset_uri), op, read_version=dataset.version @@ -633,13 +633,13 @@ def test_fragment_update_columns_with_nulls(tmp_path): # Get the fragment and update columns fragment = dataset.get_fragment(0) - updated_fragment, fields_modified = fragment.update_columns(update_data) + updated_fragment, bitmap_prune_field_ids = fragment.update_columns(update_data) # Commit the changes op = LanceOperation.Update( updated_fragments=[updated_fragment], - fields_modified=fields_modified, + bitmap_prune_field_ids=bitmap_prune_field_ids, ) updated_dataset = lance.LanceDataset.commit( str(dataset_uri), op, read_version=dataset.version @@ -674,13 +674,13 @@ def test_fragment_update_columns_partial_update(tmp_path): # Get the fragment and update columns fragment = dataset.get_fragment(0) - updated_fragment, fields_modified = fragment.update_columns(update_data) + updated_fragment, bitmap_prune_field_ids = fragment.update_columns(update_data) # Commit the changes op = LanceOperation.Update( updated_fragments=[updated_fragment], - fields_modified=fields_modified, + bitmap_prune_field_ids=bitmap_prune_field_ids, ) updated_dataset = lance.LanceDataset.commit( str(dataset_uri), op, read_version=dataset.version @@ -717,13 +717,13 @@ def test_fragment_update_columns_no_match(tmp_path): # Get the fragment and update columns fragment = dataset.get_fragment(0) - updated_fragment, fields_modified = fragment.update_columns(update_data) + updated_fragment, bitmap_prune_field_ids = fragment.update_columns(update_data) # Commit the changes op = LanceOperation.Update( updated_fragments=[updated_fragment], - fields_modified=fields_modified, + bitmap_prune_field_ids=bitmap_prune_field_ids, ) updated_dataset = lance.LanceDataset.commit( str(dataset_uri), op, read_version=dataset.version diff --git a/python/src/transaction.rs b/python/src/transaction.rs index 0aa19669986..4c729af87ac 100644 --- a/python/src/transaction.rs +++ b/python/src/transaction.rs @@ -211,10 +211,10 @@ impl FromPyObject<'_> for PyLance { let new_fragments = extract_vec(&ob.getattr("new_fragments")?)?; - let fields_modified = ob.getattr("fields_modified")?.extract()?; + let bitmap_prune_field_ids = ob.getattr("bitmap_prune_field_ids")?.extract()?; - let fields_for_preserving_frag_bitmap = ob - .getattr("fields_for_preserving_frag_bitmap")? + let bitmap_preserve_field_ids = ob + .getattr("bitmap_preserve_field_ids")? .extract() .unwrap_or_default(); @@ -228,9 +228,9 @@ impl FromPyObject<'_> for PyLance { removed_fragment_ids, updated_fragments, new_fragments, - fields_modified, + bitmap_prune_field_ids, mem_wal_to_merge: None, - fields_for_preserving_frag_bitmap, + bitmap_preserve_field_ids, update_mode, }; Ok(Self(op)) @@ -367,17 +367,16 @@ impl<'py> IntoPyObject<'py> for PyLance<&Operation> { removed_fragment_ids, updated_fragments, new_fragments, - fields_modified, - fields_for_preserving_frag_bitmap, + bitmap_prune_field_ids, + bitmap_preserve_field_ids, update_mode, .. } => { let removed_fragment_ids = removed_fragment_ids.into_pyobject(py)?; let updated_fragments = export_vec(py, updated_fragments.as_slice())?; let new_fragments = export_vec(py, new_fragments.as_slice())?; - let fields_modified = fields_modified.into_pyobject(py)?; - let fields_for_preserving_frag_bitmap = - fields_for_preserving_frag_bitmap.into_pyobject(py)?; + let bitmap_prune_field_ids = bitmap_prune_field_ids.into_pyobject(py)?; + let bitmap_preserve_field_ids = bitmap_preserve_field_ids.into_pyobject(py)?; let update_mode = match update_mode { Some(mode) => match mode { lance::dataset::transaction::UpdateMode::RewriteRows => "rewrite_rows", @@ -394,8 +393,8 @@ impl<'py> IntoPyObject<'py> for PyLance<&Operation> { removed_fragment_ids, updated_fragments, new_fragments, - fields_modified, - fields_for_preserving_frag_bitmap, + bitmap_prune_field_ids, + bitmap_preserve_field_ids, update_mode, )) } diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 1544bc3583a..14ac46f437f 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -2829,9 +2829,9 @@ mod tests { removed_fragment_ids: vec![], updated_fragments: vec![updated_fragment1], new_fragments: vec![], - fields_modified: fields_modified1, + bitmap_prune_field_ids: fields_modified1, mem_wal_to_merge: None, - fields_for_preserving_frag_bitmap: vec![], + bitmap_preserve_field_ids: vec![], update_mode: Some(UpdateMode::RewriteColumns), }; let mut dataset1 = Dataset::commit( @@ -2901,9 +2901,9 @@ mod tests { removed_fragment_ids: vec![], updated_fragments: vec![updated_fragment2], new_fragments: vec![], - fields_modified: fields_modified2, + bitmap_prune_field_ids: fields_modified2, mem_wal_to_merge: None, - fields_for_preserving_frag_bitmap: vec![], + bitmap_preserve_field_ids: vec![], update_mode: Some(UpdateMode::RewriteColumns), }; let dataset2 = Dataset::commit( diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index f22c7426b19..3ebb845e811 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -244,9 +244,10 @@ pub enum Operation { /// /// A horizontal update adds new columns. In this case, the updated fragments /// may have fields removed or added. It is even possible for a field to be tombstoned - /// and then added back in the same update. (which is a field modification). If any - /// fields are modified in this way then they need to be added to the fields_modified list. - /// This way we can correctly update the indices. + /// and then added back in the same update. (which is a field modification). + /// Any fields modified in this way must be added to the bitmap_prune_field_ids list so + /// indices that cover those fields can prune fragment bitmaps accordingly. + /// This ensures indices remain consistent after updates. /// This is what is used by a merge insert that does not match the whole schema. Update { /// Ids of fragments that have been moved @@ -255,13 +256,14 @@ pub enum Operation { updated_fragments: Vec, /// Fragments that have been added new_fragments: Vec, - /// The fields that have been modified - fields_modified: Vec, + /// Field IDs that drive index fragment bitmap pruning + bitmap_prune_field_ids: Vec, /// The MemWAL (pre-image) that should be marked as merged after this transaction mem_wal_to_merge: Option, - /// The fields that used to judge whether to preserve the new frag's id into - /// the frag bitmap of the specified indices. - fields_for_preserving_frag_bitmap: Vec, + /// Field IDs used to decide whether to preserve new fragment IDs in the + /// fragment bitmap of specified indices. Indices that do not cover these + /// fields may preserve the new fragment IDs when applicable. + bitmap_preserve_field_ids: Vec, /// The mode of update update_mode: Option, }, @@ -449,29 +451,29 @@ impl PartialEq for Operation { removed_fragment_ids: a_removed, updated_fragments: a_updated, new_fragments: a_new, - fields_modified: a_fields, + bitmap_prune_field_ids: a_prune_fields, mem_wal_to_merge: a_mem_wal_to_merge, - fields_for_preserving_frag_bitmap: a_fields_for_preserving_frag_bitmap, + bitmap_preserve_field_ids: a_bitmap_preserve_exclude_fields, update_mode: a_update_mode, }, Self::Update { removed_fragment_ids: b_removed, updated_fragments: b_updated, new_fragments: b_new, - fields_modified: b_fields, + bitmap_prune_field_ids: b_prune_fields, mem_wal_to_merge: b_mem_wal_to_merge, - fields_for_preserving_frag_bitmap: b_fields_for_preserving_frag_bitmap, + bitmap_preserve_field_ids: b_bitmap_preserve_exclude_fields, update_mode: b_update_mode, }, ) => { compare_vec(a_removed, b_removed) && compare_vec(a_updated, b_updated) && compare_vec(a_new, b_new) - && compare_vec(a_fields, b_fields) + && compare_vec(a_prune_fields, b_prune_fields) && a_mem_wal_to_merge == b_mem_wal_to_merge && compare_vec( - a_fields_for_preserving_frag_bitmap, - b_fields_for_preserving_frag_bitmap, + a_bitmap_preserve_exclude_fields, + b_bitmap_preserve_exclude_fields, ) && a_update_mode == b_update_mode } @@ -1722,9 +1724,9 @@ impl Transaction { removed_fragment_ids, updated_fragments, new_fragments, - fields_modified, + bitmap_prune_field_ids, mem_wal_to_merge, - fields_for_preserving_frag_bitmap, + bitmap_preserve_field_ids, update_mode, } => { // Extract existing fragments once for reuse @@ -1759,7 +1761,7 @@ impl Transaction { Self::prune_updated_fields_from_indices( &mut final_indices, updated_fragments, - fields_modified, + bitmap_prune_field_ids, ); let mut new_fragments = @@ -1918,7 +1920,7 @@ impl Transaction { &mut final_indices, &pure_updated_frag_ids, &original_fragment_ids, - fields_for_preserving_frag_bitmap, + bitmap_preserve_field_ids, ); } @@ -2323,15 +2325,13 @@ impl Transaction { indices: &mut [IndexMetadata], pure_update_frag_ids: &[u64], original_fragment_ids: &[u64], - fields_for_preserving_frag_bitmap: &[u32], + bitmap_preserve_field_ids: &[u32], ) { if pure_update_frag_ids.is_empty() { return; } - let value_updated_field_set = fields_for_preserving_frag_bitmap - .iter() - .collect::>(); + let value_updated_field_set = bitmap_preserve_field_ids.iter().collect::>(); for index in indices.iter_mut() { let index_covers_modified_field = index.fields.iter().any(|field_id| { @@ -2363,20 +2363,20 @@ impl Transaction { fn prune_updated_fields_from_indices( indices: &mut [IndexMetadata], updated_fragments: &[Fragment], - fields_modified: &[u32], + bitmap_prune_field_ids: &[u32], ) { - if fields_modified.is_empty() { + if bitmap_prune_field_ids.is_empty() { return; } // If we modified any fields in the fragments then we need to remove those fragments // from the index if the index covers one of those modified fields. - let fields_modified_set = fields_modified.iter().collect::>(); + let fields_set_for_pruning = bitmap_prune_field_ids.iter().collect::>(); for index in indices.iter_mut() { if index .fields .iter() - .any(|field_id| fields_modified_set.contains(&u32::try_from(*field_id).unwrap())) + .any(|field_id| fields_set_for_pruning.contains(&u32::try_from(*field_id).unwrap())) { if let Some(fragment_bitmap) = &mut index.fragment_bitmap { for fragment_id in updated_fragments.iter().map(|f| f.id as u32) { @@ -2897,9 +2897,9 @@ impl TryFrom for Transaction { removed_fragment_ids, updated_fragments, new_fragments, - fields_modified, + fields_modified: bitmap_prune_field_ids, mem_wal_to_merge, - fields_for_preserving_frag_bitmap, + fields_for_preserving_frag_bitmap: bitmap_preserve_field_ids, update_mode, })) => Operation::Update { removed_fragment_ids, @@ -2911,9 +2911,9 @@ impl TryFrom for Transaction { .into_iter() .map(Fragment::try_from) .collect::>>()?, - fields_modified, + bitmap_prune_field_ids, mem_wal_to_merge: mem_wal_to_merge.map(|m| MemWal::try_from(m).unwrap()), - fields_for_preserving_frag_bitmap, + bitmap_preserve_field_ids, update_mode: match update_mode { 0 => Some(UpdateMode::RewriteRows), 1 => Some(UpdateMode::RewriteColumns), @@ -3266,9 +3266,9 @@ impl From<&Transaction> for pb::Transaction { removed_fragment_ids, updated_fragments, new_fragments, - fields_modified, + bitmap_prune_field_ids, mem_wal_to_merge, - fields_for_preserving_frag_bitmap, + bitmap_preserve_field_ids, update_mode, } => pb::transaction::Operation::Update(pb::transaction::Update { removed_fragment_ids: removed_fragment_ids.clone(), @@ -3277,9 +3277,9 @@ impl From<&Transaction> for pb::Transaction { .map(pb::DataFragment::from) .collect(), new_fragments: new_fragments.iter().map(pb::DataFragment::from).collect(), - fields_modified: fields_modified.clone(), + fields_modified: bitmap_prune_field_ids.clone(), mem_wal_to_merge: mem_wal_to_merge.as_ref().map(|m| m.into()), - fields_for_preserving_frag_bitmap: fields_for_preserving_frag_bitmap.clone(), + fields_for_preserving_frag_bitmap: bitmap_preserve_field_ids.clone(), update_mode: update_mode .as_ref() .map(|mode| match mode { diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index 15c1e0e8e31..48df3283544 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -788,9 +788,9 @@ mod tests { updated_fragments: vec![], new_fragments: vec![], removed_fragment_ids: vec![], - fields_modified: vec![], + bitmap_prune_field_ids: vec![], mem_wal_to_merge: None, - fields_for_preserving_frag_bitmap: vec![], + bitmap_preserve_field_ids: vec![], update_mode: None, }, read_version: 1, diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 340e60cdff9..4796a5ee32b 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -1467,20 +1467,21 @@ impl MergeInsertJob { // We will have a different commit path here too, as we are modifying // fragments rather than writing new ones - let (updated_fragments, new_fragments, fields_modified) = Self::update_fragments( - self.dataset.clone(), - Box::pin(stream), - self.dataset.manifest.version + 1, - ) - .await?; + let (updated_fragments, new_fragments, bitmap_prune_field_ids) = + Self::update_fragments( + self.dataset.clone(), + Box::pin(stream), + self.dataset.manifest.version + 1, + ) + .await?; let operation = Operation::Update { removed_fragment_ids: Vec::new(), updated_fragments, new_fragments, - fields_modified, + bitmap_prune_field_ids, mem_wal_to_merge: self.params.mem_wal_to_merge, - fields_for_preserving_frag_bitmap: vec![], // in-place update do not affect preserving frag bitmap + bitmap_preserve_field_ids: vec![], // in-place update does not affect preserving fragment bitmap update_mode: Some(RewriteColumns), }; // We have rewritten the fragments, not just the deletion files, so @@ -1551,13 +1552,9 @@ impl MergeInsertJob { new_fragments, // On this path we only make deletions against updated_fragments and will not // modify any field values. - fields_modified: vec![], + bitmap_prune_field_ids: vec![], mem_wal_to_merge: self.params.mem_wal_to_merge, - fields_for_preserving_frag_bitmap: full_schema - .fields - .iter() - .map(|f| f.id as u32) - .collect(), + bitmap_preserve_field_ids: full_schema.fields.iter().map(|f| f.id as u32).collect(), update_mode: Some(RewriteRows), }; diff --git a/rust/lance/src/dataset/write/merge_insert/exec/write.rs b/rust/lance/src/dataset/write/merge_insert/exec/write.rs index 0df589ce71a..0b0369349f2 100644 --- a/rust/lance/src/dataset/write/merge_insert/exec/write.rs +++ b/rust/lance/src/dataset/write/merge_insert/exec/write.rs @@ -882,9 +882,9 @@ impl ExecutionPlan for FullSchemaMergeInsertExec { removed_fragment_ids, updated_fragments, new_fragments, - fields_modified: vec![], // No fields are modified in schema for upsert + bitmap_prune_field_ids: vec![], mem_wal_to_merge, - fields_for_preserving_frag_bitmap: dataset + bitmap_preserve_field_ids: dataset .schema() .fields .iter() diff --git a/rust/lance/src/dataset/write/update.rs b/rust/lance/src/dataset/write/update.rs index 5d40570ac91..4d430266944 100644 --- a/rust/lance/src/dataset/write/update.rs +++ b/rust/lance/src/dataset/write/update.rs @@ -393,10 +393,10 @@ impl UpdateJob { dataset: Arc, update_data: UpdateData, ) -> Result { - let mut fields_for_preserving_frag_bitmap = Vec::new(); + let mut bitmap_preserve_field_ids = Vec::new(); for column_name in self.updates.keys() { if let Ok(field_id) = dataset.schema().field_id(column_name) { - fields_for_preserving_frag_bitmap.push(field_id as u32); + bitmap_preserve_field_ids.push(field_id as u32); } } @@ -405,12 +405,11 @@ impl UpdateJob { removed_fragment_ids: update_data.removed_fragment_ids, updated_fragments: update_data.old_fragments, new_fragments: update_data.new_fragments, - // In "rewrite rows" mode, the rows that are updated in the fragment - // are moved(deleted and appended). - // so we do not need to handle the frag bitmap of the index about it. - fields_modified: vec![], + // In "rewrite rows" mode, rows updated in the fragment are moved (deleted and appended). + // Therefore we do not need to prune index fragment bitmaps based on updated values here. + bitmap_prune_field_ids: vec![], mem_wal_to_merge: None, - fields_for_preserving_frag_bitmap, + bitmap_preserve_field_ids, update_mode: Some(RewriteRows), }; diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index 84110c69249..5bacb167b0a 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -1757,9 +1757,9 @@ mod tests { updated_fragments: vec![Fragment::new(0)], removed_fragment_ids: vec![], new_fragments: vec![], - fields_modified: vec![], + bitmap_prune_field_ids: vec![], mem_wal_to_merge: None, - fields_for_preserving_frag_bitmap: vec![], + bitmap_preserve_field_ids: vec![], update_mode: None, }; let transaction = Transaction::new_from_version(1, operation); @@ -1768,9 +1768,9 @@ mod tests { updated_fragments: vec![Fragment::new(1)], removed_fragment_ids: vec![2], new_fragments: vec![], - fields_modified: vec![], + bitmap_prune_field_ids: vec![], mem_wal_to_merge: None, - fields_for_preserving_frag_bitmap: vec![], + bitmap_preserve_field_ids: vec![], update_mode: None, }, Operation::Delete { @@ -1782,9 +1782,9 @@ mod tests { removed_fragment_ids: vec![], updated_fragments: vec![Fragment::new(4)], new_fragments: vec![], - fields_modified: vec![], + bitmap_prune_field_ids: vec![], mem_wal_to_merge: None, - fields_for_preserving_frag_bitmap: vec![], + bitmap_preserve_field_ids: vec![], update_mode: None, }, ]; @@ -1883,9 +1883,9 @@ mod tests { updated_fragments: vec![apply_deletion(&[0], &mut fragment, &dataset).await], removed_fragment_ids: vec![], new_fragments: vec![sample_file.clone()], - fields_modified: vec![], + bitmap_prune_field_ids: vec![], mem_wal_to_merge: None, - fields_for_preserving_frag_bitmap: vec![], + bitmap_preserve_field_ids: vec![], update_mode: None, }, Operation::Delete { @@ -1897,9 +1897,9 @@ mod tests { updated_fragments: vec![apply_deletion(&[2], &mut fragment, &dataset).await], removed_fragment_ids: vec![], new_fragments: vec![sample_file], - fields_modified: vec![], + bitmap_prune_field_ids: vec![], mem_wal_to_merge: None, - fields_for_preserving_frag_bitmap: vec![], + bitmap_preserve_field_ids: vec![], update_mode: None, }, ]; @@ -2018,9 +2018,9 @@ mod tests { updated_fragments: vec![], removed_fragment_ids: vec![0], new_fragments: vec![sample_file.clone()], - fields_modified: vec![], + bitmap_prune_field_ids: vec![], mem_wal_to_merge: None, - fields_for_preserving_frag_bitmap: vec![], + bitmap_preserve_field_ids: vec![], update_mode: None, }, ), @@ -2030,9 +2030,9 @@ mod tests { updated_fragments: vec![apply_deletion(&[0], &mut fragment, &dataset).await], removed_fragment_ids: vec![], new_fragments: vec![sample_file.clone()], - fields_modified: vec![], + bitmap_prune_field_ids: vec![], mem_wal_to_merge: None, - fields_for_preserving_frag_bitmap: vec![], + bitmap_preserve_field_ids: vec![], update_mode: None, }, ), @@ -2188,9 +2188,9 @@ mod tests { removed_fragment_ids: vec![1], updated_fragments: vec![fragment0.clone()], new_fragments: vec![fragment2.clone()], - fields_modified: vec![0], + bitmap_prune_field_ids: vec![0], mem_wal_to_merge: None, - fields_for_preserving_frag_bitmap: vec![], + bitmap_preserve_field_ids: vec![], update_mode: None, }, create_update_config_for_test( @@ -2383,9 +2383,9 @@ mod tests { updated_fragments: vec![fragment0], removed_fragment_ids: vec![], new_fragments: vec![fragment2], - fields_modified: vec![0], + bitmap_prune_field_ids: vec![0], mem_wal_to_merge: None, - fields_for_preserving_frag_bitmap: vec![], + bitmap_preserve_field_ids: vec![], update_mode: None, }, [ @@ -2805,9 +2805,9 @@ mod tests { updated_fragments: vec![Fragment::new(0)], removed_fragment_ids: vec![], new_fragments: vec![], - fields_modified: vec![], + bitmap_prune_field_ids: vec![], mem_wal_to_merge: None, - fields_for_preserving_frag_bitmap: vec![], + bitmap_preserve_field_ids: vec![], update_mode: None, }, ];