/// JNI entry point backing the Java method `org.lance.Dataset.nativeTruncateTable`.
///
/// Delegates to [`inner_truncate_table`]; on `Err` the
/// `ok_or_throw_without_return!` macro throws a Java exception into `env`
/// instead of returning a value (the native method is declared `void`).
#[no_mangle]
pub extern "system" fn Java_org_lance_Dataset_nativeTruncateTable(
    mut env: JNIEnv,
    java_dataset: JObject,
) {
    ok_or_throw_without_return!(env, inner_truncate_table(&mut env, java_dataset))
}

/// Deletes all rows from the dataset held by `java_dataset`.
///
/// Looks up the `BlockingDataset` stored in the Java object's
/// `NATIVE_DATASET` field and blocks the calling JVM thread on the
/// process-wide runtime `RT` until `truncate_table` completes.
fn inner_truncate_table(env: &mut JNIEnv, java_dataset: JObject) -> Result<()> {
    // `get_rust_field` is unsafe: it requires that NATIVE_DATASET actually
    // holds a `BlockingDataset` pointer — the invariant maintained by the
    // rest of this JNI module. The returned guard keeps the field locked
    // for the duration of the call.
    let mut dataset_guard =
        unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
    RT.block_on(dataset_guard.inner.truncate_table())?;
    Ok(())
}
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.lance.operation;

import org.lance.Dataset;
import org.lance.FragmentMetadata;
import org.lance.TestUtils;
import org.lance.Transaction;
import org.lance.ipc.LanceScanner;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.nio.file.Path;
import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertEquals;

/** Tests for {@link Dataset#truncateTable()}. */
public class TruncateTest extends OperationTestBase {

  @Test
  void testTruncateTable(@TempDir Path tempDir) throws Exception {
    String datasetPath = tempDir.resolve("testTruncate").toString();
    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
      TestUtils.SimpleTestDataset testDataset =
          new TestUtils.SimpleTestDataset(allocator, datasetPath);
      dataset = testDataset.createEmptyDataset();

      // Append some data so that truncation has rows to remove.
      int rowCount = 20;
      FragmentMetadata fragmentMeta = testDataset.createNewFragment(rowCount);
      Transaction transaction =
          dataset
              .newTransactionBuilder()
              .operation(
                  Append.builder()
                      .fragments(Collections.singletonList(fragmentMeta))
                      .build())
              .build();
      try (Dataset ds1 = transaction.commit()) {
        assertEquals(rowCount, ds1.countRows());

        // Truncate to empty while preserving schema
        ds1.truncateTable();
        assertEquals(0, ds1.countRows());

        try (LanceScanner scanner = ds1.newScan()) {
          Schema schemaRes = scanner.schema();
          assertEquals(testDataset.getSchema(), schemaRes);
        }
      }
    }
  }
}
/// Truncate the dataset by deleting all rows. The schema is preserved and a new version is created.
fn truncate_table(&mut self) -> PyResult<()> {
    // `self.ds` is shared behind an `Arc`; clone the inner dataset, mutate the
    // clone, then swap the new handle in. Readers holding the old `Arc` keep
    // seeing the pre-truncate version until they re-fetch.
    let mut new_self = self.ds.as_ref().clone();
    // Outer `?` surfaces runtime/interrupt errors from `block_on`; the inner
    // `map_err` converts the Lance operation error into a Python IOError.
    rt().block_on(None, new_self.truncate_table())?
        .map_err(|err: lance::Error| PyIOError::new_err(err.to_string()))?;
    self.ds = Arc::new(new_self);
    Ok(())
}
/// /// This method allows you to register additional storage locations (buckets) diff --git a/rust/lance/src/dataset/tests/dataset_io.rs b/rust/lance/src/dataset/tests/dataset_io.rs index ca95d4e4772..cffbb97c706 100644 --- a/rust/lance/src/dataset/tests/dataset_io.rs +++ b/rust/lance/src/dataset/tests/dataset_io.rs @@ -44,6 +44,35 @@ use lance_table::io::manifest::read_manifest; use object_store::path::Path; use rstest::rstest; +#[tokio::test] +async fn test_truncate_table() { + let tmpdir = tempfile::tempdir().unwrap(); + let path = tmpdir.path(); + create_file(path, WriteMode::Create, LanceFileVersion::V2_2).await; + + let uri = path.to_str().unwrap(); + let mut ds = Dataset::open(uri).await.unwrap(); + let rows_before = ds.count_rows(None).await.unwrap(); + assert!(rows_before > 0); + + ds.truncate_table().await.unwrap(); + + let rows_after = ds.count_rows(None).await.unwrap(); + assert_eq!(rows_after, 0); + assert_eq!(ds.count_fragments(), 0); + + let expected_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("i", DataType::Int32, false), + ArrowField::new( + "dict", + DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)), + false, + ), + ])); + let actual_schema = ArrowSchema::from(ds.schema()); + assert_eq!(&actual_schema, expected_schema.as_ref()); +} + #[rstest] #[lance_test_macros::test(tokio::test)] async fn test_create_dataset(