diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index adf923f0c71..e45f9d6b53d 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -156,7 +156,7 @@ jobs: ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v protoc | sort | uniq | paste -s -d "," -` cargo test --locked --features ${ALL_FEATURES} build-no-lock: - runs-on: ubuntu-24.04 + runs-on: warp-ubuntu-2404-x64-8x timeout-minutes: 30 env: # Need up-to-date compilers for kernels diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index e2c73524fd0..52d63bc2fab 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -564,6 +564,7 @@ pub fn inner_commit_overwrite<'local>( fragments, schema, config_upsert_values: None, + initial_bases: None, }; let path_str = path.extract(env)?; let read_version = env.get_u64_opt(&read_version_obj)?; diff --git a/java/lance-jni/src/transaction.rs b/java/lance-jni/src/transaction.rs index e73dbe40b44..f9ce719f369 100644 --- a/java/lance-jni/src/transaction.rs +++ b/java/lance-jni/src/transaction.rs @@ -472,6 +472,7 @@ fn convert_to_java_operation_inner<'local>( fragments: rust_fragments, schema, config_upsert_values, + initial_bases: _, } => { let java_fragments = export_vec(env, &rust_fragments)?; let java_schema = convert_to_java_schema(env, schema)?; @@ -890,6 +891,7 @@ fn convert_to_rust_operation( fragments, schema, config_upsert_values, + initial_bases: None, } } "Rewrite" => { diff --git a/protos/transaction.proto b/protos/transaction.proto index a817835be29..5840f3180ad 100644 --- a/protos/transaction.proto +++ b/protos/transaction.proto @@ -68,6 +68,8 @@ message Transaction { map schema_metadata = 3; // Key-value pairs to merge with existing config. 
map config_upsert_values = 4; + // The base paths to be added for the initial dataset creation + repeated BasePath initial_bases = 5; } // Add or replace a new secondary index. diff --git a/python/python/lance/__init__.py b/python/python/lance/__init__.py index 9bc9c8ccd77..ca94f80bf52 100644 --- a/python/python/lance/__init__.py +++ b/python/python/lance/__init__.py @@ -26,6 +26,7 @@ ) from .fragment import FragmentMetadata, LanceFragment from .lance import ( + DatasetBasePath, FFILanceTableProvider, ScanStatistics, bytes_read_counter, @@ -47,6 +48,7 @@ __all__ = [ "BlobColumn", "BlobFile", + "DatasetBasePath", "DataStatistics", "FieldStatistics", "FragmentMetadata", diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index ad4f66e3b7e..97fb3c5b5ee 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -52,6 +52,7 @@ CleanupStats, Compaction, CompactionMetrics, + DatasetBasePath, LanceSchema, ScanStatistics, _Dataset, @@ -4897,6 +4898,8 @@ def write_dataset( auto_cleanup_options: Optional[AutoCleanupConfig] = None, commit_message: Optional[str] = None, transaction_properties: Optional[Dict[str, str]] = None, + initial_bases: Optional[List[DatasetBasePath]] = None, + target_bases: Optional[List[str]] = None, ) -> LanceDataset: """Write a given data_obj to the given uri @@ -4975,6 +4978,19 @@ def write_dataset( and can be retrieved using read_transaction(). If both `commit_message` and `properties` are provided, `commit_message` will override any "lance.commit.message" key in `properties`. + initial_bases: list of DatasetBasePath, optional + New base paths to register in the manifest. Only used in **CREATE mode**. + Cannot be specified in APPEND or OVERWRITE modes. + target_bases: list of str, optional + References to base paths where data should be written. Can be + specified in all modes. + + Each string is resolved by trying to match: + 1. Base name (e.g., "primary", "archive") from registered bases + 2. 
Base path URI (e.g., "s3://bucket1/data") + + **CREATE mode**: References must match bases in `initial_bases` + **APPEND/OVERWRITE modes**: References must match bases in the existing manifest """ if use_legacy_format is not None: warnings.warn( @@ -5016,6 +5032,8 @@ def write_dataset( "enable_stable_row_ids": enable_stable_row_ids, "auto_cleanup_options": auto_cleanup_options, "transaction_properties": merged_properties, + "initial_bases": initial_bases, + "target_bases": target_bases, } if commit_lock: diff --git a/python/python/tests/test_multi_base.py b/python/python/tests/test_multi_base.py new file mode 100644 index 00000000000..b92238f4003 --- /dev/null +++ b/python/python/tests/test_multi_base.py @@ -0,0 +1,488 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The Lance Authors + +""" +Tests for multi-base dataset functionality. +""" + +import shutil +import tempfile +import uuid +from pathlib import Path + +import lance +import pandas as pd +import pytest +from lance import DatasetBasePath + + +class TestMultiBase: + """Test multi-base dataset functionality with local file system.""" + + def setup_method(self): + """Set up test directories for each test.""" + self.test_dir = tempfile.mkdtemp() + self.test_id = str(uuid.uuid4())[:8] + + # Create primary and additional path directories + self.primary_uri = str(Path(self.test_dir) / "primary") + self.path1_uri = str(Path(self.test_dir) / f"path1_{self.test_id}") + self.path2_uri = str(Path(self.test_dir) / f"path2_{self.test_id}") + self.path3_uri = str(Path(self.test_dir) / f"path3_{self.test_id}") + + # Create directories + for uri in [self.primary_uri, self.path1_uri, self.path2_uri, self.path3_uri]: + Path(uri).mkdir(parents=True, exist_ok=True) + + def teardown_method(self): + """Clean up test directories after each test.""" + if hasattr(self, "test_dir"): + shutil.rmtree(self.test_dir, ignore_errors=True) + + def create_test_data(self, num_rows=500, id_offset=0): + """Create 
test data for multi-base tests.""" + return pd.DataFrame( + { + "id": range(id_offset, id_offset + num_rows), + "value": [f"value_{i}" for i in range(id_offset, id_offset + num_rows)], + "score": [i * 0.1 for i in range(id_offset, id_offset + num_rows)], + } + ) + + def test_multi_base_create_and_read(self): + """Test creating a multi-base dataset and reading it back.""" + data = self.create_test_data(500) + + # Create dataset with multi-base layout + dataset = lance.write_dataset( + data, + self.primary_uri, + mode="create", + initial_bases=[ + DatasetBasePath(self.path1_uri, name="path1"), + DatasetBasePath(self.path2_uri, name="path2"), + DatasetBasePath(self.path3_uri, name="path3"), + ], + target_bases=["path2"], # Write data to path2 + max_rows_per_file=100, # Force multiple fragments + ) + + assert dataset is not None + assert dataset.uri == self.primary_uri + + # Verify we can read the data back + result = dataset.to_table().to_pandas() + assert len(result) == 500 + + # Verify data integrity + pd.testing.assert_frame_equal( + result.sort_values("id").reset_index(drop=True), + data.sort_values("id").reset_index(drop=True), + ) + + def test_multi_base_append_mode(self): + """Test appending data to a multi-base dataset.""" + # Create initial dataset + initial_data = self.create_test_data(300) + + dataset = lance.write_dataset( + initial_data, + self.primary_uri, + mode="create", + initial_bases=[ + DatasetBasePath(self.path1_uri, name="path1"), + DatasetBasePath(self.path2_uri, name="path2"), + ], + target_bases=["path1"], # Write to path1 + max_rows_per_file=100, + ) + + # Create additional data to append + append_data = self.create_test_data(100, id_offset=300) + + # Append to different path + updated_dataset = lance.write_dataset( + append_data, + dataset, + mode="append", + target_bases=["path2"], # Write to path2 + max_rows_per_file=50, + ) + + # Verify total data + result = updated_dataset.to_table().to_pandas() + assert len(result) == 400 + + # Verify 
all data is present + expected_ids = set(range(400)) + actual_ids = set(result["id"].tolist()) + assert actual_ids == expected_ids + + def test_multi_base_overwrite_mode_inherits_bases(self): + """Test OVERWRITE mode inherits existing base configuration.""" + # Create initial dataset with multi-base configuration + initial_data = self.create_test_data(200) + + lance.write_dataset( + initial_data, + self.primary_uri, + mode="create", + initial_bases=[ + DatasetBasePath(self.path1_uri, name="path1"), + DatasetBasePath(self.path2_uri, name="path2"), + ], + target_bases=["path1"], # Write to path1 + max_rows_per_file=100, + ) + + # Create new data for overwrite + overwrite_data = self.create_test_data(150, id_offset=100) + + # Overwrite - should inherit existing bases (path1, path2) + # Write to path2 (existing base, referenced by name) + updated_dataset = lance.write_dataset( + overwrite_data, + self.primary_uri, + mode="overwrite", + target_bases=["path2"], # Reference existing base by name + max_rows_per_file=75, + ) + + # Verify overwritten data + result = updated_dataset.to_table().to_pandas() + assert len(result) == 150 + + # Verify data content + expected_ids = set(range(100, 250)) + actual_ids = set(result["id"].tolist()) + assert actual_ids == expected_ids + + # Verify base_paths are preserved from initial dataset + base_paths = updated_dataset._ds.base_paths() + assert len(base_paths) == 2 + assert any(bp.name == "path1" for bp in base_paths.values()) + assert any(bp.name == "path2" for bp in base_paths.values()) + + def test_multi_base_overwrite_mode_primary_path_default(self): + """Test OVERWRITE mode defaults to primary path when no target.""" + # Create initial dataset with explicit data file bases + initial_data = self.create_test_data(100) + + lance.write_dataset( + initial_data, + self.primary_uri, + mode="create", + initial_bases=[ + DatasetBasePath(self.path1_uri, name="path1"), + DatasetBasePath(self.path2_uri, name="path2"), + ], + 
target_bases=["path1"], # Write to path1 + max_rows_per_file=50, + ) + + # Overwrite without specifying target - should use primary path + # The registered base_paths stay in the manifest; only the new data goes to the primary path + overwrite_data = self.create_test_data(75, id_offset=200) + + updated_dataset = lance.write_dataset( + overwrite_data, + self.primary_uri, + mode="overwrite", + # No target_bases specified - data goes to primary path + # Old bases (path1, path2) stay registered but receive no data + max_rows_per_file=25, + ) + + # Verify overwritten data + result = updated_dataset.to_table().to_pandas() + assert len(result) == 75 + + # Verify data content + expected_ids = set(range(200, 275)) + actual_ids = set(result["id"].tolist()) + assert actual_ids == expected_ids + + # Verify base_paths are preserved from previous manifest + base_paths = updated_dataset._ds.base_paths() + # Old path1 and path2 ARE preserved in manifest + assert len(base_paths) == 2 + assert any(bp.name == "path1" for bp in base_paths.values()) + assert any(bp.name == "path2" for bp in base_paths.values()) + + # Verify that data files were written to primary path (not to path1 or path2) + fragments = list(updated_dataset.get_fragments()) + for fragment in fragments: + # Fragment files should be in primary path (base_id should be None) + data_file = fragment.data_files()[0] + assert data_file.base_id is None, ( + f"Expected data file to be in primary path (base_id=None), " + f"but got base_id={data_file.base_id}" + ) + + # Since base_paths exist, we can append to them by name + append_data = self.create_test_data(25, id_offset=300) + + final_dataset = lance.write_dataset( + append_data, + updated_dataset, + mode="append", + target_bases=["path2"], # Specify which base to use + ) + + final_result = final_dataset.to_table().to_pandas() + assert len(final_result) == 100 + + def test_multi_base_append_mode_primary_path_default(self): + """Test that APPEND mode defaults to primary path when no target specified.""" + # Create 
initial dataset with explicit data file bases + initial_data = self.create_test_data(100) + + dataset = lance.write_dataset( + initial_data, + self.primary_uri, + mode="create", + initial_bases=[ + DatasetBasePath(self.path1_uri, name="path1"), + DatasetBasePath(self.path2_uri, name="path2"), + ], + target_bases=["path1"], # Write to path1 + max_rows_per_file=50, + ) + + # Append without specifying target - should use primary path + append_data = self.create_test_data(75, id_offset=100) + + updated_dataset = lance.write_dataset( + append_data, + dataset, + mode="append", + # No target_bases specified - data goes to primary path + max_rows_per_file=25, + ) + + # Verify appended data + result = updated_dataset.to_table().to_pandas() + assert len(result) == 175 + + # Verify data content + expected_ids = set(range(175)) + actual_ids = set(result["id"].tolist()) + assert actual_ids == expected_ids + + # Verify base_paths are preserved from previous manifest + base_paths = updated_dataset._ds.base_paths() + assert len(base_paths) == 2 + assert any(bp.name == "path1" for bp in base_paths.values()) + assert any(bp.name == "path2" for bp in base_paths.values()) + + # Verify new data files were written to primary path + # (not to path1 or path2) + fragments = list(updated_dataset.get_fragments()) + # The first 2 fragments should be in path1 (from initial write) + # The remaining fragments should be in primary path (from append) + primary_path_fragments = 0 + path1_fragments = 0 + + for fragment in fragments: + data_file = fragment.data_files()[0] + if data_file.base_id is None: + primary_path_fragments += 1 + else: + base_path = base_paths.get(data_file.base_id) + if base_path and base_path.name == "path1": + path1_fragments += 1 + + assert path1_fragments == 2, ( + f"Expected 2 fragments in path1, got {path1_fragments}" + ) + assert primary_path_fragments >= 3, ( + f"Expected at least 3 fragments in primary path, " + f"got {primary_path_fragments}" + ) + + def 
test_multi_base_is_dataset_root_flag(self): + """Test is_dataset_root flag on DatasetBasePath.""" + # Create initial dataset with one base marked as dataset_root + initial_data = self.create_test_data(100) + + dataset = lance.write_dataset( + initial_data, + self.primary_uri, + mode="create", + initial_bases=[ + DatasetBasePath(self.path1_uri, name="path1", is_dataset_root=True), + DatasetBasePath(self.path2_uri, name="path2", is_dataset_root=False), + ], + target_bases=["path1"], + max_rows_per_file=50, + ) + + # Verify base_paths configuration + base_paths = dataset._ds.base_paths() + assert len(base_paths) == 2 + + # Find path1 and path2 in base_paths + path1_base = None + path2_base = None + for base_path in base_paths.values(): + if base_path.name == "path1": + path1_base = base_path + elif base_path.name == "path2": + path2_base = base_path + + assert path1_base is not None, "path1 base not found" + assert path2_base is not None, "path2 base not found" + + # Verify is_dataset_root flags + assert path1_base.is_dataset_root is True, ( + f"Expected path1.is_dataset_root=True, got {path1_base.is_dataset_root}" + ) + assert path2_base.is_dataset_root is False, ( + f"Expected path2.is_dataset_root=False, got {path2_base.is_dataset_root}" + ) + + # Verify data is readable + result = dataset.to_table().to_pandas() + assert len(result) == 100 + + def test_multi_base_target_by_path_uri(self): + """Test using path URIs instead of names in target_bases.""" + # Create initial dataset with named bases + initial_data = self.create_test_data(100) + + dataset = lance.write_dataset( + initial_data, + self.primary_uri, + mode="create", + initial_bases=[ + DatasetBasePath(self.path1_uri, name="path1"), + DatasetBasePath(self.path2_uri, name="path2"), + ], + target_bases=["path1"], # Write to path1 using name + max_rows_per_file=50, + ) + + # Get the base_paths to find the actual path URI for path2 + base_paths = dataset._ds.base_paths() + path2_base = None + for base_path in 
base_paths.values(): + if base_path.name == "path2": + path2_base = base_path + break + + assert path2_base is not None, "path2 base not found" + + # Append using the path URI instead of name + append_data = self.create_test_data(50, id_offset=100) + + updated_dataset = lance.write_dataset( + append_data, + dataset, + mode="append", + target_bases=[path2_base.path], # Use path URI instead of name + max_rows_per_file=25, + ) + + # Verify appended data + result = updated_dataset.to_table().to_pandas() + assert len(result) == 150 + + # Verify data content + expected_ids = set(range(150)) + actual_ids = set(result["id"].tolist()) + assert actual_ids == expected_ids + + # Verify that new fragments are in path2 (not primary or path1) + fragments = list(updated_dataset.get_fragments()) + base_paths_updated = updated_dataset._ds.base_paths() + + path1_fragments = 0 + path2_fragments = 0 + + for fragment in fragments: + data_file = fragment.data_files()[0] + if data_file.base_id is not None: + base_path = base_paths_updated.get(data_file.base_id) + if base_path and base_path.name == "path1": + path1_fragments += 1 + elif base_path and base_path.name == "path2": + path2_fragments += 1 + + assert path1_fragments == 2, ( + f"Expected 2 fragments in path1, got {path1_fragments}" + ) + assert path2_fragments == 2, ( + f"Expected 2 fragments in path2, got {path2_fragments}" + ) + + def test_validation_errors(self): + """Test validation errors for invalid multi-base configurations.""" + data = self.create_test_data(100) + + # Test 1: Target reference not found in new bases for CREATE mode + with pytest.raises(Exception, match="not found"): + lance.write_dataset( + data, + self.primary_uri, + mode="create", + initial_bases=[ + DatasetBasePath(self.path1_uri, name="path1"), + DatasetBasePath(self.path2_uri, name="path2"), + ], + target_bases=["path3"], # Not in new bases + ) + + # Test 2: DatasetBasePath in APPEND mode + # First create a base dataset + base_dataset = 
lance.write_dataset( + data, + self.primary_uri, + mode="create", + initial_bases=[ + DatasetBasePath(self.path1_uri, name="path1"), + DatasetBasePath(self.path2_uri, name="path2"), + ], + target_bases=["path1"], + ) + + with pytest.raises(Exception, match="Cannot register new bases in Append mode"): + lance.write_dataset( + data, + base_dataset, + mode="append", + initial_bases=[ + DatasetBasePath( + name="path3", path=self.path3_uri + ), # New base not allowed + ], + target_bases=["path3"], + ) + + def test_fragment_distribution(self): + """Test that fragments are correctly distributed and readable.""" + data = self.create_test_data(1000) + + dataset = lance.write_dataset( + data, + self.primary_uri, + mode="create", + initial_bases=[ + DatasetBasePath(self.path1_uri, name="path1"), + DatasetBasePath(self.path2_uri, name="path2"), + ], + target_bases=["path1"], + max_rows_per_file=100, # Should create 10 fragments + ) + + # Verify fragment count + fragments = list(dataset.get_fragments()) + assert len(fragments) >= 10 # At least 10 fragments + + # Verify all data is readable + result = dataset.to_table().to_pandas() + assert len(result) == 1000 + + # Test scanning with filters + filtered_result = dataset.scanner(filter="id < 500").to_table().to_pandas() + assert len(filtered_result) == 500 + assert all(filtered_result["id"] < 500) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index de693429217..7586d4ae387 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -80,7 +80,7 @@ use lance_index::{ }; use lance_io::object_store::ObjectStoreParams; use lance_linalg::distance::MetricType; -use lance_table::format::Fragment; +use lance_table::format::{BasePath, Fragment}; use lance_table::io::commit::CommitHandler; use crate::error::PythonErrorExt; @@ -379,6 +379,63 @@ impl<'py> IntoPyObject<'py> for PyLance<&ColumnOrdering> { } } +/// Python binding for BasePath +#[pyclass(name = "DatasetBasePath", module = "lance")] +#[derive(Clone)] +pub struct 
DatasetBasePath { + #[pyo3(get)] + pub id: u32, + #[pyo3(get)] + pub name: Option, + #[pyo3(get)] + pub path: String, + #[pyo3(get)] + pub is_dataset_root: bool, +} + +#[pymethods] +impl DatasetBasePath { + #[new] + #[pyo3(signature = (path, name = None, is_dataset_root = false, id = None))] + fn new(path: String, name: Option, is_dataset_root: bool, id: Option) -> Self { + Self { + id: id.unwrap_or(0), + name, + path, + is_dataset_root, + } + } + + fn __repr__(&self) -> String { + format!( + "DatasetBasePath(id={}, name={:?}, path={}, is_dataset_root={})", + self.id, self.name, self.path, self.is_dataset_root + ) + } +} + +impl From for DatasetBasePath { + fn from(base_path: BasePath) -> Self { + Self { + id: base_path.id, + name: base_path.name, + path: base_path.path, + is_dataset_root: base_path.is_dataset_root, + } + } +} + +impl From for BasePath { + fn from(py_base_path: DatasetBasePath) -> Self { + Self::new( + py_base_path.id, + py_base_path.path.clone(), + py_base_path.name, + py_base_path.is_dataset_root, + ) + } +} + /// Lance Dataset that will be wrapped by another class in Python #[pyclass(name = "_Dataset", module = "_lib")] #[derive(Clone)] @@ -584,6 +641,21 @@ impl Dataset { PyBytes::new(py, &manifest_bytes).into() } + /// Get base paths from the manifest. + /// + /// Returns a dictionary mapping base_id to DatasetBasePath objects. + fn base_paths(&self, py: Python) -> PyResult { + let manifest = self.ds.manifest(); + let dict = pyo3::types::PyDict::new(py); + + for (base_id, base_path) in manifest.base_paths.iter() { + let py_base_path = DatasetBasePath::from(base_path.clone()); + dict.set_item(base_id, py_base_path.into_py_any(py)?)?; + } + + Ok(dict.into()) + } + /// Load index metadata. /// /// This call will open the index and return its concrete index type. @@ -2638,6 +2710,34 @@ pub fn get_write_params(options: &Bound<'_, PyDict>) -> PyResult>>(options, "initial_bases")? 
+ { + let mut new_bases = Vec::new(); + + for item in initial_bases_list.iter() { + if let Ok(dataset_base_path) = item.extract::() { + new_bases.push(BasePath::from(dataset_base_path)); + } else { + return Err(PyValueError::new_err( + "initial_bases must contain DatasetBasePath objects only", + )); + } + } + + if !new_bases.is_empty() { + p = p.with_initial_bases(new_bases); + } + } + + // Handle target_bases parameter (list of strings - base names or paths) + if let Some(target_bases_list) = get_dict_opt::>(options, "target_bases")? { + if !target_bases_list.is_empty() { + p = p.with_target_base_names_or_paths(target_bases_list); + } + } + // Handle properties if let Some(props) = get_dict_opt::>(options, "transaction_properties")? diff --git a/python/src/lib.rs b/python/src/lib.rs index 5365765ca69..419430179c1 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -47,7 +47,7 @@ use dataset::cleanup::CleanupStats; use dataset::optimize::{ PyCompaction, PyCompactionMetrics, PyCompactionPlan, PyCompactionTask, PyRewriteResult, }; -use dataset::{MergeInsertBuilder, PyFullTextQuery}; +use dataset::{DatasetBasePath, MergeInsertBuilder, PyFullTextQuery}; use env_logger::{Builder, Env}; use file::{ stable_version, LanceBufferDescriptor, LanceColumnMetadata, LanceFileMetadata, LanceFileReader, @@ -240,6 +240,7 @@ fn lance(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/python/src/transaction.rs b/python/src/transaction.rs index b6a8ea4536c..e7bcbccb0aa 100644 --- a/python/src/transaction.rs +++ b/python/src/transaction.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use crate::dataset::DatasetBasePath; use crate::schema::LanceSchema; use crate::utils::{class_name, export_vec, extract_vec, PyLance}; use arrow::pyarrow::PyArrowType; @@ -10,7 +11,7 @@ use 
lance::dataset::transaction::{ UpdateMapEntry, UpdateMode, }; use lance::datatypes::Schema; -use lance_table::format::{DataFile, Fragment, IndexMetadata}; +use lance_table::format::{BasePath, DataFile, Fragment, IndexMetadata}; use pyo3::exceptions::PyValueError; use pyo3::types::PySet; use pyo3::{intern, prelude::*}; @@ -166,10 +167,23 @@ impl FromPyObject<'_> for PyLance { let fragments = extract_vec(&ob.getattr("fragments")?)?; + let initial_bases = ob + .getattr("initial_bases") + .ok() + .and_then(|attr| attr.extract::>>().ok()) + .flatten() + .map(|py_bases| { + py_bases + .into_iter() + .map(BasePath::from) + .collect::>() + }); + let op = Operation::Overwrite { schema, fragments, config_upsert_values: None, + initial_bases, }; Ok(Self(op)) } diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index a07ff3f3b23..8cd1562fe3f 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -359,6 +359,27 @@ impl ObjectStore { Ok((store, path)) } + /// Extract the path component from a URI without initializing the object store. + /// + /// This is a synchronous operation that only parses the URI and extracts the path, + /// without creating or initializing any object store instance. 
+ /// + /// # Arguments + /// + /// * `registry` - The object store registry to get the provider + /// * `uri` - The URI to extract the path from + /// + /// # Returns + /// + /// The extracted path component + pub fn extract_path_from_uri(registry: Arc, uri: &str) -> Result { + let url = uri_to_url(uri)?; + let provider = registry.get_provider(url.scheme()).ok_or_else(|| { + Error::invalid_input(format!("Unknown scheme: {}", url.scheme()), location!()) + })?; + provider.extract_path(&url) + } + #[deprecated(note = "Use `from_uri` instead")] pub fn from_path(str_path: &str) -> Result<(Arc, Path)> { Self::from_uri_and_params( diff --git a/rust/lance-table/src/feature_flags.rs b/rust/lance-table/src/feature_flags.rs index a74a50a3255..f06e50799a2 100644 --- a/rust/lance-table/src/feature_flags.rs +++ b/rust/lance-table/src/feature_flags.rs @@ -18,8 +18,8 @@ pub const FLAG_STABLE_ROW_IDS: u64 = 2; pub const FLAG_USE_V2_FORMAT_DEPRECATED: u64 = 4; /// Table config is present pub const FLAG_TABLE_CONFIG: u64 = 8; -/// Dataset is a shallow clone with external base paths -pub const FLAG_SHALLOW_CLONE: u64 = 16; +/// Dataset uses multiple base paths (for shallow clones or multi-base datasets) +pub const FLAG_BASE_PATHS: u64 = 16; /// The first bit that is unknown as a feature flag pub const FLAG_UNKNOWN: u64 = 32; @@ -64,10 +64,10 @@ pub fn apply_feature_flags(manifest: &mut Manifest, enable_stable_row_id: bool) manifest.writer_feature_flags |= FLAG_TABLE_CONFIG; } - // Check if this is a shallow clone dataset by examining base_paths + // Check if this dataset uses multiple base paths (for shallow clones or multi-base datasets) if !manifest.base_paths.is_empty() { - manifest.reader_feature_flags |= FLAG_SHALLOW_CLONE; - manifest.writer_feature_flags |= FLAG_SHALLOW_CLONE; + manifest.reader_feature_flags |= FLAG_BASE_PATHS; + manifest.writer_feature_flags |= FLAG_BASE_PATHS; } Ok(()) @@ -97,7 +97,7 @@ mod tests { assert!(can_read_dataset(super::FLAG_STABLE_ROW_IDS)); 
assert!(can_read_dataset(super::FLAG_USE_V2_FORMAT_DEPRECATED)); assert!(can_read_dataset(super::FLAG_TABLE_CONFIG)); - assert!(can_read_dataset(super::FLAG_SHALLOW_CLONE)); + assert!(can_read_dataset(super::FLAG_BASE_PATHS)); assert!(can_read_dataset( super::FLAG_DELETION_FILES | super::FLAG_STABLE_ROW_IDS @@ -113,19 +113,19 @@ mod tests { assert!(can_write_dataset(super::FLAG_STABLE_ROW_IDS)); assert!(can_write_dataset(super::FLAG_USE_V2_FORMAT_DEPRECATED)); assert!(can_write_dataset(super::FLAG_TABLE_CONFIG)); - assert!(can_write_dataset(super::FLAG_SHALLOW_CLONE)); + assert!(can_write_dataset(super::FLAG_BASE_PATHS)); assert!(can_write_dataset( super::FLAG_DELETION_FILES | super::FLAG_STABLE_ROW_IDS | super::FLAG_USE_V2_FORMAT_DEPRECATED | super::FLAG_TABLE_CONFIG - | super::FLAG_SHALLOW_CLONE + | super::FLAG_BASE_PATHS )); assert!(!can_write_dataset(super::FLAG_UNKNOWN)); } #[test] - fn test_shallow_clone_feature_flags() { + fn test_base_paths_feature_flags() { use crate::format::{DataStorageFormat, Manifest}; use arrow_schema::{Field as ArrowField, Schema as ArrowSchema}; use lance_core::datatypes::Schema; @@ -138,7 +138,7 @@ mod tests { false, )]); let schema = Schema::try_from(&arrow_schema).unwrap(); - // Test 1: Normal dataset (no base_paths) should not have FLAG_SHALLOW_CLONE + // Test 1: Normal dataset (no base_paths) should not have FLAG_BASE_PATHS let mut normal_manifest = Manifest::new( schema.clone(), Arc::new(vec![]), @@ -147,28 +147,34 @@ mod tests { HashMap::new(), // Empty base_paths ); apply_feature_flags(&mut normal_manifest, false).unwrap(); - assert_eq!(normal_manifest.reader_feature_flags & FLAG_SHALLOW_CLONE, 0); - assert_eq!(normal_manifest.writer_feature_flags & FLAG_SHALLOW_CLONE, 0); - // Test 2: Cloned dataset (with base_paths) should have FLAG_SHALLOW_CLONE + assert_eq!(normal_manifest.reader_feature_flags & FLAG_BASE_PATHS, 0); + assert_eq!(normal_manifest.writer_feature_flags & FLAG_BASE_PATHS, 0); + // Test 2: Dataset with 
base_paths (shallow clone or multi-base) should have FLAG_BASE_PATHS let mut base_paths: HashMap = HashMap::new(); base_paths.insert( 1, - BasePath { - id: 1, - name: Some("test_ref".to_string()), - is_dataset_root: true, - path: "/path/to/original".to_string(), - }, + BasePath::new( + 1, + "file:///path/to/original".to_string(), + Some("test_ref".to_string()), + true, + ), ); - let mut cloned_manifest = Manifest::new( + let mut multi_base_manifest = Manifest::new( schema, Arc::new(vec![]), DataStorageFormat::default(), None, base_paths, ); - apply_feature_flags(&mut cloned_manifest, false).unwrap(); - assert_ne!(cloned_manifest.reader_feature_flags & FLAG_SHALLOW_CLONE, 0); - assert_ne!(cloned_manifest.writer_feature_flags & FLAG_SHALLOW_CLONE, 0); + apply_feature_flags(&mut multi_base_manifest, false).unwrap(); + assert_ne!( + multi_base_manifest.reader_feature_flags & FLAG_BASE_PATHS, + 0 + ); + assert_ne!( + multi_base_manifest.writer_feature_flags & FLAG_BASE_PATHS, + 0 + ); } } diff --git a/rust/lance-table/src/format/fragment.rs b/rust/lance-table/src/format/fragment.rs index a581bcbf433..4d3ab9cf877 100644 --- a/rust/lance-table/src/format/fragment.rs +++ b/rust/lance-table/src/format/fragment.rs @@ -54,6 +54,7 @@ impl DataFile { file_major_version: u32, file_minor_version: u32, file_size_bytes: Option>, + base_id: Option, ) -> Self { Self { path: path.into(), @@ -62,7 +63,7 @@ impl DataFile { file_major_version, file_minor_version, file_size_bytes: file_size_bytes.into(), - base_id: None, + base_id, } } @@ -83,7 +84,11 @@ impl DataFile { } } - pub fn new_legacy_from_fields(path: impl Into, fields: Vec) -> Self { + pub fn new_legacy_from_fields( + path: impl Into, + fields: Vec, + base_id: Option, + ) -> Self { Self::new( path, fields, @@ -91,6 +96,7 @@ impl DataFile { MAJOR_VERSION as u32, MINOR_VERSION as u32, None, + base_id, ) } @@ -98,6 +104,7 @@ impl DataFile { path: impl Into, schema: &Schema, file_size_bytes: Option>, + base_id: Option, ) -> Self { 
let mut field_ids = schema.field_ids(); field_ids.sort(); @@ -108,6 +115,7 @@ impl DataFile { MAJOR_VERSION as u32, MINOR_VERSION as u32, file_size_bytes, + base_id, ) } @@ -324,7 +332,7 @@ impl Fragment { ) -> Self { Self { id, - files: vec![DataFile::new_legacy(path, schema, None)], + files: vec![DataFile::new_legacy(path, schema, None, None)], deletion_file: None, physical_rows, row_id_meta: None, @@ -347,6 +355,7 @@ impl Fragment { major, minor, file_size_bytes, + None, ); self.files.push(data_file); self @@ -373,12 +382,14 @@ impl Fragment { major, minor, file_size_bytes, + None, )); } /// Add a new [`DataFile`] to this fragment. pub fn add_file_legacy(&mut self, path: &str, schema: &Schema) { - self.files.push(DataFile::new_legacy(path, schema, None)); + self.files + .push(DataFile::new_legacy(path, schema, None, None)); } // True if this fragment is made up of legacy v1 files, false otherwise @@ -519,6 +530,7 @@ mod tests { vec![DataFile::new_legacy_from_fields( path.to_string(), vec![0, 1, 2, 3], + None, )] ) } diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs index a86bb460692..2679a906c74 100644 --- a/rust/lance-table/src/format/manifest.rs +++ b/rust/lance-table/src/format/manifest.rs @@ -21,7 +21,7 @@ use crate::format::pb; use lance_core::cache::LanceCache; use lance_core::datatypes::{Schema, StorageClass}; use lance_core::{Error, Result}; -use lance_io::object_store::ObjectStore; +use lance_io::object_store::{ObjectStore, ObjectStoreRegistry}; use lance_io::utils::read_struct; use snafu::location; @@ -296,12 +296,7 @@ impl Manifest { blob_dataset_version: self.blob_dataset_version, base_paths: { let mut base_paths = self.base_paths.clone(); - let base_path = BasePath { - id: ref_base_id, - name: ref_name, - is_dataset_root: true, - path: ref_path, - }; + let base_path = BasePath::new(ref_base_id, ref_path, ref_name, true); base_paths.insert(ref_base_id, base_path); base_paths }, @@ -575,14 +570,49 @@ impl 
Manifest { } } -#[derive(Debug, Clone, PartialEq, DeepSizeOf)] +#[derive(Debug, Clone, PartialEq)] pub struct BasePath { pub id: u32, pub name: Option, pub is_dataset_root: bool, + /// The full URI string (e.g., "s3://bucket/path") pub path: String, } +impl BasePath { + /// Create a new BasePath + /// + /// # Arguments + /// + /// * `id` - Unique identifier for this base path + /// * `path` - Full URI string (e.g., "s3://bucket/path", "/local/path") + /// * `name` - Optional human-readable name for this base + /// * `is_dataset_root` - Whether this is the dataset root or a data-only base + pub fn new(id: u32, path: String, name: Option, is_dataset_root: bool) -> Self { + Self { + id, + name, + is_dataset_root, + path, + } + } + + /// Extract the object store path from this BasePath's URI. + /// + /// This is a synchronous operation that parses the URI without initializing an object store. + pub fn extract_path(&self, registry: Arc) -> Result { + ObjectStore::extract_path_from_uri(registry, &self.path) + } +} + +impl DeepSizeOf for BasePath { + fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize { + self.name.deep_size_of_children(context) + + self.path.deep_size_of_children(context) * 2 + + size_of::() + } +} + #[derive(Debug, Clone, PartialEq, DeepSizeOf)] pub struct WriterVersion { pub library: String, @@ -701,6 +731,12 @@ impl ProtoStruct for Manifest { impl From for BasePath { fn from(p: pb::BasePath) -> Self { + Self::new(p.id, p.path, p.name, p.is_dataset_root) + } +} + +impl From for pb::BasePath { + fn from(p: BasePath) -> Self { Self { id: p.id, name: p.name, @@ -1029,7 +1065,11 @@ mod tests { let fragments = vec![ Fragment { id: 0, - files: vec![DataFile::new_legacy_from_fields("path1", vec![0, 1, 2])], + files: vec![DataFile::new_legacy_from_fields( + "path1", + vec![0, 1, 2], + None, + )], deletion_file: None, row_id_meta: None, physical_rows: None, @@ -1037,8 +1077,8 @@ mod tests { Fragment { id: 1, files: vec![ - 
DataFile::new_legacy_from_fields("path2", vec![0, 1, 43]), - DataFile::new_legacy_from_fields("path3", vec![2]), + DataFile::new_legacy_from_fields("path2", vec![0, 1, 43], None), + DataFile::new_legacy_from_fields("path3", vec![2], None), ], deletion_file: None, row_id_meta: None, diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index ea5d61aefc7..e14cc6c4be1 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -117,6 +117,7 @@ pub use write::merge_insert::{ MergeInsertBuilder, MergeInsertJob, MergeStats, UncommittedMergeInsert, WhenMatched, WhenNotMatched, WhenNotMatchedBySource, }; + pub use write::update::{UpdateBuilder, UpdateJob}; #[allow(deprecated)] pub use write::{ @@ -163,6 +164,10 @@ pub struct Dataset { /// File reader options to use when reading data files. pub(crate) file_reader_options: Option, + + /// Object store parameters used when opening this dataset. + /// These are used when creating object stores for additional base paths. + pub(crate) store_params: Option>, } impl std::fmt::Debug for Dataset { @@ -552,6 +557,7 @@ impl Dataset { self.session.clone(), self.commit_handler.clone(), self.file_reader_options.clone(), + self.store_params.as_deref().cloned(), ) } @@ -662,6 +668,7 @@ impl Dataset { session: Arc, commit_handler: Arc, file_reader_options: Option, + store_params: Option, ) -> Result { let refs = Refs::new( object_store.clone(), @@ -688,6 +695,7 @@ impl Dataset { metadata_cache, index_cache, file_reader_options, + store_params: store_params.map(Box::new), }) } @@ -830,6 +838,7 @@ impl Dataset { self.session.clone(), self.commit_handler.clone(), self.file_reader_options.clone(), + self.store_params.as_deref().cloned(), )?; Ok(Some(Arc::new(blobs_dataset))) } else { @@ -1365,8 +1374,7 @@ impl Dataset { location!(), ) })?; - - let path = Path::parse(base_path.path.as_str())?; + let path = base_path.extract_path(self.session.store_registry())?; if base_path.is_dataset_root { Ok(path.child(DATA_DIR)) } 
else { @@ -1377,6 +1385,25 @@ impl Dataset { } } + /// Get the ObjectStore for a specific path based on base_id + pub(crate) async fn object_store_for_base(&self, base_id: u32) -> Result> { + let base_path = self.manifest.base_paths.get(&base_id).ok_or_else(|| { + Error::invalid_input( + format!("Dataset base path with ID {} not found", base_id), + Default::default(), + ) + })?; + + let (store, _) = ObjectStore::from_uri_and_params( + self.session.store_registry(), + &base_path.path, + &self.store_params.as_deref().cloned().unwrap_or_default(), + ) + .await?; + + Ok(store) + } + pub(crate) fn dataset_dir_for_deletion(&self, deletion_file: &DeletionFile) -> Result { match deletion_file.base_id.as_ref() { Some(base_id) => { @@ -1400,7 +1427,7 @@ impl Dataset { location: location!(), }); } - Ok(Path::parse(base_path.path.as_str())?) + base_path.extract_path(self.session.store_registry()) } None => Ok(self.base.clone()), } @@ -1420,7 +1447,7 @@ impl Dataset { location!(), ) })?; - let path = Path::parse(base_path.path.as_str())?; + let path = base_path.extract_path(self.session.store_registry())?; if base_path.is_dataset_root { Ok(path.child(INDICES_DIR)) } else { @@ -1854,7 +1881,7 @@ impl Dataset { is_shallow: true, ref_name, ref_version: version_number, - ref_path: String::from(self.base.clone()), + ref_path: self.uri.clone(), branch_name: None, }; let transaction = Transaction::new(version_number, clone_op, None, None); @@ -1997,6 +2024,7 @@ pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<' dataset.session(), dataset.commit_handler.clone(), dataset.file_reader_options.clone(), + dataset.store_params.as_deref().cloned(), )?; let object_store = dataset_version.object_store(); let path = dataset_version @@ -2034,6 +2062,7 @@ pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<' dataset.session(), dataset.commit_handler.clone(), dataset.file_reader_options.clone(), + dataset.store_params.as_deref().cloned(), ) } 
else { // If we didn't get the latest manifest, we can still return the dataset @@ -4172,6 +4201,7 @@ mod tests { fragments: vec![], schema, config_upsert_values: None, + initial_bases: None, }; let test_dir = TempStdDir::default(); let test_uri = test_dir.to_str().unwrap(); @@ -4204,6 +4234,7 @@ mod tests { fragments: vec![], schema, config_upsert_values: None, + initial_bases: None, }; let test_uri = TempStrDir::default(); let read_version_0_transaction = Transaction::new(0, operation, None, None); diff --git a/rust/lance/src/dataset/builder.rs b/rust/lance/src/dataset/builder.rs index 0a756e26f95..fb1050962f6 100644 --- a/rust/lance/src/dataset/builder.rs +++ b/rust/lance/src/dataset/builder.rs @@ -316,6 +316,7 @@ impl DatasetBuilder { let manifest = self.manifest.take(); let file_reader_options = self.file_reader_options.clone(); + let store_params = self.options.clone(); let (object_store, base_path, commit_handler) = self.build_object_store().await?; // Two cases that need to check out after loading the manifest: @@ -364,6 +365,7 @@ impl DatasetBuilder { object_store, base_path, commit_handler, + Some(store_params), ) .await?; @@ -398,6 +400,7 @@ impl DatasetBuilder { object_store: Arc, base_path: Path, commit_handler: Arc, + store_params: Option, ) -> Result { let (manifest, location) = if let Some(mut manifest) = manifest { let location = commit_handler @@ -458,6 +461,7 @@ impl DatasetBuilder { session, commit_handler, file_reader_options, + store_params, ) } } diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 66d93e22071..6ad17fa9e81 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -935,21 +935,29 @@ impl FileFragment { .dataset .data_file_dir(data_file)? 
.child(data_file.path.as_str()); - let (store_scheduler, reader_priority) = - if let Some(scan_scheduler) = read_config.scan_scheduler.as_ref() { - ( - scan_scheduler.clone(), - read_config.reader_priority.unwrap_or(0), - ) - } else { - ( - ScanScheduler::new( - self.dataset.object_store.clone(), - SchedulerConfig::max_bandwidth(&self.dataset.object_store), - ), - 0, - ) - }; + let (store_scheduler, reader_priority) = if let Some(base_id) = data_file.base_id { + // TODO: make object stores for non-default bases reuse the same scan scheduler + // currently we always create a new one + let object_store = self.dataset.object_store_for_base(base_id).await?; + let config = SchedulerConfig::max_bandwidth(&object_store); + ( + ScanScheduler::new(object_store, config), + read_config.reader_priority.unwrap_or(0), + ) + } else if let Some(scan_scheduler) = read_config.scan_scheduler.as_ref() { + ( + scan_scheduler.clone(), + read_config.reader_priority.unwrap_or(0), + ) + } else { + ( + ScanScheduler::new( + self.dataset.object_store.clone(), + SchedulerConfig::max_bandwidth(&self.dataset.object_store), + ), + 0, + ) + }; let file_scheduler = store_scheduler .open_file_with_priority(&path, reader_priority as u64, &data_file.file_size_bytes) .await?; @@ -3168,6 +3176,7 @@ mod tests { schema: schema.clone(), fragments, config_upsert_values: None, + initial_bases: None, }; let new_dataset = @@ -3277,6 +3286,7 @@ mod tests { fragments: vec![new_fragment], schema: full_schema.clone(), config_upsert_values: None, + initial_bases: None, }; let dataset = diff --git a/rust/lance/src/dataset/fragment/write.rs b/rust/lance/src/dataset/fragment/write.rs index 3186dedc7b3..5cb2eda363b 100644 --- a/rust/lance/src/dataset/fragment/write.rs +++ b/rust/lance/src/dataset/fragment/write.rs @@ -210,6 +210,7 @@ impl<'a> FragmentCreateBuilder<'a> { stream, params.into_owned(), LanceFileVersion::Stable, + None, // Fragment creation doesn't use target_bases ) .await } diff --git 
a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index 8aa359bfeb1..b780ff80552 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -734,6 +734,7 @@ async fn rewrite_files( dataset.schema().clone(), reader, params, + None, // Compaction doesn't use target_bases ) .await?; diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 2bb689908cf..20738ab88c0 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -58,7 +58,9 @@ use lance_io::object_store::ObjectStore; use lance_table::feature_flags::{apply_feature_flags, FLAG_STABLE_ROW_IDS}; use lance_table::rowids::read_row_ids; use lance_table::{ - format::{pb, DataFile, DataStorageFormat, Fragment, IndexMetadata, Manifest, RowIdMeta}, + format::{ + pb, BasePath, DataFile, DataStorageFormat, Fragment, IndexMetadata, Manifest, RowIdMeta, + }, io::{ commit::CommitHandler, manifest::{read_manifest, read_manifest_indexes}, @@ -171,6 +173,7 @@ pub enum Operation { fragments: Vec, schema: Schema, config_upsert_values: Option>, + initial_bases: Option>, }, /// A new index has been created. CreateIndex { @@ -378,16 +381,19 @@ impl PartialEq for Operation { fragments: a_fragments, schema: a_schema, config_upsert_values: a_config, + initial_bases: a_initial, }, Self::Overwrite { fragments: b_fragments, schema: b_schema, config_upsert_values: b_config, + initial_bases: b_initial, }, ) => { compare_vec(a_fragments, b_fragments) && a_schema == b_schema && a_config == b_config + && a_initial == b_initial } ( Self::CreateIndex { @@ -1479,11 +1485,42 @@ impl Transaction { location: location!(), }); } - let reference_paths = match current_manifest { + let mut reference_paths = match current_manifest { Some(m) => m.base_paths.clone(), None => HashMap::new(), }; + if let Operation::Overwrite { + initial_bases: Some(initial_bases), + .. 
+ } = &self.operation + { + if current_manifest.is_none() { + // CREATE mode: registering base paths + // Base IDs should have been assigned during write operation + // Validate uniqueness and insert them into the manifest + for base_path in initial_bases.iter() { + if reference_paths.contains_key(&base_path.id) { + return Err(Error::invalid_input( + format!( + "Duplicate base path ID {} detected. Base path IDs must be unique.", + base_path.id + ), + location!(), + )); + } + reference_paths.insert(base_path.id, base_path.clone()); + } + } else { + // OVERWRITE mode with initial_bases should have been rejected by validation + // This branch should never be reached + return Err(Error::invalid_input( + "OVERWRITE mode cannot register new bases. This should have been caught by validation.", + location!(), + )); + } + } + // Get the schema and the final fragment list let schema = match self.operation { Operation::Overwrite { ref schema, .. } => schema.clone(), @@ -1874,6 +1911,8 @@ impl Transaction { }; let mut manifest = if let Some(current_manifest) = current_manifest { + // OVERWRITE with initial_bases on existing dataset is not allowed (caught by validation) + // So we always use new_from_previous which preserves base_paths let mut prev_manifest = Manifest::new_from_previous( current_manifest, schema, @@ -2448,6 +2487,7 @@ impl TryFrom for Transaction { schema, schema_metadata: _schema_metadata, // TODO: handle metadata config_upsert_values, + initial_bases, })) => { let config_upsert_option = if config_upsert_values.is_empty() { Some(config_upsert_values) @@ -2462,6 +2502,11 @@ impl TryFrom for Transaction { .collect::>>()?, schema: Schema::from(&Fields(schema)), config_upsert_values: config_upsert_option, + initial_bases: if initial_bases.is_empty() { + None + } else { + Some(initial_bases.into_iter().map(BasePath::from).collect()) + }, } } Some(pb::transaction::Operation::ReserveFragments( @@ -2692,6 +2737,7 @@ impl TryFrom for Transaction { schema, 
schema_metadata: _schema_metadata, // TODO: handle metadata config_upsert_values, + initial_bases, }) => { let config_upsert_option = if config_upsert_values.is_empty() { Some(config_upsert_values) @@ -2706,6 +2752,11 @@ impl TryFrom for Transaction { .collect::>>()?, schema: Schema::from(&Fields(schema)), config_upsert_values: config_upsert_option, + initial_bases: if initial_bases.is_empty() { + None + } else { + Some(initial_bases.into_iter().map(BasePath::from).collect()) + }, }) } }) @@ -2825,6 +2876,7 @@ impl From<&Transaction> for pb::Transaction { fragments, schema, config_upsert_values, + initial_bases, } => { pb::transaction::Operation::Overwrite(pb::transaction::Overwrite { fragments: fragments.iter().map(pb::DataFragment::from).collect(), @@ -2833,6 +2885,16 @@ impl From<&Transaction> for pb::Transaction { config_upsert_values: config_upsert_values .clone() .unwrap_or(Default::default()), + initial_bases: initial_bases + .as_ref() + .map(|paths| { + paths + .iter() + .cloned() + .map(|bp: BasePath| -> pb::BasePath { bp.into() }) + .collect::>() + }) + .unwrap_or_default(), }) } Operation::ReserveFragments { num_fragments } => { @@ -2973,6 +3035,7 @@ impl From<&Transaction> for pb::Transaction { fragments, schema, config_upsert_values, + initial_bases, } => { pb::transaction::BlobOperation::BlobOverwrite(pb::transaction::Overwrite { fragments: fragments.iter().map(pb::DataFragment::from).collect(), @@ -2981,6 +3044,16 @@ impl From<&Transaction> for pb::Transaction { config_upsert_values: config_upsert_values .clone() .unwrap_or(Default::default()), + initial_bases: initial_bases + .as_ref() + .map(|paths| { + paths + .iter() + .cloned() + .map(|bp: BasePath| -> pb::BasePath { bp.into() }) + .collect::>() + }) + .unwrap_or_default(), }) } _ => panic!("Invalid blob operation: {:?}", value), @@ -3073,6 +3146,7 @@ pub fn validate_operation(manifest: Option<&Manifest>, operation: &Operation) -> fragments, schema, config_upsert_values: None, + initial_bases: 
_, } => schema_fragments_valid(Some(manifest), schema, fragments), Operation::Update { updated_fragments, diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 0c351d60ac1..e4250e1e106 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -1,10 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::collections::HashMap; -use std::num::NonZero; -use std::sync::Arc; - use arrow_array::RecordBatch; use chrono::TimeDelta; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; @@ -25,11 +21,15 @@ use lance_file::v2::writer::FileWriterOptions; use lance_file::version::LanceFileVersion; use lance_file::writer::{FileWriter, ManifestProvider}; use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; -use lance_table::format::{DataFile, Fragment}; +use lance_table::format::{BasePath, DataFile, Fragment}; use lance_table::io::commit::{commit_handler_from_url, CommitHandler}; use lance_table::io::manifest::ManifestDescribing; use object_store::path::Path; use snafu::location; +use std::collections::HashMap; +use std::num::NonZero; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; use tracing::{info, instrument}; use crate::session::Session; @@ -226,6 +226,25 @@ pub struct WriteParams { /// This can include commit messages, engine information, etc. /// this properties map will be persisted as part of Transaction object. pub transaction_properties: Option>>, + + /// New base paths to register in the manifest during dataset creation. + /// Each BasePath must have a properly assigned ID (non-zero). + /// Only used in CREATE/OVERWRITE modes for manifest registration. + /// IDs should be assigned by the caller before passing to WriteParams. + pub initial_bases: Option>, + + /// Target base IDs for writing data files. + /// When provided, all new data files will be written to bases with these IDs. 
+ /// Used in all modes (CREATE, APPEND, OVERWRITE) to specify where data should be written. + /// The IDs must correspond to either: + /// - IDs in initial_bases (for CREATE/OVERWRITE modes) + /// - IDs already registered in the existing dataset manifest (for APPEND mode) + pub target_bases: Option>, + + /// Target base names or paths as strings (unresolved). + /// These will be resolved to IDs when the write operation executes. + /// Resolution happens at builder execution time when dataset context is available. + pub target_base_names_or_paths: Option>, } impl Default for WriteParams { @@ -247,6 +266,9 @@ impl Default for WriteParams { auto_cleanup: Some(AutoCleanupParams::default()), skip_auto_cleanup: false, transaction_properties: None, + initial_bases: None, + target_bases: None, + target_base_names_or_paths: None, } } } @@ -279,6 +301,52 @@ impl WriteParams { ..self } } + + /// Set the initial_bases for this WriteParams. + /// + /// This specifies new base paths to register in the manifest during dataset creation. + /// Each BasePath must have a properly assigned ID (non-zero) before calling this method. + /// Only used in CREATE/OVERWRITE modes for manifest registration. + pub fn with_initial_bases(self, bases: Vec) -> Self { + Self { + initial_bases: Some(bases), + ..self + } + } + + /// Set the target_bases for this WriteParams. + /// + /// This specifies the base IDs where data files should be written. + /// The IDs must correspond to either: + /// - IDs in initial_bases (for CREATE/OVERWRITE modes) + /// - IDs already registered in the existing dataset manifest (for APPEND mode) + pub fn with_target_bases(self, base_ids: Vec) -> Self { + Self { + target_bases: Some(base_ids), + ..self + } + } + + /// Store target base names or paths for deferred resolution. + /// + /// This method stores the references in `target_base_names_or_paths` field + /// to be resolved later at execution time when the dataset manifest is available. 
+ /// + /// Resolution will happen at write execution time and will try to match: + /// 1. initial_bases by name + /// 2. initial_bases by path + /// 3. existing manifest by name + /// 4. existing manifest by path + /// + /// # Arguments + /// + /// * `references` - Vector of base names or paths to be resolved later + pub fn with_target_base_names_or_paths(self, references: Vec) -> Self { + Self { + target_base_names_or_paths: Some(references), + ..self + } + } } /// Writes the given data to the dataset and returns fragments. @@ -308,6 +376,7 @@ pub async fn do_write_fragments( data: SendableRecordBatchStream, params: WriteParams, storage_version: LanceFileVersion, + target_bases_info: Option>, ) -> Result> { // Convert arrow.json to lance.json (JSONB) for storage if needed // @@ -325,7 +394,13 @@ pub async fn do_write_fragments( .boxed() }; - let writer_generator = WriterGenerator::new(object_store, base_dir, schema, storage_version); + let writer_generator = WriterGenerator::new( + object_store, + base_dir, + schema, + storage_version, + target_bases_info, + ); let mut writer: Option> = None; let mut num_rows_in_current_file = 0; let mut fragments = Vec::new(); @@ -377,6 +452,111 @@ pub struct WrittenFragments { pub blob: Option<(Vec, Schema)>, } +pub async fn validate_and_resolve_target_bases( + params: &mut WriteParams, + existing_base_paths: Option<&HashMap>, +) -> Result>> { + // Step 1: Validations + if !matches!(params.mode, WriteMode::Create) && params.initial_bases.is_some() { + return Err(Error::invalid_input( + format!( + "Cannot register new bases in {:?} mode. Only CREATE mode can register new bases.", + params.mode + ), + location!(), + )); + } + + if params.target_base_names_or_paths.is_some() && params.target_bases.is_some() { + return Err(Error::invalid_input( + "Cannot specify both target_base_names_or_paths and target_bases. 
Use one or the other.", + location!(), + )); + } + + // Step 2: Assign IDs to initial_bases and add them to all_bases + let mut all_bases: HashMap = existing_base_paths.cloned().unwrap_or_default(); + if let Some(initial_bases) = &mut params.initial_bases { + let mut next_id = all_bases.keys().max().map(|&id| id + 1).unwrap_or(1); + + for base_path in initial_bases.iter_mut() { + if base_path.id == 0 { + base_path.id = next_id; + next_id += 1; + } + all_bases.insert(base_path.id, base_path.clone()); + } + } + + // Step 3: Resolve target_base_names_or_paths to IDs + let target_base_ids = if let Some(ref names_or_paths) = params.target_base_names_or_paths { + let mut resolved_ids = Vec::new(); + for reference in names_or_paths { + let ref_str = reference.as_str(); + let id = all_bases + .iter() + .find(|(_, base)| { + base.name.as_ref().map(|n| n == ref_str).unwrap_or(false) + || base.path == ref_str + }) + .map(|(&id, _)| id) + .ok_or_else(|| { + Error::invalid_input( + format!("Base reference '{}' not found in available bases", ref_str), + location!(), + ) + })?; + + resolved_ids.push(id); + } + Some(resolved_ids) + } else { + params.target_bases.clone() + }; + + // Step 4: Prepare TargetBaseInfo structs + let store_registry = params + .session + .as_ref() + .map(|s| s.store_registry()) + .unwrap_or_default(); + + if let Some(target_bases) = &target_base_ids { + let store_params = params.store_params.clone().unwrap_or_default(); + let mut bases_info = Vec::new(); + + for &target_base_id in target_bases { + let base_path = all_bases.get(&target_base_id).ok_or_else(|| { + Error::invalid_input( + format!( + "Target base ID {} not found in available bases", + target_base_id + ), + location!(), + ) + })?; + + let (target_object_store, extracted_path) = ObjectStore::from_uri_and_params( + store_registry.clone(), + &base_path.path, + &store_params, + ) + .await?; + + bases_info.push(TargetBaseInfo { + base_id: target_base_id, + object_store: target_object_store, + 
base_dir: extracted_path, + is_dataset_root: base_path.is_dataset_root, + }); + } + + Ok(Some(bases_info)) + } else { + Ok(None) + } +} + /// Writes the given data to the dataset and returns fragments. /// /// NOTE: the fragments have not yet been assigned an ID. That must be done @@ -394,6 +574,7 @@ pub async fn write_fragments_internal( schema: Schema, data: SendableRecordBatchStream, mut params: WriteParams, + target_bases_info: Option>, ) -> Result { // Convert Arrow JSON columns to Lance JSON (JSONB) format // @@ -502,6 +683,7 @@ pub async fn write_fragments_internal( data, params, storage_version, + target_bases_info, ); let (default, blob) = if let Some(blob_data) = blob_data { @@ -514,6 +696,7 @@ pub async fn write_fragments_internal( blob_data, blob_write_params, storage_version, + None, // Blobs don't use target_bases ); let (fragments_res, blobs_res) = futures::join!(fragments_fut, blob_fut); let fragments = fragments_res?; @@ -540,22 +723,35 @@ pub trait GenericWriter: Send { async fn finish(&mut self) -> Result<(u32, DataFile)>; } +struct V1WriterAdapter +where + M: ManifestProvider + Send + Sync, +{ + writer: FileWriter, + path: String, + base_id: Option, +} + #[async_trait::async_trait] -impl GenericWriter for (FileWriter, String) { +impl GenericWriter for V1WriterAdapter +where + M: ManifestProvider + Send + Sync, +{ async fn write(&mut self, batches: &[RecordBatch]) -> Result<()> { - self.0.write(batches).await + self.writer.write(batches).await } async fn tell(&mut self) -> Result { - Ok(self.0.tell().await? as u64) + Ok(self.writer.tell().await? as u64) } async fn finish(&mut self) -> Result<(u32, DataFile)> { - let size_bytes = self.0.tell().await?; + let size_bytes = self.writer.tell().await?; Ok(( - self.0.finish().await? as u32, + self.writer.finish().await? 
as u32, DataFile::new_legacy( - self.1.clone(), - self.0.schema(), + self.path.clone(), + self.writer.schema(), NonZero::new(size_bytes as u64), + self.base_id, ), )) } @@ -564,6 +760,7 @@ impl GenericWriter for (FileWriter, String struct V2WriterAdapter { writer: v2::writer::FileWriter, path: String, + base_id: Option, } #[async_trait::async_trait] @@ -599,6 +796,7 @@ impl GenericWriter for V2WriterAdapter { major, minor, NonZero::new(self.writer.tell().await?), + self.base_id, ); Ok((num_rows, data_file)) } @@ -609,22 +807,38 @@ pub async fn open_writer( schema: &Schema, base_dir: &Path, storage_version: LanceFileVersion, +) -> Result> { + open_writer_with_options(object_store, schema, base_dir, storage_version, true, None).await +} + +pub async fn open_writer_with_options( + object_store: &ObjectStore, + schema: &Schema, + base_dir: &Path, + storage_version: LanceFileVersion, + add_data_dir: bool, + base_id: Option, ) -> Result> { let filename = format!("{}.lance", generate_random_filename()); - let full_path = base_dir.child(DATA_DIR).child(filename.as_str()); + let full_path = if add_data_dir { + base_dir.child(DATA_DIR).child(filename.as_str()) + } else { + base_dir.child(filename.as_str()) + }; let writer = if storage_version == LanceFileVersion::Legacy { - Box::new(( - FileWriter::::try_new( + Box::new(V1WriterAdapter { + writer: FileWriter::::try_new( object_store, &full_path, schema.clone(), &Default::default(), ) .await?, - filename, - )) + path: filename, + base_id, + }) } else { let writer = object_store.create(&full_path).await?; let file_writer = v2::writer::FileWriter::try_new( @@ -638,18 +852,37 @@ pub async fn open_writer( let writer_adapter = V2WriterAdapter { writer: file_writer, path: filename, + base_id, }; Box::new(writer_adapter) as Box }; Ok(writer) } -/// Creates new file writers for a given dataset. +/// Information about a target base for writing. +/// Contains the base ID, object store, directory path, and whether it's a dataset root. 
+pub struct TargetBaseInfo { + pub base_id: u32, + pub object_store: Arc, + /// The base directory path (without /data subdirectory) + pub base_dir: Path, + /// Whether this base path is a dataset root. + /// If true, /data will be added when creating file paths. + /// If false, files will be written directly to base_dir. + pub is_dataset_root: bool, +} + struct WriterGenerator { + /// Default object store (used when no target bases specified) object_store: Arc, + /// Default base directory (used when no target bases specified) base_dir: Path, schema: Schema, storage_version: LanceFileVersion, + /// Target base information (if writing to specific bases) + target_bases_info: Option>, + /// Counter for round-robin selection + next_base_index: AtomicUsize, } impl WriterGenerator { @@ -658,26 +891,52 @@ impl WriterGenerator { base_dir: &Path, schema: &Schema, storage_version: LanceFileVersion, + target_bases_info: Option>, ) -> Self { Self { object_store, base_dir: base_dir.clone(), schema: schema.clone(), storage_version, + target_bases_info, + next_base_index: AtomicUsize::new(0), } } + /// Select the next target base using round-robin strategy. + /// TODO: In the future, we can develop different strategies for selecting target bases + fn select_target_base(&self) -> Option<&TargetBaseInfo> { + self.target_bases_info.as_ref().map(|bases| { + let index = self + .next_base_index + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + &bases[index % bases.len()] + }) + } + pub async fn new_writer(&self) -> Result<(Box, Fragment)> { // Use temporary ID 0; will assign ID later. 
let fragment = Fragment::new(0); - let writer = open_writer( - &self.object_store, - &self.schema, - &self.base_dir, - self.storage_version, - ) - .await?; + let writer = if let Some(base_info) = self.select_target_base() { + open_writer_with_options( + base_info.object_store.as_ref(), + &self.schema, + &base_info.base_dir, + self.storage_version, + base_info.is_dataset_root, + Some(base_info.base_id), + ) + .await? + } else { + open_writer( + self.object_store.as_ref(), + &self.schema, + &self.base_dir, + self.storage_version, + ) + .await? + }; Ok((writer, fragment)) } @@ -697,7 +956,7 @@ async fn resolve_commit_handler( .map(|opts| opts.object_store.is_some()) .unwrap_or_default() { - return Err(Error::InvalidInput { source: "when creating a dataset with a custom object store the commit_handler must also be specified".into(), location: location!() }); + return Err(Error::InvalidInput { source: "when creating a dataset with a custom object store the commit_handler must also be specified".into(), location: Default::default() }); } commit_handler_from_url(uri, store_options).await } @@ -706,7 +965,7 @@ async fn resolve_commit_handler( Err(Error::InvalidInput { source: "`s3+ddb://` scheme and custom commit handler are mutually exclusive" .into(), - location: location!(), + location: Default::default(), }) } else { Ok(commit_handler) @@ -944,6 +1203,7 @@ mod tests { schema, data_stream, write_params, + None, ) .await } @@ -1015,6 +1275,7 @@ mod tests { schema, data_stream, write_params, + None, ) .await .unwrap(); @@ -1092,6 +1353,7 @@ mod tests { schema.clone(), data_stream, write_params, + None, ) .await .unwrap(); @@ -1124,4 +1386,827 @@ mod tests { let batch = reader.read_batch(0, .., &schema).await.unwrap(); assert_eq!(batch, data); } + + #[tokio::test] + async fn test_explicit_data_file_bases_writer_generator() { + use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema}; + use lance_io::object_store::ObjectStore; + use std::sync::Arc; + + 
// Create test schema + let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "id", + DataType::Int32, + false, + )])); + let schema = Schema::try_from(arrow_schema.as_ref()).unwrap(); + + // Create in-memory object store + let object_store = Arc::new(ObjectStore::memory()); + let base_dir = Path::from("test/bucket2"); + + // Test WriterGenerator with explicit data file bases configuration + let target_bases = vec![TargetBaseInfo { + base_id: 2, + object_store: object_store.clone(), + base_dir: base_dir.clone(), + is_dataset_root: false, // Test uses direct data directory + }]; + let writer_generator = WriterGenerator::new( + object_store.clone(), + &base_dir, + &schema, + LanceFileVersion::Stable, + Some(target_bases), + ); + + // Create a writer + let (writer, fragment) = writer_generator.new_writer().await.unwrap(); + + // Verify fragment is created + assert_eq!(fragment.id, 0); // Temporary ID + + // Verify writer is created (we can't test much more without writing data) + drop(writer); // Clean up + } + + #[tokio::test] + async fn test_writer_with_base_id() { + use arrow::array::Int32Array; + use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema}; + use arrow::record_batch::RecordBatch; + use lance_io::object_store::ObjectStore; + use std::sync::Arc; + + // Create test data + let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "id", + DataType::Int32, + false, + )])); + let schema = Schema::try_from(arrow_schema.as_ref()).unwrap(); + + let batch = RecordBatch::try_new( + arrow_schema.clone(), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + ) + .unwrap(); + + // Create in-memory object store and writer + let object_store = Arc::new(ObjectStore::memory()); + let base_dir = Path::from("test/bucket2"); + + let mut inner_writer = open_writer_with_options( + object_store.as_ref(), + &schema, + &base_dir, + LanceFileVersion::Stable, + false, // Don't add /data + None, + ) + .await + .unwrap(); + + // Write 
data + inner_writer.write(&[batch]).await.unwrap(); + + // Finish and manually set base_id + let base_id = 2u32; + let (_num_rows, mut data_file) = inner_writer.finish().await.unwrap(); + data_file.base_id = Some(base_id); + + assert_eq!(data_file.base_id, Some(base_id)); + assert!(!data_file.path.is_empty()); + } + + #[tokio::test] + async fn test_round_robin_target_base_selection() { + use arrow::array::Int32Array; + use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema}; + use arrow::record_batch::RecordBatch; + use lance_io::object_store::ObjectStore; + use std::sync::Arc; + + // Create test schema + let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "id", + DataType::Int32, + false, + )])); + let schema = Schema::try_from(arrow_schema.as_ref()).unwrap(); + + // Create in-memory object stores for different bases + let store1 = Arc::new(ObjectStore::memory()); + let store2 = Arc::new(ObjectStore::memory()); + let store3 = Arc::new(ObjectStore::memory()); + + // Create WriterGenerator with multiple target bases + let target_bases = vec![ + TargetBaseInfo { + base_id: 1, + object_store: store1.clone(), + base_dir: Path::from("base1"), + is_dataset_root: false, + }, + TargetBaseInfo { + base_id: 2, + object_store: store2.clone(), + base_dir: Path::from("base2"), + is_dataset_root: false, + }, + TargetBaseInfo { + base_id: 3, + object_store: store3.clone(), + base_dir: Path::from("base3"), + is_dataset_root: false, + }, + ]; + + let writer_generator = WriterGenerator::new( + Arc::new(ObjectStore::memory()), + &Path::from("default"), + &schema, + LanceFileVersion::Stable, + Some(target_bases), + ); + + // Create test batch + let batch = RecordBatch::try_new( + arrow_schema.clone(), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + ) + .unwrap(); + + // Create multiple writers and verify round-robin selection + let mut base_ids = Vec::new(); + for _ in 0..6 { + let (mut writer, _fragment) = 
writer_generator.new_writer().await.unwrap(); + writer.write(std::slice::from_ref(&batch)).await.unwrap(); + let (_num_rows, data_file) = writer.finish().await.unwrap(); + base_ids.push(data_file.base_id.unwrap()); + } + + // Verify round-robin pattern: 1, 2, 3, 1, 2, 3 + assert_eq!(base_ids, vec![1, 2, 3, 1, 2, 3]); + } + + #[tokio::test] + async fn test_explicit_data_file_bases_path_parsing() { + // Test URI parsing logic + let test_cases = vec![ + ("s3://multi-path-test/test1/subBucket2", "test1/subBucket2"), + ("gs://my-bucket/path/to/data", "path/to/data"), + ("file:///tmp/test/bucket", "tmp/test/bucket"), + ]; + + for (uri, expected_path) in test_cases { + let url = url::Url::parse(uri).unwrap(); + let parsed_path = url.path().trim_start_matches('/'); + assert_eq!(parsed_path, expected_path, "Failed for URI: {}", uri); + } + } + + #[tokio::test] + async fn test_write_params_validation() { + // Test CREATE mode validation + let mut params = WriteParams { + mode: WriteMode::Create, + initial_bases: Some(vec![ + BasePath { + id: 1, + name: Some("bucket1".to_string()), + path: "s3://bucket1/path1".to_string(), + is_dataset_root: true, + }, + BasePath { + id: 2, + name: Some("bucket2".to_string()), + path: "s3://bucket2/path2".to_string(), + is_dataset_root: true, + }, + ]), + target_bases: Some(vec![1]), // Use ID 1 which corresponds to bucket1 + ..Default::default() + }; + + // This should be valid + let result = validate_write_params(¶ms); + assert!(result.is_ok()); + + // Test target_bases with ID not in initial_bases (should fail) + params.target_bases = Some(vec![99]); // ID 99 doesn't exist + let result = validate_write_params(¶ms); + assert!(result.is_err()); + + // Test CREATE mode with target_bases but no initial_bases (should fail) + params.initial_bases = None; + params.target_bases = Some(vec![1]); + let result = validate_write_params(¶ms); + assert!(result.is_err()); + } + + fn validate_write_params(params: &WriteParams) -> Result<()> { + // 
Replicate the validation logic from the main write function + if matches!(params.mode, WriteMode::Create) { + if let Some(target_bases) = ¶ms.target_bases { + if target_bases.len() != 1 { + return Err(Error::invalid_input( + format!( + "target_bases with {} elements is not supported", + target_bases.len() + ), + Default::default(), + )); + } + let target_base_id = target_bases[0]; + if let Some(initial_bases) = ¶ms.initial_bases { + if !initial_bases.iter().any(|bp| bp.id == target_base_id) { + return Err(Error::invalid_input( + format!( + "target_base_id {} must be one of the initial_bases in CREATE mode", + target_base_id + ), + Default::default(), + )); + } + } else { + return Err(Error::invalid_input( + "initial_bases must be provided when target_bases is specified in CREATE mode", + Default::default(), + )); + } + } + } + Ok(()) + } + + #[tokio::test] + async fn test_multi_base_create() { + use lance_testing::datagen::{BatchGenerator, IncrementingInt32}; + + // Create dataset with multi-base configuration + let test_uri = "memory://multi_base_test"; + let primary_uri = format!("{}/primary", test_uri); + let base1_uri = format!("{}/base1", test_uri); + let base2_uri = format!("{}/base2", test_uri); + + let mut data_gen = + BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned()))); + + let dataset = crate::dataset::Dataset::write( + data_gen.batch(5), + &primary_uri, + Some(WriteParams { + mode: WriteMode::Create, + initial_bases: Some(vec![ + BasePath { + id: 1, + name: Some("base1".to_string()), + path: base1_uri.clone(), + is_dataset_root: true, + }, + BasePath { + id: 2, + name: Some("base2".to_string()), + path: base2_uri.clone(), + is_dataset_root: true, + }, + ]), + target_bases: Some(vec![1]), + ..Default::default() + }), + ) + .await + .unwrap(); + + // Verify dataset was created + assert_eq!(dataset.count_rows(None).await.unwrap(), 5); + + // Verify base_paths are registered in manifest + 
assert_eq!(dataset.manifest.base_paths.len(), 2); + assert!(dataset + .manifest + .base_paths + .values() + .any(|bp| bp.name == Some("base1".to_string()))); + assert!(dataset + .manifest + .base_paths + .values() + .any(|bp| bp.name == Some("base2".to_string()))); + + // Verify data was written to base1 + let fragments = dataset.get_fragments(); + assert!(!fragments.is_empty()); + for fragment in fragments { + assert!(fragment + .metadata + .files + .iter() + .any(|file| file.base_id == Some(1))); + } + + // Test validation: cannot specify both target_bases and target_base_names_or_paths + let mut data_gen2 = + BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned()))); + + let result = Dataset::write( + data_gen2.batch(5), + &format!("{}/test_validation", test_uri), + Some(WriteParams { + mode: WriteMode::Create, + initial_bases: Some(vec![BasePath { + id: 1, + name: Some("base1".to_string()), + path: base1_uri.clone(), + is_dataset_root: true, + }]), + target_bases: Some(vec![1]), + target_base_names_or_paths: Some(vec!["base1".to_string()]), + ..Default::default() + }), + ) + .await; + + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Cannot specify both target_base_names_or_paths and target_bases")); + } + + #[tokio::test] + async fn test_multi_base_overwrite() { + use lance_testing::datagen::{BatchGenerator, IncrementingInt32}; + + // Create initial dataset + let test_uri = "memory://overwrite_test"; + let primary_uri = format!("{}/primary", test_uri); + let base1_uri = format!("{}/base1", test_uri); + let base2_uri = format!("{}/base2", test_uri); + let _base3_uri = format!("{}/base3", test_uri); + + let mut data_gen = + BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned()))); + + let dataset = Dataset::write( + data_gen.batch(3), + &primary_uri, + Some(WriteParams { + mode: WriteMode::Create, + initial_bases: Some(vec![ + BasePath { + id: 1, + name: 
Some("base1".to_string()), + path: base1_uri.clone(), + is_dataset_root: true, + }, + BasePath { + id: 2, + name: Some("base2".to_string()), + path: base2_uri.clone(), + is_dataset_root: true, + }, + ]), + target_bases: Some(vec![1]), + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!(dataset.count_rows(None).await.unwrap(), 3); + + // Overwrite - should inherit existing base configuration (base1, base2) + // Write to base2 + let mut data_gen2 = + BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned()))); + + let dataset = Dataset::write( + data_gen2.batch(2), + std::sync::Arc::new(dataset), + Some(WriteParams { + mode: WriteMode::Overwrite, + // No initial_bases - inherits existing base_paths + target_bases: Some(vec![2]), // Write to base2 + ..Default::default() + }), + ) + .await + .unwrap(); + + // Verify data was overwritten + assert_eq!(dataset.count_rows(None).await.unwrap(), 2); + + // Verify base_paths were inherited (still base1 and base2) + assert_eq!(dataset.manifest.base_paths.len(), 2); + assert!(dataset + .manifest + .base_paths + .values() + .any(|bp| bp.name == Some("base1".to_string()))); + assert!(dataset + .manifest + .base_paths + .values() + .any(|bp| bp.name == Some("base2".to_string()))); + + // Verify data was written to base2 (ID 2) + let fragments = dataset.get_fragments(); + assert!(fragments.iter().all(|f| f + .metadata + .files + .iter() + .all(|file| file.base_id == Some(2)))); + + // Test validation: cannot specify initial_bases in OVERWRITE mode + let mut data_gen3 = + BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned()))); + + let result = Dataset::write( + data_gen3.batch(2), + Arc::new(dataset), + Some(WriteParams { + mode: WriteMode::Overwrite, + initial_bases: Some(vec![BasePath { + id: 3, + name: Some("base3".to_string()), + path: _base3_uri.clone(), + is_dataset_root: true, + }]), + target_bases: Some(vec![1]), + ..Default::default() + }), + ) + 
.await; + + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Cannot register new bases in Overwrite mode")); + } + + #[tokio::test] + async fn test_multi_base_append() { + use lance_testing::datagen::{BatchGenerator, IncrementingInt32}; + + // Create initial dataset with multi-base configuration + let test_uri = "memory://append_test"; + let primary_uri = format!("{}/primary", test_uri); + let base1_uri = format!("{}/base1", test_uri); + let base2_uri = format!("{}/base2", test_uri); + + let mut data_gen = + BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned()))); + + let dataset = Dataset::write( + data_gen.batch(3), + &primary_uri, + Some(WriteParams { + mode: WriteMode::Create, + initial_bases: Some(vec![ + BasePath { + id: 1, + name: Some("base1".to_string()), + path: base1_uri.clone(), + is_dataset_root: true, + }, + BasePath { + id: 2, + name: Some("base2".to_string()), + path: base2_uri.clone(), + is_dataset_root: true, + }, + ]), + target_bases: Some(vec![1]), + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!(dataset.count_rows(None).await.unwrap(), 3); + + // Append to base1 (same base as initial write) + let mut data_gen2 = + BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned()))); + + let dataset = Dataset::write( + data_gen2.batch(2), + std::sync::Arc::new(dataset), + Some(WriteParams { + mode: WriteMode::Append, + target_bases: Some(vec![1]), + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!(dataset.count_rows(None).await.unwrap(), 5); + + // Verify base_paths are still registered + assert_eq!(dataset.manifest.base_paths.len(), 2); + + // Append to base2 (different base) + let mut data_gen3 = + BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned()))); + + let dataset = Dataset::write( + data_gen3.batch(4), + Arc::new(dataset), + Some(WriteParams { + mode: WriteMode::Append, + target_bases: 
Some(vec![2]), + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!(dataset.count_rows(None).await.unwrap(), 9); + + // Verify data is distributed across both bases + let fragments = dataset.get_fragments(); + let has_base1_data = fragments + .iter() + .any(|f| f.metadata.files.iter().any(|file| file.base_id == Some(1))); + let has_base2_data = fragments + .iter() + .any(|f| f.metadata.files.iter().any(|file| file.base_id == Some(2))); + + assert!(has_base1_data, "Should have data in base1"); + assert!(has_base2_data, "Should have data in base2"); + + // Test validation: cannot specify initial_bases in APPEND mode + let mut data_gen4 = + BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned()))); + let base3_uri = format!("{}/base3", test_uri); + + let result = Dataset::write( + data_gen4.batch(2), + Arc::new(dataset), + Some(WriteParams { + mode: WriteMode::Append, + initial_bases: Some(vec![BasePath { + id: 3, + name: Some("base3".to_string()), + path: base3_uri, + is_dataset_root: true, + }]), + target_bases: Some(vec![1]), + ..Default::default() + }), + ) + .await; + + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Cannot register new bases in Append mode")); + } + + #[tokio::test] + async fn test_multi_base_is_dataset_root_flag() { + use lance_core::utils::tempfile::TempDir; + use lance_testing::datagen::{BatchGenerator, IncrementingInt32}; + + // Create dataset with different is_dataset_root settings using tempdir + let test_dir = TempDir::default(); + let primary_uri = test_dir.path_str(); + let base1_dir = test_dir.std_path().join("base1"); + let base2_dir = test_dir.std_path().join("base2"); + + std::fs::create_dir_all(&base1_dir).unwrap(); + std::fs::create_dir_all(&base2_dir).unwrap(); + + let base1_uri = format!("file://{}", base1_dir.display()); + let base2_uri = format!("file://{}", base2_dir.display()); + + let mut data_gen = + 
BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned()))); + + let dataset = Dataset::write( + data_gen.batch(10), + &primary_uri, + Some(WriteParams { + mode: WriteMode::Create, + max_rows_per_file: 5, // Create multiple fragments + initial_bases: Some(vec![ + BasePath { + id: 1, + name: Some("base1".to_string()), + path: base1_uri.clone(), + is_dataset_root: true, // Files will go to base1/data/ + }, + BasePath { + id: 2, + name: Some("base2".to_string()), + path: base2_uri.clone(), + is_dataset_root: false, // Files will go directly to base2/ + }, + ]), + target_bases: Some(vec![1, 2]), // Write to both bases + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!(dataset.count_rows(None).await.unwrap(), 10); + + // Verify base_paths configuration + assert_eq!(dataset.manifest.base_paths.len(), 2); + + let base1 = dataset + .manifest + .base_paths + .values() + .find(|bp| bp.name == Some("base1".to_string())) + .expect("base1 not found"); + let base2 = dataset + .manifest + .base_paths + .values() + .find(|bp| bp.name == Some("base2".to_string())) + .expect("base2 not found"); + + // Verify is_dataset_root flags are persisted correctly in manifest + assert!( + base1.is_dataset_root, + "base1 should have is_dataset_root=true" + ); + assert!( + !base2.is_dataset_root, + "base2 should have is_dataset_root=false" + ); + + // Verify data was written to both bases + let fragments = dataset.get_fragments(); + assert!(!fragments.is_empty()); + + let has_base1_data = fragments + .iter() + .any(|f| f.metadata.files.iter().any(|file| file.base_id == Some(1))); + let has_base2_data = fragments + .iter() + .any(|f| f.metadata.files.iter().any(|file| file.base_id == Some(2))); + + assert!(has_base1_data, "Should have data in base1"); + assert!(has_base2_data, "Should have data in base2"); + + // Verify actual file paths on disk + // For base1 (is_dataset_root=true), files should be in base1/data/ + let base1_data_dir = 
base1_dir.join("data"); + assert!(base1_data_dir.exists(), "base1/data directory should exist"); + let base1_files: Vec<_> = std::fs::read_dir(&base1_data_dir) + .unwrap() + .filter_map(|e| e.ok()) + .filter(|e| { + e.path() + .extension() + .map(|ext| ext == "lance") + .unwrap_or(false) + }) + .collect(); + assert!( + !base1_files.is_empty(), + "base1/data should contain .lance files" + ); + + // For base2 (is_dataset_root=false), files should be directly in base2/ + let base2_files: Vec<_> = std::fs::read_dir(&base2_dir) + .unwrap() + .filter_map(|e| e.ok()) + .filter(|e| { + e.path() + .extension() + .map(|ext| ext == "lance") + .unwrap_or(false) + }) + .collect(); + assert!( + !base2_files.is_empty(), + "base2 should contain .lance files directly" + ); + + // Verify base2 does NOT have a data subdirectory with lance files + let base2_data_dir = base2_dir.join("data"); + if base2_data_dir.exists() { + let base2_data_files: Vec<_> = std::fs::read_dir(&base2_data_dir) + .unwrap() + .filter_map(|e| e.ok()) + .filter(|e| { + e.path() + .extension() + .map(|ext| ext == "lance") + .unwrap_or(false) + }) + .collect(); + assert!( + base2_data_files.is_empty(), + "base2/data should NOT contain .lance files" + ); + } + } + + #[tokio::test] + async fn test_multi_base_target_by_path_uri() { + use lance_core::utils::tempfile::TempDir; + use lance_testing::datagen::{BatchGenerator, IncrementingInt32}; + + // Create dataset with named bases + let test_dir = TempDir::default(); + let primary_uri = test_dir.path_str(); + let base1_dir = test_dir.std_path().join("base1"); + let base2_dir = test_dir.std_path().join("base2"); + + std::fs::create_dir_all(&base1_dir).unwrap(); + std::fs::create_dir_all(&base2_dir).unwrap(); + + let base1_uri = format!("file://{}", base1_dir.display()); + let base2_uri = format!("file://{}", base2_dir.display()); + + let mut data_gen = + BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned()))); + + // Create initial dataset 
writing to base1 using name + let dataset = Dataset::write( + data_gen.batch(10), + &primary_uri, + Some(WriteParams { + mode: WriteMode::Create, + max_rows_per_file: 5, + initial_bases: Some(vec![ + BasePath { + id: 1, + name: Some("base1".to_string()), + path: base1_uri.clone(), + is_dataset_root: true, + }, + BasePath { + id: 2, + name: Some("base2".to_string()), + path: base2_uri.clone(), + is_dataset_root: true, + }, + ]), + target_base_names_or_paths: Some(vec!["base1".to_string()]), // Use name + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!(dataset.count_rows(None).await.unwrap(), 10); + + // Verify data was written to base1 + let fragments = dataset.get_fragments(); + assert!(fragments.iter().all(|f| f + .metadata + .files + .iter() + .all(|file| file.base_id == Some(1)))); + + // Now append using the path URI instead of name + let mut data_gen2 = + BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned()))); + + let dataset = Dataset::write( + data_gen2.batch(5), + Arc::new(dataset), + Some(WriteParams { + mode: WriteMode::Append, + // Use the actual path URI instead of the name + target_base_names_or_paths: Some(vec![base2_uri.clone()]), + max_rows_per_file: 5, + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_eq!(dataset.count_rows(None).await.unwrap(), 15); + + // Verify data is now in both bases + let fragments = dataset.get_fragments(); + let has_base1_data = fragments + .iter() + .any(|f| f.metadata.files.iter().any(|file| file.base_id == Some(1))); + let has_base2_data = fragments + .iter() + .any(|f| f.metadata.files.iter().any(|file| file.base_id == Some(2))); + + assert!(has_base1_data, "Should have data in base1"); + assert!(has_base2_data, "Should have data in base2"); + + // Verify base2 has exactly 1 fragment (from the append) + let base2_fragments: Vec<_> = fragments + .iter() + .filter(|f| f.metadata.files.iter().all(|file| file.base_id == Some(2))) + .collect(); + 
assert_eq!(base2_fragments.len(), 1, "Should have 1 fragment in base2"); + } } diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index 44a4e8485f4..bb1ed481378 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -357,6 +357,7 @@ impl<'a> CommitBuilder<'a> { &manifest_config, manifest_naming_scheme, metadata_cache.as_ref(), + session.store_registry(), ) .await? }; @@ -391,6 +392,7 @@ impl<'a> CommitBuilder<'a> { branch: manifest.branch.clone(), }, ); + Ok(Dataset { object_store, base: base_path, @@ -404,6 +406,7 @@ impl<'a> CommitBuilder<'a> { fragment_bitmap, metadata_cache, file_reader_options: None, + store_params: self.store_params.clone().map(Box::new), }) } } diff --git a/rust/lance/src/dataset/write/insert.rs b/rust/lance/src/dataset/write/insert.rs index 904580a7d1a..b8c9c225724 100644 --- a/rust/lance/src/dataset/write/insert.rs +++ b/rust/lance/src/dataset/write/insert.rs @@ -25,7 +25,7 @@ use snafu::location; use crate::dataset::builder::DatasetBuilder; use crate::dataset::transaction::{Operation, Transaction, TransactionBuilder}; -use crate::dataset::write::write_fragments_internal; +use crate::dataset::write::{validate_and_resolve_target_bases, write_fragments_internal}; use crate::dataset::ReadParams; use crate::Dataset; use crate::{Error, Result}; @@ -195,6 +195,10 @@ impl<'a> InsertBuilder<'a> { self.validate_write(&mut context, &schema)?; + let existing_base_paths = context.dest.dataset().map(|ds| &ds.manifest.base_paths); + let target_base_info = + validate_and_resolve_target_bases(&mut context.params, existing_base_paths).await?; + let written_frags = write_fragments_internal( context.dest.dataset(), context.object_store.clone(), @@ -202,6 +206,7 @@ impl<'a> InsertBuilder<'a> { schema.clone(), stream, context.params.clone(), + target_base_info, ) .await?; @@ -246,19 +251,24 @@ impl<'a> InsertBuilder<'a> { } None => None, }; + Operation::Overwrite { // Use 
the full schema, not the written schema schema, fragments: written_frags.default.0, config_upsert_values, + initial_bases: context.params.initial_bases.clone(), + } + } + WriteMode::Overwrite => { + Operation::Overwrite { + // Use the full schema, not the written schema + schema, + fragments: written_frags.default.0, + config_upsert_values: None, + initial_bases: context.params.initial_bases.clone(), } } - WriteMode::Overwrite => Operation::Overwrite { - // Use the full schema, not the written schema - schema, - fragments: written_frags.default.0, - config_upsert_values: None, - }, WriteMode::Append => Operation::Append { fragments: written_frags.default.0, }, @@ -269,6 +279,7 @@ impl<'a> InsertBuilder<'a> { schema: blob.1, fragments: blob.0, config_upsert_values: None, + initial_bases: context.params.initial_bases.clone(), }, WriteMode::Append => Operation::Append { fragments: blob.0 }, }); diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index dc6f471f6e8..c7904f04a26 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -1051,6 +1051,7 @@ impl MergeInsertJob { write_schema, stream, Default::default(), // TODO: support write params. 
+ None, // Merge insert doesn't use target_bases ) .await?; @@ -1435,6 +1436,7 @@ impl MergeInsertJob { self.dataset.schema().clone(), Box::pin(stream), WriteParams::default(), + None, // Merge insert doesn't use target_bases ) .await?; diff --git a/rust/lance/src/dataset/write/merge_insert/exec/write.rs b/rust/lance/src/dataset/write/merge_insert/exec/write.rs index abbbc5e9b88..bd6fa18679d 100644 --- a/rust/lance/src/dataset/write/merge_insert/exec/write.rs +++ b/rust/lance/src/dataset/write/merge_insert/exec/write.rs @@ -823,6 +823,7 @@ impl ExecutionPlan for FullSchemaMergeInsertExec { dataset.schema().clone(), write_data_stream, WriteParams::default(), + None, // Merge insert doesn't use target_bases ) .await?; diff --git a/rust/lance/src/dataset/write/update.rs b/rust/lance/src/dataset/write/update.rs index 2b73d69802f..de944561236 100644 --- a/rust/lance/src/dataset/write/update.rs +++ b/rust/lance/src/dataset/write/update.rs @@ -329,6 +329,7 @@ impl UpdateJob { self.dataset.schema().clone(), Box::pin(stream), WriteParams::with_storage_version(version), + None, // TODO: support multiple bases for update ) .await?; diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 46a35dc7d1e..1333c0ec566 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -40,14 +40,6 @@ use lance_table::io::commit::{ use rand::{rng, Rng}; use snafu::location; -use futures::future::Either; -use futures::{StreamExt, TryFutureExt, TryStreamExt}; -use lance_core::{Error, Result}; -use lance_index::{is_system_index, DatasetIndexExt}; -use log; -use object_store::path::Path; -use prost::Message; - use super::ObjectStore; use crate::dataset::cleanup::auto_cleanup_hook; use crate::dataset::fragment::FileFragment; @@ -61,6 +53,14 @@ use crate::session::caches::DSMetadataCache; use crate::session::index_caches::IndexMetadataKey; use crate::session::Session; use crate::Dataset; +use futures::future::Either; +use futures::{StreamExt, TryFutureExt, 
TryStreamExt}; +use lance_core::{Error, Result}; +use lance_index::{is_system_index, DatasetIndexExt}; +use lance_io::object_store::ObjectStoreRegistry; +use log; +use object_store::path::Path; +use prost::Message; mod conflict_resolver; #[cfg(all(feature = "dynamodb_tests", test))] @@ -109,6 +109,7 @@ async fn do_commit_new_dataset( manifest_naming_scheme: ManifestNamingScheme, blob_version: Option, metadata_cache: &DSMetadataCache, + store_registry: Arc, ) -> Result<(Manifest, ManifestLocation)> { let transaction_file = write_transaction_file(object_store, base_path, transaction).await?; @@ -120,12 +121,10 @@ async fn do_commit_new_dataset( .. } = &transaction.operation { + let source_base_path = + ObjectStore::extract_path_from_uri(store_registry, ref_path.as_str())?; let source_manifest_location = commit_handler - .resolve_version_location( - &Path::parse(ref_path.as_str())?, - *ref_version, - &object_store.inner, - ) + .resolve_version_location(&source_base_path, *ref_version, &object_store.inner) .await?; let source_manifest = Dataset::load_manifest( object_store, @@ -222,6 +221,7 @@ async fn do_commit_new_dataset( } } +#[allow(clippy::too_many_arguments)] pub(crate) async fn commit_new_dataset( object_store: &ObjectStore, commit_handler: &dyn CommitHandler, @@ -230,6 +230,7 @@ pub(crate) async fn commit_new_dataset( write_config: &ManifestWriteConfig, manifest_naming_scheme: ManifestNamingScheme, metadata_cache: &crate::session::caches::DSMetadataCache, + store_registry: Arc, ) -> Result<(Manifest, ManifestLocation)> { let blob_version = if let Some(blob_op) = transaction.blobs_op.as_ref() { let blob_path = base_path.child(BLOB_DIR); @@ -243,6 +244,7 @@ pub(crate) async fn commit_new_dataset( manifest_naming_scheme, None, metadata_cache, + store_registry.clone(), ) .await?; Some(blob_manifest.version) @@ -259,6 +261,7 @@ pub(crate) async fn commit_new_dataset( manifest_naming_scheme, blob_version, metadata_cache, + store_registry, ) .await } @@ -1520,8 
+1523,8 @@ mod tests { Fragment { id: 0, files: vec![ - DataFile::new_legacy_from_fields("path1", vec![0, 1, 2]), - DataFile::new_legacy_from_fields("unused", vec![9]), + DataFile::new_legacy_from_fields("path1", vec![0, 1, 2], None), + DataFile::new_legacy_from_fields("unused", vec![9], None), ], deletion_file: None, row_id_meta: None, @@ -1530,8 +1533,8 @@ mod tests { Fragment { id: 1, files: vec![ - DataFile::new_legacy_from_fields("path2", vec![0, 1, 2]), - DataFile::new_legacy_from_fields("path3", vec![2]), + DataFile::new_legacy_from_fields("path2", vec![0, 1, 2], None), + DataFile::new_legacy_from_fields("path3", vec![2], None), ], deletion_file: None, row_id_meta: None, @@ -1564,7 +1567,11 @@ mod tests { let expected_fragments = vec![ Fragment { id: 0, - files: vec![DataFile::new_legacy_from_fields("path1", vec![0, 1, 10])], + files: vec![DataFile::new_legacy_from_fields( + "path1", + vec![0, 1, 10], + None, + )], deletion_file: None, row_id_meta: None, physical_rows: None, @@ -1572,8 +1579,8 @@ mod tests { Fragment { id: 1, files: vec![ - DataFile::new_legacy_from_fields("path2", vec![0, 1, 2]), - DataFile::new_legacy_from_fields("path3", vec![10]), + DataFile::new_legacy_from_fields("path2", vec![0, 1, 2], None), + DataFile::new_legacy_from_fields("path3", vec![10], None), ], deletion_file: None, row_id_meta: None, diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index da9a95e12b3..8a2543f0c7b 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -2050,6 +2050,7 @@ mod tests { "overwrite-key".to_string(), "value".to_string(), )])), + initial_bases: None, }, Operation::Rewrite { groups: vec![RewriteGroup { @@ -2152,6 +2153,7 @@ mod tests { fragments: vec![fragment0.clone(), fragment2.clone()], schema: lance_core::datatypes::Schema::default(), config_upsert_values: None, + initial_bases: None, }, // No conflicts: overwrite can always happen 
since it doesn't // depend on previous state of the table. diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index 7d393cad3ad..0e591500c70 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -124,6 +124,7 @@ impl TestDatasetGenerator { fragments, schema, config_upsert_values: None, + initial_bases: None, }; Dataset::commit(