Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions java/lance-jni/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,8 @@ fn convert_to_java_operation_inner<'local>(
new_fragments,
fields_modified: _,
mem_wal_to_merge: _,
fields_for_preserving_frag_bitmap: _,
update_mode: _,
} => {
let removed_ids: Vec<JLance<i64>> = removed_fragment_ids
.iter()
Expand Down Expand Up @@ -887,6 +889,8 @@ fn convert_to_rust_operation(
new_fragments,
fields_modified: vec![],
mem_wal_to_merge: None,
update_mode: None,
fields_for_preserving_frag_bitmap: vec![],
}
}
"DataReplacement" => {
Expand Down
19 changes: 19 additions & 0 deletions protos/transaction.proto
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,25 @@ message Transaction {
repeated uint32 fields_modified = 4;
/// The MemWAL (pre-image) that should be marked as merged after this transaction
MemWalIndexDetails.MemWal mem_wal_to_merge = 5;
/// The fields that used to judge whether to preserve the new frag's id into
/// the frag bitmap of the specified indices.
repeated uint32 fields_for_preserving_frag_bitmap = 6;
// The mode of update
UpdateMode update_mode = 7;
}

// The mode of update operation
enum UpdateMode {
Comment thread
yanghua marked this conversation as resolved.

/// rows are deleted in current fragments and rewritten in new fragments.
/// This is most optimal when the majority of columns are being rewritten
/// or only a few rows are being updated.
REWRITE_ROWS = 0;

/// within each fragment, columns are fully rewritten and inserted as new data files.
/// Old versions of columns are tombstoned. This is most optimal when most rows are affected
/// but a small subset of columns are affected.
REWRITE_COLUMNS = 1;
}

// An operation that updates the table config.
Expand Down
5 changes: 5 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3632,12 +3632,17 @@ class Update(BaseOperation):
If any fields are modified in updated_fragments, then they must be
listed here so those fragments can be removed from indices covering
those fields.
fields_for_preserving_frag_bitmap: list[int]
The fields that used to judge whether to preserve the new frag's id into
the frag bitmap of the specified indices.
"""

removed_fragment_ids: List[int]
updated_fragments: List[FragmentMetadata]
new_fragments: List[FragmentMetadata]
fields_modified: List[int]
fields_for_preserving_frag_bitmap: List[int]
update_mode: str

def __post_init__(self):
LanceOperation._validate_fragments(self.updated_fragments)
Expand Down
52 changes: 48 additions & 4 deletions python/src/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use crate::schema::LanceSchema;
use crate::utils::{class_name, export_vec, extract_vec, PyLance};
use arrow::pyarrow::PyArrowType;
use arrow_schema::Schema as ArrowSchema;
use lance::dataset::transaction::{
DataReplacementGroup, Operation, RewriteGroup, RewrittenIndex, Transaction,
DataReplacementGroup, Operation, RewriteGroup, RewrittenIndex, Transaction, UpdateMode,
};
use lance::datatypes::Schema;
use lance_table::format::{DataFile, Fragment, Index};
Expand All @@ -17,9 +19,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use uuid::Uuid;

use crate::schema::LanceSchema;
use crate::utils::{class_name, export_vec, extract_vec, PyLance};

// Add Index bindings
impl FromPyObject<'_> for PyLance<Index> {
fn extract_bound(ob: &Bound<'_, PyAny>) -> PyResult<Self> {
Expand Down Expand Up @@ -141,6 +140,23 @@ impl<'py> IntoPyObject<'py> for PyLance<&DataReplacementGroup> {
}
}

#[derive(Debug, Clone)]
pub struct PyUpdateMode(pub UpdateMode);

impl FromPyObject<'_> for PyUpdateMode {
fn extract_bound(ob: &Bound<'_, PyAny>) -> PyResult<Self> {
let mode_str: String = ob.extract()?;
match mode_str.as_str() {
"rewrite_rows" => Ok(Self(UpdateMode::RewriteRows)),
"rewrite_columns" => Ok(Self(UpdateMode::RewriteColumns)),
_ => Err(PyValueError::new_err(format!(
"Invalid UpdateMode: {}. Valid options are: rewrite_rows, rewrite_columns",
mode_str
))),
}
}
}

impl FromPyObject<'_> for PyLance<Operation> {
fn extract_bound(ob: &Bound<'_, PyAny>) -> PyResult<Self> {
match class_name(ob)?.as_str() {
Expand Down Expand Up @@ -182,12 +198,25 @@ impl FromPyObject<'_> for PyLance<Operation> {

let fields_modified = ob.getattr("fields_modified")?.extract()?;

let fields_for_preserving_frag_bitmap = ob
.getattr("fields_for_preserving_frag_bitmap")?
.extract()
.unwrap_or_default();

let update_mode = ob
.getattr("update_mode")?
.extract::<PyUpdateMode>()
.ok()
.map(|py_mode| py_mode.0);

let op = Operation::Update {
removed_fragment_ids,
updated_fragments,
new_fragments,
fields_modified,
mem_wal_to_merge: None,
fields_for_preserving_frag_bitmap,
update_mode,
};
Ok(Self(op))
}
Expand Down Expand Up @@ -290,12 +319,25 @@ impl<'py> IntoPyObject<'py> for PyLance<&Operation> {
updated_fragments,
new_fragments,
fields_modified,
fields_for_preserving_frag_bitmap,
update_mode,
..
} => {
let removed_fragment_ids = removed_fragment_ids.into_pyobject(py)?;
let updated_fragments = export_vec(py, updated_fragments.as_slice())?;
let new_fragments = export_vec(py, new_fragments.as_slice())?;
let fields_modified = fields_modified.into_pyobject(py)?;
let fields_for_preserving_frag_bitmap =
fields_for_preserving_frag_bitmap.into_pyobject(py)?;
let update_mode = match update_mode {
Some(mode) => match mode {
lance::dataset::transaction::UpdateMode::RewriteRows => "rewrite_rows",
lance::dataset::transaction::UpdateMode::RewriteColumns => {
"rewrite_columns"
}
},
None => "rewrite_rows",
};
let cls = namespace
.getattr("Update")
.expect("Failed to get Update class");
Expand All @@ -304,6 +346,8 @@ impl<'py> IntoPyObject<'py> for PyLance<&Operation> {
updated_fragments,
new_fragments,
fields_modified,
fields_for_preserving_frag_bitmap,
update_mode,
))
}
Operation::DataReplacement { replacements } => {
Expand Down
Loading
Loading