diff --git a/java/lance-jni/Cargo.lock b/java/lance-jni/Cargo.lock index 19ed5aaca10..4b844c87bc4 100644 --- a/java/lance-jni/Cargo.lock +++ b/java/lance-jni/Cargo.lock @@ -1429,9 +1429,9 @@ dependencies = [ [[package]] name = "datafusion" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ba7cb113e9c0bedf9e9765926031e132fa05a1b09ba6e93a6d1a4d7044457b8" +checksum = "d12ee9fdc6cdb5898c7691bb994f0ba606c4acc93a2258d78bb9f26ff8158bb3" dependencies = [ "arrow", "arrow-schema", @@ -1471,7 +1471,6 @@ dependencies = [ "parquet", "rand 0.9.2", "regex", - "rstest", "sqlparser", "tempfile", "tokio", @@ -1481,9 +1480,9 @@ dependencies = [ [[package]] name = "datafusion-catalog" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66a3a799f914a59b1ea343906a0486f17061f39509af74e874a866428951130d" +checksum = "462dc9ef45e5d688aeaae49a7e310587e81b6016b9d03bace5626ad0043e5a9e" dependencies = [ "arrow", "async-trait", @@ -1506,9 +1505,9 @@ dependencies = [ [[package]] name = "datafusion-catalog-listing" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6db1b113c80d7a0febcd901476a57aef378e717c54517a163ed51417d87621b0" +checksum = "1b96dbf1d728fc321817b744eb5080cdd75312faa6980b338817f68f3caa4208" dependencies = [ "arrow", "async-trait", @@ -1525,21 +1524,20 @@ dependencies = [ "itertools 0.14.0", "log", "object_store", - "tokio", ] [[package]] name = "datafusion-common" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c10f7659e96127d25e8366be7c8be4109595d6a2c3eac70421f380a7006a1b0" +checksum = "3237a6ff0d2149af4631290074289cae548c9863c885d821315d54c6673a074a" dependencies = [ "ahash", "arrow", "arrow-ipc", "chrono", "half", - "hashbrown 0.14.5", + "hashbrown 0.16.1", "indexmap", "libc", "log", @@ -1553,9 +1551,9 @@ 
dependencies = [ [[package]] name = "datafusion-common-runtime" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b92065bbc6532c6651e2f7dd30b55cba0c7a14f860c7e1d15f165c41a1868d95" +checksum = "70b5e34026af55a1bfccb1ef0a763cf1f64e77c696ffcf5a128a278c31236528" dependencies = [ "futures", "log", @@ -1564,9 +1562,9 @@ dependencies = [ [[package]] name = "datafusion-datasource" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fde13794244bc7581cd82f6fff217068ed79cdc344cafe4ab2c3a1c3510b38d6" +checksum = "1b2a6be734cc3785e18bbf2a7f2b22537f6b9fb960d79617775a51568c281842" dependencies = [ "arrow", "async-trait", @@ -1593,9 +1591,9 @@ dependencies = [ [[package]] name = "datafusion-datasource-arrow" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "804fa9b4ecf3157982021770617200ef7c1b2979d57bec9044748314775a9aea" +checksum = "1739b9b07c9236389e09c74f770e88aff7055250774e9def7d3f4f56b3dcc7be" dependencies = [ "arrow", "arrow-ipc", @@ -1617,9 +1615,9 @@ dependencies = [ [[package]] name = "datafusion-datasource-csv" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61a1641a40b259bab38131c5e6f48fac0717bedb7dc93690e604142a849e0568" +checksum = "61c73bc54b518bbba7c7650299d07d58730293cfba4356f6f428cc94c20b7600" dependencies = [ "arrow", "async-trait", @@ -1640,9 +1638,9 @@ dependencies = [ [[package]] name = "datafusion-datasource-json" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adeacdb00c1d37271176f8fb6a1d8ce096baba16ea7a4b2671840c5c9c64fe85" +checksum = "37812c8494c698c4d889374ecfabbff780f1f26d9ec095dd1bddfc2a8ca12559" dependencies = [ "arrow", "async-trait", @@ -1662,9 +1660,9 @@ dependencies = [ [[package]] name = 
"datafusion-datasource-parquet" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43d0b60ffd66f28bfb026565d62b0a6cbc416da09814766a3797bba7d85a3cd9" +checksum = "2210937ecd9f0e824c397e73f4b5385c97cd1aff43ab2b5836fcfd2d321523fb" dependencies = [ "arrow", "async-trait", @@ -1692,18 +1690,19 @@ dependencies = [ [[package]] name = "datafusion-doc" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b99e13947667b36ad713549237362afb054b2d8f8cc447751e23ec61202db07" +checksum = "2c825f969126bc2ef6a6a02d94b3c07abff871acf4d6dd759ce1255edb7923ce" [[package]] name = "datafusion-execution" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63695643190679037bc946ad46a263b62016931547bf119859c511f7ff2f5178" +checksum = "fa03ef05a2c2f90dd6c743e3e111078e322f4b395d20d4b4d431a245d79521ae" dependencies = [ "arrow", "async-trait", + "chrono", "dashmap", "datafusion-common", "datafusion-expr", @@ -1718,9 +1717,9 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9a4787cbf5feb1ab351f789063398f67654a6df75c4d37d7f637dc96f951a91" +checksum = "ef33934c1f98ee695cc51192cc5f9ed3a8febee84fdbcd9131bf9d3a9a78276f" dependencies = [ "arrow", "async-trait", @@ -1740,9 +1739,9 @@ dependencies = [ [[package]] name = "datafusion-expr-common" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ce2fb1b8c15c9ac45b0863c30b268c69dc9ee7a1ee13ecf5d067738338173dc" +checksum = "000c98206e3dd47d2939a94b6c67af4bfa6732dd668ac4fafdbde408fd9134ea" dependencies = [ "arrow", "datafusion-common", @@ -1753,9 +1752,9 @@ dependencies = [ [[package]] name = "datafusion-functions" -version = "51.0.0" +version = "52.1.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "794a9db7f7b96b3346fc007ff25e994f09b8f0511b4cf7dff651fadfe3ebb28f" +checksum = "379b01418ab95ca947014066248c22139fe9af9289354de10b445bd000d5d276" dependencies = [ "arrow", "arrow-buffer", @@ -1763,6 +1762,7 @@ dependencies = [ "blake2", "blake3", "chrono", + "chrono-tz", "datafusion-common", "datafusion-doc", "datafusion-execution", @@ -1783,9 +1783,9 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c25210520a9dcf9c2b2cbbce31ebd4131ef5af7fc60ee92b266dc7d159cb305" +checksum = "fd00d5454ba4c3f8ebbd04bd6a6a9dc7ced7c56d883f70f2076c188be8459e4c" dependencies = [ "ahash", "arrow", @@ -1804,9 +1804,9 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62f4a66f3b87300bb70f4124b55434d2ae3fe80455f3574701d0348da040b55d" +checksum = "aec06b380729a87210a4e11f555ec2d729a328142253f8d557b87593622ecc9f" dependencies = [ "ahash", "arrow", @@ -1817,9 +1817,9 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae5c06eed03918dc7fe7a9f082a284050f0e9ecf95d72f57712d1496da03b8c4" +checksum = "904f48d45e0f1eb7d0eb5c0f80f2b5c6046a85454364a6b16a2e0b46f62e7dff" dependencies = [ "arrow", "arrow-ord", @@ -1840,9 +1840,9 @@ dependencies = [ [[package]] name = "datafusion-functions-table" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db4fed1d71738fbe22e2712d71396db04c25de4111f1ec252b8f4c6d3b25d7f5" +checksum = "e9a0d20e2b887e11bee24f7734d780a2588b925796ac741c3118dd06d5aa77f0" dependencies = [ "arrow", "async-trait", @@ -1856,9 +1856,9 @@ 
dependencies = [ [[package]] name = "datafusion-functions-window" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d92206aa5ae21892f1552b4d61758a862a70956e6fd7a95cb85db1de74bc6d1" +checksum = "d3414b0a07e39b6979fe3a69c7aa79a9f1369f1d5c8e52146e66058be1b285ee" dependencies = [ "arrow", "datafusion-common", @@ -1874,9 +1874,9 @@ dependencies = [ [[package]] name = "datafusion-functions-window-common" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53ae9bcc39800820d53a22d758b3b8726ff84a5a3e24cecef04ef4e5fdf1c7cc" +checksum = "5bf2feae63cd4754e31add64ce75cae07d015bce4bb41cd09872f93add32523a" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", @@ -1884,9 +1884,9 @@ dependencies = [ [[package]] name = "datafusion-macros" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1063ad4c9e094b3f798acee16d9a47bd7372d9699be2de21b05c3bd3f34ab848" +checksum = "c4fe888aeb6a095c4bcbe8ac1874c4b9a4c7ffa2ba849db7922683ba20875aaf" dependencies = [ "datafusion-doc", "quote", @@ -1895,9 +1895,9 @@ dependencies = [ [[package]] name = "datafusion-optimizer" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f35f9ec5d08b87fd1893a30c2929f2559c2f9806ca072d8fefca5009dc0f06a" +checksum = "8a6527c063ae305c11be397a86d8193936f4b84d137fe40bd706dfc178cf733c" dependencies = [ "arrow", "chrono", @@ -1914,9 +1914,9 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c30cc8012e9eedcb48bbe112c6eff4ae5ed19cf3003cb0f505662e88b7014c5d" +checksum = "0bb028323dd4efd049dd8a78d78fe81b2b969447b39c51424167f973ac5811d9" dependencies = [ "ahash", "arrow", @@ -1926,19 +1926,20 @@ dependencies = [ 
"datafusion-functions-aggregate-common", "datafusion-physical-expr-common", "half", - "hashbrown 0.14.5", + "hashbrown 0.16.1", "indexmap", "itertools 0.14.0", "parking_lot", "paste", "petgraph", + "tokio", ] [[package]] name = "datafusion-physical-expr-adapter" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f9ff2dbd476221b1f67337699eff432781c4e6e1713d2aefdaa517dfbf79768" +checksum = "78fe0826aef7eab6b4b61533d811234a7a9e5e458331ebbf94152a51fc8ab433" dependencies = [ "arrow", "datafusion-common", @@ -1951,23 +1952,26 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90da43e1ec550b172f34c87ec68161986ced70fd05c8d2a2add66eef9c276f03" +checksum = "cfccd388620734c661bd8b7ca93c44cdd59fecc9b550eea416a78ffcbb29475f" dependencies = [ "ahash", "arrow", + "chrono", "datafusion-common", "datafusion-expr-common", - "hashbrown 0.14.5", + "hashbrown 0.16.1", + "indexmap", "itertools 0.14.0", + "parking_lot", ] [[package]] name = "datafusion-physical-optimizer" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce9804f799acd7daef3be7aaffe77c0033768ed8fdbf5fb82fc4c5f2e6bc14e6" +checksum = "bde5fa10e73259a03b705d5fddc136516814ab5f441b939525618a4070f5a059" dependencies = [ "arrow", "datafusion-common", @@ -1983,27 +1987,27 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0acf0ad6b6924c6b1aa7d213b181e012e2d3ec0a64ff5b10ee6282ab0f8532ac" +checksum = "0e1098760fb29127c24cc9ade3277051dc73c9ed0ac0131bd7bcd742e0ad7470" dependencies = [ "ahash", "arrow", "arrow-ord", "arrow-schema", "async-trait", - "chrono", "datafusion-common", "datafusion-common-runtime", "datafusion-execution", 
"datafusion-expr", + "datafusion-functions", "datafusion-functions-aggregate-common", "datafusion-functions-window-common", "datafusion-physical-expr", "datafusion-physical-expr-common", "futures", "half", - "hashbrown 0.14.5", + "hashbrown 0.16.1", "indexmap", "itertools 0.14.0", "log", @@ -2014,9 +2018,9 @@ dependencies = [ [[package]] name = "datafusion-pruning" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac2c2498a1f134a9e11a9f5ed202a2a7d7e9774bd9249295593053ea3be999db" +checksum = "64d0fef4201777b52951edec086c21a5b246f3c82621569ddb4a26f488bc38a9" dependencies = [ "arrow", "datafusion-common", @@ -2031,9 +2035,9 @@ dependencies = [ [[package]] name = "datafusion-session" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f96eebd17555386f459037c65ab73aae8df09f464524c709d6a3134ad4f4776" +checksum = "f71f1e39e8f2acbf1c63b0e93756c2e970a64729dab70ac789587d6237c4fde0" dependencies = [ "async-trait", "datafusion-common", @@ -2045,9 +2049,9 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fc195fe60634b2c6ccfd131b487de46dc30eccae8a3c35a13f136e7f440414f" +checksum = "f44693cfcaeb7a9f12d71d1c576c3a6dc025a12cef209375fa2d16fb3b5670ee" dependencies = [ "arrow", "bigdecimal", @@ -2062,9 +2066,9 @@ dependencies = [ [[package]] name = "datafusion-substrait" -version = "51.0.0" +version = "52.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2505af06d103a55b4e8ded0c6aeb6c72a771948da939c0bd3f8eee67af475a9c" +checksum = "6042adacd0bd64e56c22f6a7f9ce0ce1793dd367c899d868179d029f110d9215" dependencies = [ "async-recursion", "async-trait", @@ -2478,12 +2482,6 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" -[[package]] -name = "futures-timer" -version = "3.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" - [[package]] name = "futures-util" version = "0.3.32" @@ -2611,9 +2609,9 @@ dependencies = [ [[package]] name = "geodatafusion" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "773cfa1fb0d7f7661b76b3fde00f3ffd8e0ff7b3635096f0ff6294fe5ca62a2b" +checksum = "4cb8faa9b3bf4ae9f49b1f023b82d20626826f6448a7055498376146c10c4ead" dependencies = [ "arrow-arith", "arrow-array", @@ -2751,10 +2749,6 @@ name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" -dependencies = [ - "ahash", - "allocator-api2", -] [[package]] name = "hashbrown" @@ -5193,12 +5187,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "relative-path" -version = "1.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" - [[package]] name = "reqsign" version = "0.16.5" @@ -5339,35 +5327,6 @@ dependencies = [ "smallvec", ] -[[package]] -name = "rstest" -version = "0.26.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5a3193c063baaa2a95a33f03035c8a72b83d97a54916055ba22d35ed3839d49" -dependencies = [ - "futures-timer", - "futures-util", - "rstest_macros", -] - -[[package]] -name = "rstest_macros" -version = "0.26.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c845311f0ff7951c5506121a9ad75aec44d083c31583b2ea5a30bcb0b0abba0" -dependencies = [ - "cfg-if", - "glob", - "proc-macro-crate", - "proc-macro2", - "quote", - "regex", - "relative-path", - "rustc_version", - "syn 2.0.117", - "unicode-ident", 
-] - [[package]] name = "rust-ini" version = "0.21.3" diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index 3d6c1ab8e64..b6431d17e99 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -3,6 +3,9 @@ use crate::error::{Error, Result}; use crate::ffi::JNIEnvExt; +use crate::namespace::{ + create_java_lance_namespace, BlockingDirectoryNamespace, BlockingRestNamespace, +}; use crate::session::{handle_from_session, session_from_handle}; use crate::storage_options::JavaStorageOptionsProvider; use crate::traits::{export_vec, import_vec, FromJObjectWithEnv, FromJString}; @@ -41,6 +44,7 @@ use lance::session::Session as LanceSession; use lance::table::format::IndexMetadata; use lance::table::format::{BasePath, Fragment}; use lance_core::datatypes::Schema as LanceSchema; +use lance_file::version::LanceFileVersion; use lance_index::optimize::OptimizeOptions; use lance_index::scalar::btree::BTreeParameters; use lance_index::DatasetIndexExt; @@ -322,20 +326,35 @@ impl BlockingDataset { Ok(indexes) } + #[allow(clippy::too_many_arguments)] pub fn commit_transaction( &mut self, transaction: Transaction, store_params: ObjectStoreParams, detached: bool, enable_v2_manifest_paths: bool, + use_stable_row_ids: Option, + storage_format: Option, + max_retries: u32, + skip_auto_cleanup: bool, ) -> Result { - let new_dataset = RT.block_on( - CommitBuilder::new(Arc::new(self.clone().inner)) - .with_store_params(store_params) - .with_detached(detached) - .enable_v2_manifest_paths(enable_v2_manifest_paths) - .execute(transaction), - )?; + let mut builder = CommitBuilder::new(Arc::new(self.clone().inner)) + .with_store_params(store_params) + .with_detached(detached) + .enable_v2_manifest_paths(enable_v2_manifest_paths); + if let Some(use_stable) = use_stable_row_ids { + builder = builder.use_stable_row_ids(use_stable); + } + if let Some(format) = storage_format { + builder = 
builder.with_storage_format(format); + } + if max_retries > 0 { + builder = builder.with_max_retries(max_retries); + } + if skip_auto_cleanup { + builder = builder.with_skip_auto_cleanup(true); + } + let new_dataset = RT.block_on(builder.execute(transaction))?; Ok(BlockingDataset { inner: new_dataset }) } @@ -575,34 +594,11 @@ fn inner_create_with_ffi_stream<'local>( namespace_obj: JObject, // LanceNamespace (can be null) table_id_obj: JObject, // List (can be null) ) -> Result> { - use crate::namespace::{ - create_java_lance_namespace, BlockingDirectoryNamespace, BlockingRestNamespace, - }; - let stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream; let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?; // Create the namespace wrapper for commit handling (if provided) - let namespace_info = if namespace_obj.is_null() { - None - } else { - let namespace: Arc = if is_directory_namespace(env, &namespace_obj)? { - let native_handle = get_native_namespace_handle(env, &namespace_obj)?; - let ns = unsafe { &*(native_handle as *const BlockingDirectoryNamespace) }; - ns.inner.clone() - } else if is_rest_namespace(env, &namespace_obj)? { - let native_handle = get_native_namespace_handle(env, &namespace_obj)?; - let ns = unsafe { &*(native_handle as *const BlockingRestNamespace) }; - ns.inner.clone() - } else { - // Custom Java implementation, create a Java bridge wrapper - create_java_lance_namespace(env, &namespace_obj)? 
- }; - - // Extract table_id from Java List - let table_id = env.get_strings(&table_id_obj)?; - Some((namespace, table_id)) - }; + let namespace_info = extract_namespace_info(env, &namespace_obj, &table_id_obj)?; create_dataset( env, @@ -1150,10 +1146,6 @@ fn inner_open_native<'local>( namespace_obj: JObject, // LanceNamespace object, null if no namespace table_id_obj: JObject, // List, null if no namespace ) -> Result> { - use crate::namespace::{ - create_java_lance_namespace, BlockingDirectoryNamespace, BlockingRestNamespace, - }; - let path_str: String = path.extract(env)?; let version = env.get_u64_opt(&version_obj)?; let block_size = env.get_int_opt(&block_size_obj)?; @@ -1170,31 +1162,10 @@ fn inner_open_native<'local>( storage_options_provider.map(|v| Arc::new(v) as Arc); // Extract namespace and table_id if provided (before get_bytes_opt which holds borrow) - let (namespace, table_id) = if !namespace_obj.is_null() { - // Check if it's a native implementation using instanceof checks - let ns_arc: Arc = if is_directory_namespace(env, &namespace_obj)? { - let native_handle = get_native_namespace_handle(env, &namespace_obj)?; - let ns = unsafe { &*(native_handle as *const BlockingDirectoryNamespace) }; - ns.inner.clone() - } else if is_rest_namespace(env, &namespace_obj)? { - let native_handle = get_native_namespace_handle(env, &namespace_obj)?; - let ns = unsafe { &*(native_handle as *const BlockingRestNamespace) }; - ns.inner.clone() - } else { - // Custom Java implementation, create a Java bridge wrapper - create_java_lance_namespace(env, &namespace_obj)? - }; - - // Extract table_id from List - let table_id = if !table_id_obj.is_null() { - Some(env.get_strings(&table_id_obj)?) 
- } else { - None - }; - - (Some(ns_arc), table_id) - } else { - (None, None) + let namespace_info = extract_namespace_info(env, &namespace_obj, &table_id_obj)?; + let (namespace, table_id) = match namespace_info { + Some((ns, tid)) => (Some(ns), Some(tid)), + None => (None, None), }; let serialized_manifest = env.get_bytes_opt(&serialized_manifest)?; @@ -1246,6 +1217,36 @@ fn get_native_namespace_handle(env: &mut JNIEnv, namespace_obj: &JObject) -> Res .map_err(|e| Error::runtime_error(format!("getNativeHandle did not return a long: {}", e))) } +/// Extract namespace and table_id from Java objects into Rust types. +/// +/// Returns `None` if `namespace_obj` is null, otherwise returns the namespace +/// and table_id pair. +#[allow(clippy::type_complexity)] +pub(crate) fn extract_namespace_info( + env: &mut JNIEnv, + namespace_obj: &JObject, + table_id_obj: &JObject, +) -> Result, Vec)>> { + if namespace_obj.is_null() { + return Ok(None); + } + + let namespace: Arc = if is_directory_namespace(env, namespace_obj)? { + let native_handle = get_native_namespace_handle(env, namespace_obj)?; + let ns = unsafe { &*(native_handle as *const BlockingDirectoryNamespace) }; + ns.inner.clone() + } else if is_rest_namespace(env, namespace_obj)? { + let native_handle = get_native_namespace_handle(env, namespace_obj)?; + let ns = unsafe { &*(native_handle as *const BlockingRestNamespace) }; + ns.inner.clone() + } else { + create_java_lance_namespace(env, namespace_obj)? + }; + + let table_id = env.get_strings(table_id_obj)?; + Ok(Some((namespace, table_id))) +} + #[no_mangle] pub extern "system" fn Java_org_lance_Dataset_getFragmentsNative<'a>( mut env: JNIEnv<'a>, diff --git a/java/lance-jni/src/delta.rs b/java/lance-jni/src/delta.rs index 8a407a19167..c07b94bcf1c 100755 --- a/java/lance-jni/src/delta.rs +++ b/java/lance-jni/src/delta.rs @@ -126,13 +126,9 @@ fn inner_list_transactions<'local>( RT.block_on(delta_guard.inner.list_transactions())? 
}; - let java_dataset = env - .get_field(&j_delta, "dataset", "Lorg/lance/Dataset;")? - .l()?; - let array_list = env.new_object("java/util/ArrayList", "()V", &[])?; for tx in txs.into_iter() { - let jtx = convert_to_java_transaction(env, tx, &java_dataset)?; + let jtx = convert_to_java_transaction(env, tx)?; env.call_method( &array_list, "add", diff --git a/java/lance-jni/src/transaction.rs b/java/lance-jni/src/transaction.rs index ea5996aaeed..60621696a91 100644 --- a/java/lance-jni/src/transaction.rs +++ b/java/lance-jni/src/transaction.rs @@ -1,25 +1,35 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use crate::blocking_dataset::{BlockingDataset, NATIVE_DATASET}; +use crate::blocking_dataset::{extract_namespace_info, BlockingDataset, NATIVE_DATASET}; use crate::error::Result; -use crate::traits::{export_vec, import_vec_from_method, FromJObjectWithEnv, IntoJava, JLance}; +use crate::storage_options::JavaStorageOptionsProvider; +use crate::traits::{ + export_vec, import_vec_from_method, FromJObjectWithEnv, FromJString, IntoJava, JLance, +}; use crate::utils::{to_java_map, to_rust_map}; use crate::Error; use crate::JNIEnvExt; +use crate::RT; use arrow::datatypes::Schema; use arrow_schema::ffi::FFI_ArrowSchema; use chrono::DateTime; use jni::objects::{JByteArray, JLongArray, JMap, JObject, JString, JValue, JValueGen}; -use jni::sys::jboolean; +use jni::sys::{jboolean, jint}; use jni::JNIEnv; use lance::dataset::transaction::{ DataReplacementGroup, Operation, RewriteGroup, RewrittenIndex, Transaction, TransactionBuilder, UpdateMap, UpdateMapEntry, UpdateMode, }; +use lance::dataset::CommitBuilder; +use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore; use lance::io::ObjectStoreParams; use lance::table::format::{Fragment, IndexMetadata}; use lance_core::datatypes::Schema as LanceSchema; +use lance_file::version::LanceFileVersion; +use lance_io::object_store::StorageOptionsProvider; +use 
lance_table::io::commit::external_manifest::ExternalManifestCommitHandler; +use lance_table::io::commit::CommitHandler; use prost::Message; use prost_types::Any; use roaring::RoaringBitmap; @@ -286,7 +296,7 @@ fn inner_read_transaction<'local>( }; let transaction = match transaction { - Some(transaction) => convert_to_java_transaction(env, transaction, &java_dataset)?, + Some(transaction) => convert_to_java_transaction(env, transaction)?, None => JObject::null(), }; Ok(transaction) @@ -295,9 +305,12 @@ fn inner_read_transaction<'local>( pub(crate) fn convert_to_java_transaction<'local>( env: &mut JNIEnv<'local>, transaction: Transaction, - java_dataset: &JObject, ) -> Result> { let uuid = env.new_string(transaction.uuid)?; + let tag = match transaction.tag { + Some(tag) => JObject::from(env.new_string(tag)?), + None => JObject::null(), + }; let transaction_properties = match transaction.transaction_properties { Some(properties) => to_java_map(env, &properties)?, _ => JObject::null(), @@ -306,13 +319,12 @@ pub(crate) fn convert_to_java_transaction<'local>( let java_transaction = env.new_object( "org/lance/Transaction", - "(Lorg/lance/Dataset;JLjava/lang/String;Lorg/lance/operation/Operation;Ljava/util/Map;Ljava/util/Map;)V", + "(JLjava/lang/String;Lorg/lance/operation/Operation;Ljava/lang/String;Ljava/util/Map;)V", &[ - JValue::Object(java_dataset), JValue::Long(transaction.read_version as i64), JValue::Object(&uuid), JValue::Object(&operation), - JValue::Object(&JObject::null()), + JValue::Object(&tag), JValue::Object(&transaction_properties), ], )?; @@ -579,38 +591,90 @@ pub(crate) fn convert_to_java_schema<'local>( .l()?) 
} +fn parse_storage_format(name: &str) -> Result { + match name.to_lowercase().as_str() { + "legacy" => Ok(LanceFileVersion::Legacy), + "v2_0" | "v2.0" => Ok(LanceFileVersion::V2_0), + "stable" => Ok(LanceFileVersion::Stable), + "v2_1" | "v2.1" => Ok(LanceFileVersion::V2_1), + "next" => Ok(LanceFileVersion::Next), + "v2_2" | "v2.2" => Ok(LanceFileVersion::V2_2), + _ => Err(Error::input_error(format!( + "Unknown storage format: {}", + name + ))), + } +} + #[no_mangle] -pub extern "system" fn Java_org_lance_Dataset_nativeCommitTransaction<'local>( +#[allow(clippy::too_many_arguments)] +pub extern "system" fn Java_org_lance_CommitBuilder_nativeCommitToDataset<'local>( mut env: JNIEnv<'local>, + _cls: JObject, java_dataset: JObject, java_transaction: JObject, detached_jbool: jboolean, enable_v2_manifest_paths: jboolean, + write_params_obj: JObject, + use_stable_row_ids_obj: JObject, + storage_format_obj: JObject, + max_retries: jint, + skip_auto_cleanup: jboolean, ) -> JObject<'local> { ok_or_throw!( env, - inner_commit_transaction( + inner_commit_to_dataset( &mut env, java_dataset, java_transaction, detached_jbool != 0, enable_v2_manifest_paths != 0, + write_params_obj, + use_stable_row_ids_obj, + storage_format_obj, + max_retries as u32, + skip_auto_cleanup != 0, ) ) } -fn inner_commit_transaction<'local>( +#[allow(clippy::too_many_arguments)] +fn inner_commit_to_dataset<'local>( env: &mut JNIEnv<'local>, java_dataset: JObject, java_transaction: JObject, detached: bool, enable_v2_manifest_paths: bool, + write_params_obj: JObject, + use_stable_row_ids_obj: JObject, + storage_format_obj: JObject, + max_retries: u32, + skip_auto_cleanup: bool, ) -> Result> { - let write_param_jobj = env - .call_method(&java_transaction, "writeParams", "()Ljava/util/Map;", &[])? 
- .l()?; - let write_param_jmap = JMap::from_env(env, &write_param_jobj)?; - let write_param = to_rust_map(env, &write_param_jmap)?; + let write_param = if write_params_obj.is_null() { + HashMap::new() + } else { + let write_param_jmap = JMap::from_env(env, &write_params_obj)?; + to_rust_map(env, &write_param_jmap)? + }; + + // Parse optional use_stable_row_ids (boxed Boolean) + let use_stable_row_ids = if use_stable_row_ids_obj.is_null() { + None + } else { + let val = env + .call_method(&use_stable_row_ids_obj, "booleanValue", "()Z", &[])? + .z()?; + Some(val) + }; + + // Parse optional storage format string + let storage_format = if storage_format_obj.is_null() { + None + } else { + let format_str: String = JString::from(storage_format_obj).extract(env)?; + Some(parse_storage_format(&format_str)?) + }; // Get the Dataset's storage_options_accessor and merge with write_param let storage_options_accessor = { @@ -656,7 +720,15 @@ fn inner_commit_transaction<'local>( ..Default::default() }; - let transaction = convert_to_rust_transaction(env, java_transaction, Some(&java_dataset))?; + let java_allocator = env + .call_method( + &java_dataset, + "allocator", + "()Lorg/apache/arrow/memory/BufferAllocator;", + &[], + )? + .l()?; + let transaction = convert_to_rust_transaction(env, java_transaction, Some(&java_allocator))?; let new_blocking_ds = { let mut dataset_guard = unsafe { env.get_rust_field::<_, _, BlockingDataset>(&java_dataset, NATIVE_DATASET) }?; @@ -665,6 +737,10 @@ fn inner_commit_transaction<'local>( store_params, detached, enable_v2_manifest_paths, + use_stable_row_ids, + storage_format, + max_retries, + skip_auto_cleanup, )? 
}; new_blocking_ds.into_java(env) @@ -673,7 +749,7 @@ fn inner_commit_transaction<'local>( fn convert_to_rust_transaction( env: &mut JNIEnv, java_transaction: JObject, - java_dataset: Option<&JObject>, + allocator: Option<&JObject>, ) -> Result { let read_ver = env.get_u64_from_method(&java_transaction, "readVersion")?; let uuid = env.get_string_from_method(&java_transaction, "uuid")?; @@ -685,7 +761,12 @@ fn convert_to_rust_transaction( &[], )? .l()?; - let op = convert_to_rust_operation(env, &op, java_dataset)?; + let op = convert_to_rust_operation(env, &op, allocator)?; + + let tag = env.get_optional_from_method(&java_transaction, "tag", |env, tag_obj| { + let tag_str = JString::from(tag_obj); + tag_str.extract(env) + })?; let transaction_properties = env.get_optional_from_method( &java_transaction, @@ -697,6 +778,7 @@ fn convert_to_rust_transaction( )?; Ok(TransactionBuilder::new(read_ver, op) .uuid(uuid) + .tag(tag) .transaction_properties(transaction_properties.map(Arc::new)) .build()) } @@ -704,42 +786,44 @@ fn convert_to_rust_transaction( fn convert_schema_from_operation( env: &mut JNIEnv, java_operation: &JObject, - java_dataset: &JObject, + java_allocator: &JObject, ) -> Result { - let java_buffer_allocator = env - .call_method( - java_dataset, - "allocator", - "()Lorg/apache/arrow/memory/BufferAllocator;", - &[], - )? - .l()?; let schema_ptr = env .call_method( java_operation, "exportSchema", "(Lorg/apache/arrow/memory/BufferAllocator;)J", - &[JValue::Object(&java_buffer_allocator)], + &[JValue::Object(java_allocator)], )? 
.j()?; let c_schema_ptr = schema_ptr as *mut FFI_ArrowSchema; let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) }; let schema = Schema::try_from(&c_schema)?; - Ok( - LanceSchema::try_from(&schema) - .expect("Failed to convert from arrow schema to lance schema"), - ) + LanceSchema::try_from(&schema).map_err(|e| { + Error::input_error(format!( + "Failed to convert Arrow schema to Lance schema: {}", + e + )) + }) } fn convert_to_rust_operation( env: &mut JNIEnv<'_>, java_operation: &JObject<'_>, - java_dataset: Option<&JObject<'_>>, + allocator: Option<&JObject<'_>>, ) -> Result { let op_name = env.get_string_from_method(java_operation, "name")?; let op = match op_name.as_str() { "Project" => Operation::Project { - schema: convert_schema_from_operation(env, java_operation, java_dataset.unwrap())?, + schema: convert_schema_from_operation( + env, + java_operation, + allocator.ok_or_else(|| { + Error::input_error( + "BufferAllocator is required for Project operations".to_string(), + ) + })?, + )?, }, "UpdateConfig" => { let config_updates_obj = env @@ -860,7 +944,15 @@ fn convert_to_rust_operation( to_rust_map(env, &config_upsert_values) }, )?; - let schema = convert_schema_from_operation(env, java_operation, java_dataset.unwrap())?; + let schema = convert_schema_from_operation( + env, + java_operation, + allocator.ok_or_else(|| { + Error::input_error( + "BufferAllocator is required for Overwrite operations".to_string(), + ) + })?, + )?; Operation::Overwrite { fragments, schema, @@ -954,7 +1046,15 @@ fn convert_to_rust_operation( })?; Operation::Merge { fragments, - schema: convert_schema_from_operation(env, java_operation, java_dataset.unwrap())?, + schema: convert_schema_from_operation( + env, + java_operation, + allocator.ok_or_else(|| { + Error::input_error( + "BufferAllocator is required for Merge operations".to_string(), + ) + })?, + )?, } } "Restore" => { @@ -1068,3 +1168,157 @@ fn export_update_map<'a>( } } } + +#[no_mangle] 
+#[allow(clippy::too_many_arguments)] +pub extern "system" fn Java_org_lance_CommitBuilder_nativeCommitToUri<'local>( + mut env: JNIEnv<'local>, + _cls: JObject, + uri: JString, + java_transaction: JObject, + detached_jbool: jboolean, + enable_v2_manifest_paths: jboolean, + storage_options_provider_obj: JObject, + namespace_obj: JObject, + table_id_obj: JObject, + allocator_obj: JObject, + write_params_obj: JObject, + use_stable_row_ids_obj: JObject, + storage_format_obj: JObject, + max_retries: jint, + skip_auto_cleanup: jboolean, +) -> JObject<'local> { + ok_or_throw!( + env, + inner_commit_to_uri( + &mut env, + uri, + java_transaction, + detached_jbool != 0, + enable_v2_manifest_paths != 0, + storage_options_provider_obj, + namespace_obj, + table_id_obj, + allocator_obj, + write_params_obj, + use_stable_row_ids_obj, + storage_format_obj, + max_retries as u32, + skip_auto_cleanup != 0, + ) + ) +} + +#[allow(clippy::too_many_arguments)] +fn inner_commit_to_uri<'local>( + env: &mut JNIEnv<'local>, + uri: JString, + java_transaction: JObject, + detached: bool, + enable_v2_manifest_paths: bool, + storage_options_provider_obj: JObject, + namespace_obj: JObject, + table_id_obj: JObject, + allocator_obj: JObject, + write_params_obj: JObject, + use_stable_row_ids_obj: JObject, + storage_format_obj: JObject, + max_retries: u32, + skip_auto_cleanup: bool, +) -> Result> { + let uri_str: String = uri.extract(env)?; + + // Extract write params from parameter + let write_param = if write_params_obj.is_null() { + HashMap::new() + } else { + let write_param_jmap = JMap::from_env(env, &write_params_obj)?; + to_rust_map(env, &write_param_jmap)? + }; + + // Parse optional use_stable_row_ids (boxed Boolean) + let use_stable_row_ids = if use_stable_row_ids_obj.is_null() { + None + } else { + let val = env + .call_method(&use_stable_row_ids_obj, "booleanValue", "()Z", &[])? 
+ .z()?; + Some(val) + }; + + // Parse optional storage format string + let storage_format = if storage_format_obj.is_null() { + None + } else { + let format_str: String = JString::from(storage_format_obj).extract(env)?; + Some(parse_storage_format(&format_str)?) + }; + + // Build storage options accessor + let storage_options_provider: Option = env + .get_optional(&storage_options_provider_obj, |env, provider_obj| { + JavaStorageOptionsProvider::new(env, provider_obj) + })?; + + let accessor = match (write_param.is_empty(), storage_options_provider) { + (false, Some(provider)) => Some(Arc::new( + lance::io::StorageOptionsAccessor::with_initial_and_provider( + write_param, + Arc::new(provider) as Arc, + ), + )), + (false, None) => Some(Arc::new( + lance::io::StorageOptionsAccessor::with_static_options(write_param), + )), + (true, Some(provider)) => Some(Arc::new(lance::io::StorageOptionsAccessor::with_provider( + Arc::new(provider) as Arc, + ))), + (true, None) => None, + }; + + let store_params = ObjectStoreParams { + storage_options_accessor: accessor, + ..Default::default() + }; + + // Convert Java transaction to Rust + let allocator_ref = if allocator_obj.is_null() { + None + } else { + Some(allocator_obj) + }; + let transaction = convert_to_rust_transaction(env, java_transaction, allocator_ref.as_ref())?; + + // Build CommitBuilder with URI + let mut builder = CommitBuilder::new(&*uri_str) + .with_store_params(store_params) + .with_detached(detached) + .enable_v2_manifest_paths(enable_v2_manifest_paths); + + if let Some(use_stable) = use_stable_row_ids { + builder = builder.use_stable_row_ids(use_stable); + } + if let Some(format) = storage_format { + builder = builder.with_storage_format(format); + } + if max_retries > 0 { + builder = builder.with_max_retries(max_retries); + } + if skip_auto_cleanup { + builder = builder.with_skip_auto_cleanup(true); + } + + // Set namespace commit handler if provided + let namespace_info = extract_namespace_info(env, 
&namespace_obj, &table_id_obj)?; + if let Some((ns, tid)) = namespace_info { + let external_store = LanceNamespaceExternalManifestStore::new(ns, tid); + let commit_handler: Arc = Arc::new(ExternalManifestCommitHandler { + external_manifest_store: Arc::new(external_store), + }); + builder = builder.with_commit_handler(commit_handler); + } + + let dataset = RT.block_on(builder.execute(transaction))?; + let blocking_ds = BlockingDataset { inner: dataset }; + blocking_ds.into_java(env) +} diff --git a/java/src/main/java/org/lance/CommitBuilder.java b/java/src/main/java/org/lance/CommitBuilder.java new file mode 100644 index 00000000000..0b2752aae89 --- /dev/null +++ b/java/src/main/java/org/lance/CommitBuilder.java @@ -0,0 +1,304 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance; + +import org.lance.io.StorageOptionsProvider; +import org.lance.namespace.LanceNamespace; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.Preconditions; + +import java.util.List; +import java.util.Map; + +/** + * Builder for committing a {@link Transaction} to a Lance dataset. + * + *

+ * <p>Supports two modes:
+ *
+ * <ul>
+ *   <li>Dataset-based commit: commits against an existing dataset.
+ *   <li>URI-based commit: creates or updates a dataset at a URI.
+ * </ul>
+ *

+ * <p>Example usage (dataset-based):
+ *
+ * <pre>{@code
+ * try (Transaction txn = new Transaction.Builder()
+ *     .readVersion(dataset.version())
+ *     .operation(Append.builder().fragments(fragments).build())
+ *     .build();
+ *     Dataset committed = new CommitBuilder(dataset).execute(txn)) {
+ *     // use committed dataset
+ * }
+ * }</pre>
+ *

+ * <p>Example usage (URI-based):
+ *
+ * <pre>{@code
+ * try (Transaction txn = new Transaction.Builder()
+ *     .operation(Overwrite.builder().fragments(fragments).schema(schema).build())
+ *     .build();
+ *     Dataset committed = new CommitBuilder(uri, allocator).execute(txn)) {
+ *     // use committed dataset
+ * }
+ * }
+ */ +public class CommitBuilder { + static { + JniLoader.ensureLoaded(); + } + + private final Dataset dataset; + private final String uri; + private final BufferAllocator allocator; + + private Map writeParams; + private StorageOptionsProvider storageOptionsProvider; + private LanceNamespace namespace; + private List tableId; + private boolean enableV2ManifestPaths = true; + private boolean detached = false; + private Boolean useStableRowIds; + private String storageFormat; + private int maxRetries = 0; + private boolean skipAutoCleanup = false; + + /** + * Create a commit builder for committing against an existing dataset. + * + * @param dataset the existing dataset to commit against + */ + public CommitBuilder(Dataset dataset) { + Preconditions.checkNotNull(dataset, "Dataset must not be null"); + this.dataset = dataset; + this.uri = null; + this.allocator = null; + } + + /** + * Create a commit builder for creating or updating a dataset at the given URI. + * + * @param uri the target URI for the dataset + * @param allocator the Arrow buffer allocator for schema export + */ + public CommitBuilder(String uri, BufferAllocator allocator) { + Preconditions.checkNotNull(uri, "URI must not be null"); + Preconditions.checkNotNull(allocator, "Allocator must not be null"); + this.dataset = null; + this.uri = uri; + this.allocator = allocator; + } + + /** + * Set write parameters (storage options) for the commit. + * + * @param writeParams the write parameters + * @return this builder instance + */ + public CommitBuilder writeParams(Map writeParams) { + this.writeParams = writeParams; + return this; + } + + /** + * Set the storage options provider for credential refresh during URI-based commits. 
+ * + * @param provider the storage options provider + * @return this builder instance + */ + public CommitBuilder storageOptionsProvider(StorageOptionsProvider provider) { + this.storageOptionsProvider = provider; + return this; + } + + /** + * Set the namespace for managed versioning during URI-based commits. + * + * @param namespace the LanceNamespace instance + * @return this builder instance + */ + public CommitBuilder namespace(LanceNamespace namespace) { + this.namespace = namespace; + return this; + } + + /** + * Set the table ID for namespace-based commit handling. + * + * @param tableId the table identifier (e.g., ["workspace", "table_name"]) + * @return this builder instance + */ + public CommitBuilder tableId(List tableId) { + this.tableId = tableId; + return this; + } + + /** + * Enable or disable v2 manifest paths for new datasets. + * + *

Defaults to true. V2 manifest paths allow constant-time lookups for the latest manifest on + * object storage. Warning: enabling this makes the dataset unreadable for Lance versions prior to + * 0.17.0. + * + * @param enable whether to enable v2 manifest paths + * @return this builder instance + */ + public CommitBuilder enableV2ManifestPaths(boolean enable) { + this.enableV2ManifestPaths = enable; + return this; + } + + /** + * Set whether the commit should be detached from the main dataset lineage. + * + * @param detached if true, the commit will not be part of the main dataset lineage + * @return this builder instance + */ + public CommitBuilder detached(boolean detached) { + this.detached = detached; + return this; + } + + /** + * Whether to use stable row ids. This makes the {@code _rowid} column stable after compaction, + * but not updates. + * + *

This is only used for new datasets. Existing datasets will use their existing setting. + * Default is false. + * + * @param useStableRowIds whether to use stable row ids + * @return this builder instance + */ + public CommitBuilder useStableRowIds(boolean useStableRowIds) { + this.useStableRowIds = useStableRowIds; + return this; + } + + /** + * Set the storage format to use for the dataset. + * + *

This is only needed when creating a new empty table. If any data files are passed, the + * storage format will be inferred from the data files. Valid values: "legacy", "v2_0", "stable", + * "v2_1", "next", "v2_2". + * + * @param storageFormat the storage format name + * @return this builder instance + */ + public CommitBuilder storageFormat(String storageFormat) { + this.storageFormat = storageFormat; + return this; + } + + /** + * Set the maximum number of retries for commit operations. + * + *

If a commit operation fails, it will be retried up to {@code maxRetries} times. Default is + * 0. + * + * @param maxRetries the maximum number of retries + * @return this builder instance + */ + public CommitBuilder maxRetries(int maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + /** + * Set whether to skip automatic cleanup after commit. + * + *

Default is false. + * + * @param skipAutoCleanup if true, skip automatic cleanup + * @return this builder instance + */ + public CommitBuilder skipAutoCleanup(boolean skipAutoCleanup) { + this.skipAutoCleanup = skipAutoCleanup; + return this; + } + + /** + * Execute the commit with the given transaction. + * + *

The caller is responsible for closing the transaction (via try-with-resources or {@link + * Transaction#close()}) to release any native resources held by the operation. + * + * @param transaction the transaction to commit + * @return a new Dataset at the committed version + */ + public Dataset execute(Transaction transaction) { + Preconditions.checkNotNull(transaction, "Transaction must not be null"); + if (dataset != null) { + Dataset result = + nativeCommitToDataset( + dataset, + transaction, + detached, + enableV2ManifestPaths, + writeParams, + useStableRowIds, + storageFormat, + maxRetries, + skipAutoCleanup); + result.setAllocator(dataset.allocator()); + return result; + } + if (uri != null) { + Dataset result = + nativeCommitToUri( + uri, + transaction, + detached, + enableV2ManifestPaths, + storageOptionsProvider, + namespace, + tableId, + allocator, + writeParams, + useStableRowIds, + storageFormat, + maxRetries, + skipAutoCleanup); + result.setAllocator(allocator); + return result; + } + throw new IllegalStateException("CommitBuilder requires either a dataset or a URI"); + } + + private static native Dataset nativeCommitToDataset( + Dataset dataset, + Transaction transaction, + boolean detached, + boolean enableV2ManifestPaths, + Map writeParams, + Boolean useStableRowIds, + String storageFormat, + int maxRetries, + boolean skipAutoCleanup); + + private static native Dataset nativeCommitToUri( + String uri, + Transaction transaction, + boolean detached, + boolean enableV2ManifestPaths, + Object storageOptionsProvider, + Object namespace, + Object tableId, + Object allocator, + Map writeParams, + Boolean useStableRowIds, + String storageFormat, + int maxRetries, + boolean skipAutoCleanup); +} diff --git a/java/src/main/java/org/lance/Dataset.java b/java/src/main/java/org/lance/Dataset.java index ef5340f5744..164e998d80a 100644 --- a/java/src/main/java/org/lance/Dataset.java +++ b/java/src/main/java/org/lance/Dataset.java @@ -511,14 +511,19 @@ public 
BufferAllocator allocator() { return allocator; } + /** Package-private setter for allocator, used by {@link CommitBuilder}. */ + void setAllocator(BufferAllocator allocator) { + this.allocator = allocator; + } + /** * Create a new transaction builder at current version for the dataset. The dataset itself will * not refresh after the transaction committed. * - * @return A new instance of {@link Transaction.Builder} linked to the opened dataset. + * @return A new instance of {@link SourcedTransaction.Builder} linked to the opened dataset. */ - public Transaction.Builder newTransactionBuilder() { - return new Transaction.Builder(this).readVersion(version()); + public SourcedTransaction.Builder newTransactionBuilder() { + return new SourcedTransaction.Builder(this); } /** @@ -548,22 +553,19 @@ public Dataset commitTransaction(Transaction transaction) { public Dataset commitTransaction( Transaction transaction, boolean detached, boolean enableV2ManifestPaths) { Preconditions.checkNotNull(transaction); - try { - Dataset dataset = nativeCommitTransaction(transaction, detached, enableV2ManifestPaths); - if (selfManagedAllocator) { - dataset.allocator = new RootAllocator(Long.MAX_VALUE); - } else { - dataset.allocator = allocator; - } - return dataset; - } finally { - transaction.release(); + Dataset dataset = + new CommitBuilder(this) + .detached(detached) + .enableV2ManifestPaths(enableV2ManifestPaths) + .execute(transaction); + if (selfManagedAllocator) { + dataset.allocator = new RootAllocator(Long.MAX_VALUE); + } else { + dataset.allocator = allocator; } + return dataset; } - private native Dataset nativeCommitTransaction( - Transaction transaction, boolean detached, boolean enableV2ManifestPaths); - /** * Drop a Dataset. 
* diff --git a/java/src/main/java/org/lance/SourcedTransaction.java b/java/src/main/java/org/lance/SourcedTransaction.java new file mode 100644 index 00000000000..d833c417752 --- /dev/null +++ b/java/src/main/java/org/lance/SourcedTransaction.java @@ -0,0 +1,168 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance; + +import org.lance.operation.Operation; + +import org.apache.arrow.util.Preconditions; + +import java.util.Map; +import java.util.Optional; + +/** + * A convenience wrapper that pairs a {@link Transaction} with a {@link Dataset}, providing a simple + * commit workflow. + * + *

+ * <p>This replaces the old {@code Transaction} class's "sourced" role where the transaction held a
+ * reference to the dataset it was built from.
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * try (SourcedTransaction txn = dataset.newTransactionBuilder()
+ *     .operation(Append.builder().fragments(fragments).build())
+ *     .build();
+ *     Dataset committed = txn.commit()) {
+ *     // use committed dataset
+ * }
+ * }
+ */ +public class SourcedTransaction implements AutoCloseable { + + private final Transaction transaction; + private final Dataset dataset; + + private SourcedTransaction(Transaction transaction, Dataset dataset) { + this.transaction = transaction; + this.dataset = dataset; + } + + /** Returns the underlying {@link Transaction}. */ + public Transaction transaction() { + return transaction; + } + + /** Delegates to {@link Transaction#readVersion()}. */ + public long readVersion() { + return transaction.readVersion(); + } + + /** Delegates to {@link Transaction#uuid()}. */ + public String uuid() { + return transaction.uuid(); + } + + /** Delegates to {@link Transaction#operation()}. */ + public Operation operation() { + return transaction.operation(); + } + + /** Delegates to {@link Transaction#tag()}. */ + public Optional tag() { + return transaction.tag(); + } + + /** Delegates to {@link Transaction#transactionProperties()}. */ + public Optional> transactionProperties() { + return transaction.transactionProperties(); + } + + /** + * Commit this transaction against the source dataset. + * + * @return a new Dataset at the committed version + */ + public Dataset commit() { + return dataset.commitTransaction(transaction); + } + + /** + * Commit this transaction against the source dataset with additional options. + * + * @param detached if true, the commit will not be part of the main dataset lineage + * @param enableV2ManifestPaths if true, and this is a new dataset, uses the new V2 manifest paths + * @return a new Dataset at the committed version + */ + public Dataset commit(boolean detached, boolean enableV2ManifestPaths) { + return dataset.commitTransaction(transaction, detached, enableV2ManifestPaths); + } + + /** Release native resources held by the underlying transaction's operation. */ + @Override + public void close() { + transaction.close(); + } + + /** Builder for constructing {@link SourcedTransaction} instances from a {@link Dataset}. 
*/ + public static class Builder { + private final Dataset dataset; + private long readVersion; + private Operation operation; + private String tag; + private Map transactionProperties; + + /** + * Create a builder for committing against an existing dataset. The read version defaults to the + * dataset's current version. + * + * @param dataset the existing dataset to commit against + */ + public Builder(Dataset dataset) { + this.dataset = dataset; + this.readVersion = dataset.version(); + } + + public Builder readVersion(long readVersion) { + this.readVersion = readVersion; + return this; + } + + public Builder operation(Operation operation) { + if (this.operation != null) { + throw new IllegalStateException( + String.format("Operation %s has been set", this.operation.name())); + } + this.operation = operation; + return this; + } + + /** + * Set an optional tag for the transaction. + * + * @param tag the tag string + * @return this builder instance + */ + public Builder tag(String tag) { + this.tag = tag; + return this; + } + + public Builder transactionProperties(Map properties) { + this.transactionProperties = properties; + return this; + } + + public SourcedTransaction build() { + Preconditions.checkState(operation != null, "TransactionBuilder has no operations"); + Transaction transaction = + new Transaction.Builder() + .readVersion(readVersion) + .operation(operation) + .tag(tag) + .transactionProperties(transactionProperties) + .build(); + return new SourcedTransaction(transaction, dataset); + } + } +} diff --git a/java/src/main/java/org/lance/Transaction.java b/java/src/main/java/org/lance/Transaction.java index 2d565c73258..92ef11551fe 100644 --- a/java/src/main/java/org/lance/Transaction.java +++ b/java/src/main/java/org/lance/Transaction.java @@ -18,41 +18,60 @@ import com.google.common.base.MoreObjects; import org.apache.arrow.util.Preconditions; -import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; import 
java.util.UUID; /** - * Align with the Transaction struct in rust. The transaction won't commit the status to original - * dataset. It will return a new dataset after committed. + * A pure data container representing a Lance transaction. + * + *

A Transaction holds the read version, a unique identifier, the operation to perform, and + * optional transaction properties. It does not contain commit configuration or execution logic. + * + *

To commit a transaction, use {@link CommitBuilder} or {@link SourcedTransaction}. */ -public class Transaction { +public class Transaction implements AutoCloseable { private final long readVersion; private final String uuid; - private final Map writeParams; - private final Optional> transactionProperties; - // Mainly for JNI usage - private final Dataset dataset; private final Operation operation; + private final Optional tag; + private final Optional> transactionProperties; + /** + * Constructor used by JNI when reading transactions from native code. + * + * @param readVersion the version that was read when creating this transaction + * @param uuid the unique identifier for this transaction + * @param operation the operation to perform + * @param tag optional tag for the transaction + * @param transactionProperties optional transaction properties + */ private Transaction( - Dataset dataset, long readVersion, String uuid, Operation operation, - Map writeParams, + String tag, Map transactionProperties) { - this.dataset = dataset; this.readVersion = readVersion; this.uuid = uuid; this.operation = operation; - this.writeParams = writeParams != null ? writeParams : new HashMap<>(); + this.tag = Optional.ofNullable(tag); this.transactionProperties = Optional.ofNullable(transactionProperties); } + /** + * Create a transaction with the given read version and operation. A random UUID is generated + * automatically. + * + * @param readVersion the version that was read when creating this transaction + * @param operation the operation to perform + */ + public Transaction(long readVersion, Operation operation) { + this(readVersion, UUID.randomUUID().toString(), operation, null, null); + } + public long readVersion() { return readVersion; } @@ -65,22 +84,18 @@ public Operation operation() { return operation; } - public Map writeParams() { - return writeParams; + /** Returns the optional tag for this transaction. 
*/ + public Optional tag() { + return tag; } public Optional> transactionProperties() { return transactionProperties; } - public Dataset commit() { - if (dataset == null) { - throw new UnsupportedOperationException("Transaction doesn't support create new dataset yet"); - } - return dataset.commitTransaction(this); - } - - public void release() { + /** Release native resources held by the operation (e.g. Arrow C schemas). */ + @Override + public void close() { operation.release(); } @@ -90,7 +105,7 @@ public String toString() { .add("readVersion", readVersion) .add("uuid", uuid) .add("operation", operation) - .add("writeParams", writeParams) + .add("tag", tag) .add("transactionProperties", transactionProperties) .toString(); } @@ -107,20 +122,24 @@ public boolean equals(Object o) { return readVersion == that.readVersion && uuid.equals(that.uuid) && Objects.equals(operation, that.operation) - && Objects.equals(writeParams, that.writeParams) + && Objects.equals(tag, that.tag) && Objects.equals(transactionProperties, that.transactionProperties); } + @Override + public int hashCode() { + return Objects.hash(readVersion, uuid, operation, tag, transactionProperties); + } + + /** Builder for constructing {@link Transaction} instances. 
*/ public static class Builder { - private final String uuid; - private final Dataset dataset; + private String uuid; private long readVersion; private Operation operation; - private Map writeParams; + private String tag; private Map transactionProperties; - public Builder(Dataset dataset) { - this.dataset = dataset; + public Builder() { this.uuid = UUID.randomUUID().toString(); } @@ -129,34 +148,39 @@ public Builder readVersion(long readVersion) { return this; } - public Builder transactionProperties(Map properties) { - this.transactionProperties = properties; + public Builder uuid(String uuid) { + this.uuid = uuid; return this; } - public Builder writeParams(Map writeParams) { - this.writeParams = writeParams; + public Builder operation(Operation operation) { + if (this.operation != null) { + throw new IllegalStateException( + String.format("Operation %s has been set", this.operation.name())); + } + this.operation = operation; return this; } - public Builder operation(Operation operation) { - validateState(); - this.operation = operation; + /** + * Set an optional tag for the transaction. 
+ * + * @param tag the tag string + * @return this builder instance + */ + public Builder tag(String tag) { + this.tag = tag; return this; } - private void validateState() { - if (operation != null) { - throw new IllegalStateException( - String.format("Operation %s has been set", operation.name())); - } + public Builder transactionProperties(Map properties) { + this.transactionProperties = properties; + return this; } public Transaction build() { Preconditions.checkState(operation != null, "TransactionBuilder has no operations"); - - return new Transaction( - dataset, readVersion, uuid, operation, writeParams, transactionProperties); + return new Transaction(readVersion, uuid, operation, tag, transactionProperties); } } } diff --git a/java/src/test/java/org/lance/DatasetTest.java b/java/src/test/java/org/lance/DatasetTest.java index a08a608161d..cb9942bdaaf 100644 --- a/java/src/test/java/org/lance/DatasetTest.java +++ b/java/src/test/java/org/lance/DatasetTest.java @@ -1147,8 +1147,8 @@ void testCommitTransactionDetachedTrue(@TempDir Path tempDir) { long baseRowCount = base.countRows(); FragmentMetadata fragment = suite.createNewFragment(5); Append append = Append.builder().fragments(Collections.singletonList(fragment)).build(); - Transaction transaction = base.newTransactionBuilder().operation(append).build(); - try (Dataset committed = base.commitTransaction(transaction, true, false)) { + SourcedTransaction transaction = base.newTransactionBuilder().operation(append).build(); + try (Dataset committed = base.commitTransaction(transaction.transaction(), true, false)) { // Original dataset is not refreshed to the new version. 
assertEquals(baseVersion, base.version()); assertEquals(baseRowCount, base.countRows()); @@ -1176,11 +1176,11 @@ void testCommitTransactionDetachedTrueOnV1ManifestThrowsUnsupported(@TempDir Pat FragmentMetadata fragment = suite.createNewFragment(3); Append append = Append.builder().fragments(Collections.singletonList(fragment)).build(); - Transaction transaction = dataset.newTransactionBuilder().operation(append).build(); + SourcedTransaction transaction = dataset.newTransactionBuilder().operation(append).build(); UnsupportedOperationException ex = assertThrows( UnsupportedOperationException.class, - () -> dataset.commitTransaction(transaction, true, false)); + () -> dataset.commitTransaction(transaction.transaction(), true, false)); // Error should indicate detached commits are not supported on v1 manifests. assertNotNull(ex.getMessage()); @@ -1210,9 +1210,10 @@ void testEnableStableRowIds(@TempDir Path tempDir) throws Exception { FragmentMetadata frag1 = testDataset.createNewFragment(10); FragmentMetadata frag2 = testDataset.createNewFragment(10); - Transaction.Builder builder = new Transaction.Builder(dataset); + SourcedTransaction.Builder builder = new SourcedTransaction.Builder(dataset); Append append = Append.builder().fragments(Arrays.asList(frag1, frag2)).build(); - Transaction transaction = builder.operation(append).readVersion(dataset.version()).build(); + SourcedTransaction transaction = + builder.operation(append).readVersion(dataset.version()).build(); // Step2: if move-stable-rowid is enabled, the rowids of new fragments should be // consecutive. 
diff --git a/java/src/test/java/org/lance/FragmentTest.java b/java/src/test/java/org/lance/FragmentTest.java index c1e646f492b..61bfc439290 100644 --- a/java/src/test/java/org/lance/FragmentTest.java +++ b/java/src/test/java/org/lance/FragmentTest.java @@ -210,7 +210,11 @@ void testDeleteRows(@TempDir Path tempDir) throws IOException { Update update = Update.builder().updatedFragments(Collections.singletonList(updateFragment)).build(); - Dataset dataset3 = dataset2.newTransactionBuilder().operation(update).build().commit(); + Dataset dataset3; + try (Transaction txn = + new Transaction.Builder().readVersion(dataset2.version()).operation(update).build()) { + dataset3 = new CommitBuilder(dataset2).execute(txn); + } assertEquals(totalRows - deleteCount, dataset3.countRows()); @@ -226,7 +230,11 @@ void testDeleteRows(@TempDir Path tempDir) throws IOException { update = Update.builder().updatedFragments(Collections.singletonList(updateFragment)).build(); - Dataset dataset4 = dataset3.newTransactionBuilder().operation(update).build().commit(); + Dataset dataset4; + try (Transaction txn = + new Transaction.Builder().readVersion(dataset3.version()).operation(update).build()) { + dataset4 = new CommitBuilder(dataset3).execute(txn); + } assertEquals(totalRows - deleteCount - deleteCount2, dataset4.countRows()); // Case 3. 
Test delete all rows @@ -242,7 +250,11 @@ void testDeleteRows(@TempDir Path tempDir) throws IOException { Update.builder() .removedFragmentIds(Collections.singletonList(Long.valueOf(fragment.getId()))) .build(); - Dataset dataset5 = dataset4.newTransactionBuilder().operation(update).build().commit(); + Dataset dataset5; + try (Transaction txn = + new Transaction.Builder().readVersion(dataset4.version()).operation(update).build()) { + dataset5 = new CommitBuilder(dataset4).execute(txn); + } assertEquals(0, dataset5.countRows()); } @@ -279,7 +291,6 @@ void testMergeColumns(@TempDir Path tempDir) throws Exception { // Commit fragment FragmentOperation.Append appendOp = new FragmentOperation.Append(Arrays.asList(fragmentMeta)); - Transaction transaction; try (Dataset dataset = Dataset.commit(allocator, datasetPath, appendOp, Optional.of(1L))) { assertEquals(2, dataset.version()); assertEquals(2, dataset.latestVersion()); @@ -293,43 +304,40 @@ void testMergeColumns(@TempDir Path tempDir) throws Exception { FragmentMergeResult mergeResult = testDataset.mergeColumn(fragment, 10); - Transaction.Builder builder = new Transaction.Builder(dataset); - transaction = - builder + try (Transaction transaction = + new Transaction.Builder() + .readVersion(dataset.version()) .operation( Merge.builder() .fragments(Collections.singletonList(mergeResult.getFragmentMetadata())) .schema(mergeResult.getSchema().asArrowSchema()) .build()) - .readVersion(dataset.version()) - .build(); - - assertNotNull(transaction); - - try (Dataset newDs = transaction.commit()) { - assertEquals(3, newDs.version()); - assertEquals(3, newDs.latestVersion()); - Fragment newFrag = newDs.getFragments().get(0); - try (LanceScanner scanner = newFrag.newScan()) { - Schema schemaRes = scanner.schema(); - assertTrue( - schemaRes.getFields().stream() - .anyMatch(field -> field.getName().equals("new_col1"))); - assertTrue( - schemaRes.getFields().stream() - .anyMatch(field -> field.getName().equals("new_col2"))); - - 
try (ArrowReader reader = scanner.scanBatches()) { - assertTrue(reader.loadNextBatch()); - VectorSchemaRoot root = reader.getVectorSchemaRoot(); - VarCharVector newCol1Vec = (VarCharVector) root.getVector("new_col1"); - VarCharVector newCol2Vec = (VarCharVector) root.getVector("new_col2"); - assertEquals(21, newCol2Vec.getValueCount()); - - // The first 10 rows are not null - assertNotNull(newCol1Vec.get(9)); - // Remaining rows are null - assertNull(newCol1Vec.get(10)); + .build()) { + try (Dataset newDs = new CommitBuilder(dataset).execute(transaction)) { + assertEquals(3, newDs.version()); + assertEquals(3, newDs.latestVersion()); + Fragment newFrag = newDs.getFragments().get(0); + try (LanceScanner scanner = newFrag.newScan()) { + Schema schemaRes = scanner.schema(); + assertTrue( + schemaRes.getFields().stream() + .anyMatch(field -> field.getName().equals("new_col1"))); + assertTrue( + schemaRes.getFields().stream() + .anyMatch(field -> field.getName().equals("new_col2"))); + + try (ArrowReader reader = scanner.scanBatches()) { + assertTrue(reader.loadNextBatch()); + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + VarCharVector newCol1Vec = (VarCharVector) root.getVector("new_col1"); + VarCharVector newCol2Vec = (VarCharVector) root.getVector("new_col2"); + assertEquals(21, newCol2Vec.getValueCount()); + + // The first 10 rows are not null + assertNotNull(newCol1Vec.get(9)); + // Remaining rows are null + assertNull(newCol1Vec.get(10)); + } } } } diff --git a/java/src/test/java/org/lance/NamespaceIntegrationTest.java b/java/src/test/java/org/lance/NamespaceIntegrationTest.java index 2d6f8ab1443..472848a6151 100644 --- a/java/src/test/java/org/lance/NamespaceIntegrationTest.java +++ b/java/src/test/java/org/lance/NamespaceIntegrationTest.java @@ -1414,15 +1414,16 @@ void testTransactionCommitWithNamespace() throws Exception { // Create and commit transaction Append appendOp = Append.builder().fragments(newFragments).build(); - Transaction transaction = 
- new Transaction.Builder(datasetWithProvider) + try (Transaction transaction = + new Transaction.Builder() .readVersion(datasetWithProvider.version()) .operation(appendOp) - .build(); - - try (Dataset committedDataset = transaction.commit()) { - assertEquals(2, committedDataset.version()); - assertEquals(4, committedDataset.countRows()); + .build()) { + try (Dataset committedDataset = + new CommitBuilder(datasetWithProvider).execute(transaction)) { + assertEquals(2, committedDataset.version()); + assertEquals(4, committedDataset.countRows()); + } } } diff --git a/java/src/test/java/org/lance/SourcedTransactionTest.java b/java/src/test/java/org/lance/SourcedTransactionTest.java new file mode 100644 index 00000000000..ae2be91dd33 --- /dev/null +++ b/java/src/test/java/org/lance/SourcedTransactionTest.java @@ -0,0 +1,124 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.lance; + +import org.lance.operation.Append; + +import org.apache.arrow.memory.RootAllocator; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class SourcedTransactionTest { + + @Test + public void testSourcedTransaction(@TempDir Path tempDir) { + String datasetPath = tempDir.resolve("testSourcedTransaction").toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + try (Dataset dataset = testDataset.createEmptyDataset()) { + FragmentMetadata fragmentMeta = testDataset.createNewFragment(20); + + Map properties = new HashMap<>(); + properties.put("transactionType", "APPEND"); + properties.put("createdBy", "testUser"); + try (SourcedTransaction appendTxn = + dataset + .newTransactionBuilder() + .operation( + Append.builder().fragments(Collections.singletonList(fragmentMeta)).build()) + .transactionProperties(properties) + .build()) { + try (Dataset committedDataset = appendTxn.commit()) { + assertEquals(2, committedDataset.version()); + assertEquals(2, committedDataset.latestVersion()); + assertEquals(20, committedDataset.countRows()); + assertEquals(dataset.version(), appendTxn.readVersion()); + assertNotNull(appendTxn.uuid()); + + // Verify transaction properties + Map txnProps = + appendTxn.transactionProperties().orElse(new HashMap<>()); + assertEquals("APPEND", txnProps.get("transactionType")); + assertEquals("testUser", txnProps.get("createdBy")); + } + } + } + } + } + + @Test + public void testTag(@TempDir Path tempDir) { + String datasetPath = tempDir.resolve("testTag").toString(); + 
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + try (Dataset dataset = testDataset.createEmptyDataset()) { + FragmentMetadata fragmentMeta = testDataset.createNewFragment(10); + + try (SourcedTransaction txn = + dataset + .newTransactionBuilder() + .tag("release-v2") + .operation( + Append.builder().fragments(Collections.singletonList(fragmentMeta)).build()) + .build()) { + assertEquals("release-v2", txn.tag().orElse(null)); + assertEquals("release-v2", txn.transaction().tag().orElse(null)); + + try (Dataset committed = txn.commit()) { + Transaction readTx = committed.readTransaction().orElse(null); + assertNotNull(readTx); + assertEquals("release-v2", readTx.tag().orElse(null)); + } + } + } + } + } + + @Test + public void testReadVersionDefaultsToDatasetVersion(@TempDir Path tempDir) { + String datasetPath = tempDir.resolve("testReadVersionDefault").toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + try (Dataset dataset = testDataset.createEmptyDataset()) { + FragmentMetadata fragmentMeta = testDataset.createNewFragment(10); + + // Do not set readVersion explicitly — it should default to dataset.version() + try (SourcedTransaction txn = + dataset + .newTransactionBuilder() + .operation( + Append.builder().fragments(Collections.singletonList(fragmentMeta)).build()) + .build()) { + assertEquals(dataset.version(), txn.readVersion()); + + try (Dataset committed = txn.commit()) { + assertTrue(committed.version() > dataset.version()); + } + } + } + } + } +} diff --git a/java/src/test/java/org/lance/TestUtils.java b/java/src/test/java/org/lance/TestUtils.java index 4a884696e54..c9033361103 100644 --- a/java/src/test/java/org/lance/TestUtils.java +++ b/java/src/test/java/org/lance/TestUtils.java @@ -760,14 +760,16 
@@ public Dataset createAndAppendRows(int totalRows, int batches) { fragments.add(createBlobFragment(batchRows, Integer.MAX_VALUE)); } - Transaction txn = - ds.newTransactionBuilder() + try (Transaction txn = + new Transaction.Builder() + .readVersion(ds.version()) .operation(org.lance.operation.Append.builder().fragments(fragments).build()) - .build(); - Dataset newDs = txn.commit(); - Preconditions.checkArgument( - newDs.countRows() == totalRows, "dataset row count mismatch after append"); - return newDs; + .build()) { + Dataset newDs = new CommitBuilder(ds).execute(txn); + Preconditions.checkArgument( + newDs.countRows() == totalRows, "dataset row count mismatch after append"); + return newDs; + } } } } diff --git a/java/src/test/java/org/lance/TransactionTest.java b/java/src/test/java/org/lance/TransactionTest.java index c9d0e937263..cdd95d94165 100644 --- a/java/src/test/java/org/lance/TransactionTest.java +++ b/java/src/test/java/org/lance/TransactionTest.java @@ -19,8 +19,10 @@ import org.lance.index.scalar.ScalarIndexParams; import org.lance.operation.Append; import org.lance.operation.CreateIndex; +import org.lance.operation.Overwrite; import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.types.pojo.Schema; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -37,41 +39,6 @@ public class TransactionTest { - @Test - public void testTransaction(@TempDir Path tempDir) { - String datasetPath = tempDir.resolve("testTransaction").toString(); - try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { - TestUtils.SimpleTestDataset testDataset = - new TestUtils.SimpleTestDataset(allocator, datasetPath); - try (Dataset dataset = testDataset.createEmptyDataset()) { - FragmentMetadata fragmentMeta = testDataset.createNewFragment(20); - - Map properties = new HashMap<>(); - properties.put("transactionType", "APPEND"); - properties.put("createdBy", "testUser"); - Transaction appendTxn = - dataset - 
.newTransactionBuilder() - .operation( - Append.builder().fragments(Collections.singletonList(fragmentMeta)).build()) - .transactionProperties(properties) - .build(); - try (Dataset committedDataset = appendTxn.commit()) { - assertEquals(2, committedDataset.version()); - assertEquals(2, committedDataset.latestVersion()); - assertEquals(20, committedDataset.countRows()); - assertEquals(dataset.version(), appendTxn.readVersion()); - assertNotNull(appendTxn.uuid()); - - // Verify transaction properties - Map txnProps = appendTxn.transactionProperties().orElse(new HashMap<>()); - assertEquals("APPEND", txnProps.get("transactionType")); - assertEquals("testUser", txnProps.get("createdBy")); - } - } - } - } - @Test public void testReadTransactionCreateIndex(@TempDir Path tempDir) { String datasetPath = tempDir.resolve("read_transaction_create_index").toString(); @@ -109,4 +76,122 @@ public void testReadTransactionCreateIndex(@TempDir Path tempDir) { } } } + + @Test + public void testCommitToUri(@TempDir Path tempDir) { + String datasetPath = tempDir.resolve("testCommitToUri").toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + Schema schema = testDataset.getSchema(); + + // Create fragments at the dataset path + FragmentMetadata fragmentMeta = testDataset.createNewFragment(20); + + // Build a transaction targeting a URI (no existing dataset) + try (Transaction txn = + new Transaction.Builder() + .operation( + Overwrite.builder() + .fragments(Collections.singletonList(fragmentMeta)) + .schema(schema) + .build()) + .build()) { + try (Dataset committedDataset = new CommitBuilder(datasetPath, allocator).execute(txn)) { + assertEquals(1, committedDataset.version()); + assertEquals(20, committedDataset.countRows()); + } + } + } + } + + @Test + public void testTagRoundTrip(@TempDir Path tempDir) { + String datasetPath = 
tempDir.resolve("testTagRoundTrip").toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + try (Dataset dataset = testDataset.createEmptyDataset()) { + FragmentMetadata fragmentMeta = testDataset.createNewFragment(10); + + try (Transaction txn = + new Transaction.Builder() + .readVersion(dataset.version()) + .tag("v1.0") + .operation( + Append.builder().fragments(Collections.singletonList(fragmentMeta)).build()) + .build()) { + assertEquals("v1.0", txn.tag().orElse(null)); + + try (Dataset committed = new CommitBuilder(dataset).execute(txn)) { + Transaction readTx = committed.readTransaction().orElse(null); + assertNotNull(readTx); + assertEquals("v1.0", readTx.tag().orElse(null)); + } + } + } + } + } + + @Test + public void testTransactionPropertiesRoundTrip(@TempDir Path tempDir) { + String datasetPath = tempDir.resolve("testTransactionPropertiesRoundTrip").toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + try (Dataset dataset = testDataset.createEmptyDataset()) { + FragmentMetadata fragmentMeta = testDataset.createNewFragment(10); + + Map properties = new HashMap<>(); + properties.put("source", "ingestion-pipeline"); + properties.put("batchId", "42"); + + try (Transaction txn = + new Transaction.Builder() + .readVersion(dataset.version()) + .transactionProperties(properties) + .operation( + Append.builder().fragments(Collections.singletonList(fragmentMeta)).build()) + .build()) { + try (Dataset committed = new CommitBuilder(dataset).execute(txn)) { + Transaction readTx = committed.readTransaction().orElse(null); + assertNotNull(readTx); + Map readProps = readTx.transactionProperties().orElse(null); + assertNotNull(readProps); + assertEquals("ingestion-pipeline", readProps.get("source")); + 
assertEquals("42", readProps.get("batchId")); + } + } + } + } + } + + @Test + public void testCustomUuid(@TempDir Path tempDir) { + String datasetPath = tempDir.resolve("testCustomUuid").toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + try (Dataset dataset = testDataset.createEmptyDataset()) { + FragmentMetadata fragmentMeta = testDataset.createNewFragment(10); + + String customUuid = "custom-uuid-12345"; + try (Transaction txn = + new Transaction.Builder() + .readVersion(dataset.version()) + .uuid(customUuid) + .operation( + Append.builder().fragments(Collections.singletonList(fragmentMeta)).build()) + .build()) { + assertEquals(customUuid, txn.uuid()); + + try (Dataset committed = new CommitBuilder(dataset).execute(txn)) { + Transaction readTx = committed.readTransaction().orElse(null); + assertNotNull(readTx); + assertEquals(customUuid, readTx.uuid()); + } + } + } + } + } } diff --git a/java/src/test/java/org/lance/index/ScalarIndexTest.java b/java/src/test/java/org/lance/index/ScalarIndexTest.java index 8b756633692..70ef43c853c 100644 --- a/java/src/test/java/org/lance/index/ScalarIndexTest.java +++ b/java/src/test/java/org/lance/index/ScalarIndexTest.java @@ -13,6 +13,7 @@ */ package org.lance.index; +import org.lance.CommitBuilder; import org.lance.Dataset; import org.lance.Fragment; import org.lance.TestUtils; @@ -167,13 +168,16 @@ public void testCreateBTreeIndexDistributively(@TempDir Path tempDir) throws Exc CreateIndex createIndexOp = CreateIndex.builder().withNewIndices(Collections.singletonList(index)).build(); - Transaction createIndexTx = - dataset.newTransactionBuilder().operation(createIndexOp).build(); - - try (Dataset newDataset = createIndexTx.commit()) { - // new dataset should contain that index - assertEquals(datasetVersion + 1, newDataset.version()); - 
assertTrue(newDataset.listIndexes().contains("test_index")); + try (Transaction createIndexTx = + new Transaction.Builder() + .readVersion(datasetVersion) + .operation(createIndexOp) + .build()) { + try (Dataset newDataset = new CommitBuilder(dataset).execute(createIndexTx)) { + // new dataset should contain that index + assertEquals(datasetVersion + 1, newDataset.version()); + assertTrue(newDataset.listIndexes().contains("test_index")); + } } } } @@ -244,30 +248,33 @@ public void testRangedBTreeIndex(@TempDir Path tempDir) throws Exception { CreateIndex createIndexOp = CreateIndex.builder().withNewIndices(Collections.singletonList(index)).build(); - Transaction createIndexTx = - dataset.newTransactionBuilder().operation(createIndexOp).build(); - - try (Dataset newDataset = createIndexTx.commit()) { - // new dataset should contain that index - assertEquals(datasetVersion + 1, newDataset.version()); - assertTrue(newDataset.listIndexes().contains("test_index")); - - // 7. compare results - // force use index should get the right value - ScanOptions scanOptions = - new ScanOptions.Builder().withRowId(true).filter("id in (10, 20, 30)").build(); - try (LanceScanner scanner = newDataset.newScan(scanOptions); - ArrowReader arrowReader = scanner.scanBatches(); ) { - List ids = new ArrayList<>(); - while (arrowReader.loadNextBatch()) { - VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); - IntVector idVec = (IntVector) root.getVector("id"); - for (int i = 0; i < idVec.getValueCount(); i++) { - ids.add(idVec.get(i)); + try (Transaction createIndexTx = + new Transaction.Builder() + .readVersion(datasetVersion) + .operation(createIndexOp) + .build()) { + try (Dataset newDataset = new CommitBuilder(dataset).execute(createIndexTx)) { + // new dataset should contain that index + assertEquals(datasetVersion + 1, newDataset.version()); + assertTrue(newDataset.listIndexes().contains("test_index")); + + // 7. 
compare results + // force use index should get the right value + ScanOptions scanOptions = + new ScanOptions.Builder().withRowId(true).filter("id in (10, 20, 30)").build(); + try (LanceScanner scanner = newDataset.newScan(scanOptions); + ArrowReader arrowReader = scanner.scanBatches(); ) { + List ids = new ArrayList<>(); + while (arrowReader.loadNextBatch()) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + IntVector idVec = (IntVector) root.getVector("id"); + for (int i = 0; i < idVec.getValueCount(); i++) { + ids.add(idVec.get(i)); + } } + Collections.sort(ids); + Assertions.assertIterableEquals(Arrays.asList(10, 20, 30), ids); } - Collections.sort(ids); - Assertions.assertIterableEquals(Arrays.asList(10, 20, 30), ids); } } } diff --git a/java/src/test/java/org/lance/index/VectorIndexTest.java b/java/src/test/java/org/lance/index/VectorIndexTest.java index 771505b9efd..4a5902044da 100755 --- a/java/src/test/java/org/lance/index/VectorIndexTest.java +++ b/java/src/test/java/org/lance/index/VectorIndexTest.java @@ -13,6 +13,7 @@ */ package org.lance.index; +import org.lance.CommitBuilder; import org.lance.Dataset; import org.lance.Fragment; import org.lance.TestVectorDataset; @@ -130,12 +131,15 @@ public void testCreateIvfFlatIndexDistributively(@TempDir Path tempDir) throws E CreateIndex createIndexOp = CreateIndex.builder().withNewIndices(Collections.singletonList(index)).build(); - Transaction createIndexTx = - dataset.newTransactionBuilder().operation(createIndexOp).build(); - - try (Dataset newDataset = createIndexTx.commit()) { - assertEquals(datasetVersion + 1, newDataset.version()); - assertTrue(newDataset.listIndexes().contains(TestVectorDataset.indexName)); + try (Transaction createIndexTx = + new Transaction.Builder() + .readVersion(dataset.version()) + .operation(createIndexOp) + .build()) { + try (Dataset newDataset = new CommitBuilder(dataset).execute(createIndexTx)) { + assertEquals(datasetVersion + 1, newDataset.version()); + 
assertTrue(newDataset.listIndexes().contains(TestVectorDataset.indexName)); + } } } } @@ -248,12 +252,15 @@ public void testCreateIvfPqIndexDistributively(@TempDir Path tempDir) throws Exc CreateIndex createIndexOp = CreateIndex.builder().withNewIndices(Collections.singletonList(index)).build(); - Transaction createIndexTx = - dataset.newTransactionBuilder().operation(createIndexOp).build(); - - try (Dataset newDataset = createIndexTx.commit()) { - assertEquals(datasetVersion + 1, newDataset.version()); - assertTrue(newDataset.listIndexes().contains(TestVectorDataset.indexName)); + try (Transaction createIndexTx = + new Transaction.Builder() + .readVersion(dataset.version()) + .operation(createIndexOp) + .build()) { + try (Dataset newDataset = new CommitBuilder(dataset).execute(createIndexTx)) { + assertEquals(datasetVersion + 1, newDataset.version()); + assertTrue(newDataset.listIndexes().contains(TestVectorDataset.indexName)); + } } } } @@ -350,12 +357,15 @@ public void testCreateIvfSqIndexDistributively(@TempDir Path tempDir) throws Exc CreateIndex createIndexOp = CreateIndex.builder().withNewIndices(Collections.singletonList(index)).build(); - Transaction createIndexTx = - dataset.newTransactionBuilder().operation(createIndexOp).build(); - - try (Dataset newDataset = createIndexTx.commit()) { - assertEquals(datasetVersion + 1, newDataset.version()); - assertTrue(newDataset.listIndexes().contains(TestVectorDataset.indexName)); + try (Transaction createIndexTx = + new Transaction.Builder() + .readVersion(dataset.version()) + .operation(createIndexOp) + .build()) { + try (Dataset newDataset = new CommitBuilder(dataset).execute(createIndexTx)) { + assertEquals(datasetVersion + 1, newDataset.version()); + assertTrue(newDataset.listIndexes().contains(TestVectorDataset.indexName)); + } } } } diff --git a/java/src/test/java/org/lance/operation/AppendTest.java b/java/src/test/java/org/lance/operation/AppendTest.java index 2575d3afd00..5d62b429fe1 100644 --- 
a/java/src/test/java/org/lance/operation/AppendTest.java +++ b/java/src/test/java/org/lance/operation/AppendTest.java @@ -13,6 +13,7 @@ */ package org.lance.operation; +import org.lance.CommitBuilder; import org.lance.Dataset; import org.lance.FragmentMetadata; import org.lance.TestUtils; @@ -62,17 +63,16 @@ void testAppendMultipleFragments(@TempDir Path tempDir) { testDataset.createNewFragment(rowCount), testDataset.createNewFragment(rowCount)); - Transaction transaction = - dataset - .newTransactionBuilder() + try (Transaction txn = + new Transaction.Builder() + .readVersion(dataset.version()) .operation(Append.builder().fragments(fragments).build()) - .build(); - - try (Dataset dataset = transaction.commit()) { - assertEquals(2, dataset.version()); - assertEquals(rowCount * 3, dataset.countRows()); - assertEquals(3, dataset.getFragments().size()); - assertEquals(transaction, dataset.readTransaction().orElse(null)); + .build()) { + try (Dataset dataset = new CommitBuilder(this.dataset).execute(txn)) { + assertEquals(2, dataset.version()); + assertEquals(rowCount * 3, dataset.countRows()); + assertEquals(3, dataset.getFragments().size()); + } } } } @@ -88,12 +88,13 @@ void testAppendEmptyFragmentList(@TempDir Path tempDir) { assertThrows( IllegalArgumentException.class, () -> { - Transaction transaction = - dataset - .newTransactionBuilder() + try (Transaction txn = + new Transaction.Builder() + .readVersion(dataset.version()) .operation(Append.builder().fragments(new ArrayList<>()).build()) - .build(); - transaction.commit().close(); + .build()) { + new CommitBuilder(dataset).execute(txn).close(); + } }); } } diff --git a/java/src/test/java/org/lance/operation/DataReplacementTest.java b/java/src/test/java/org/lance/operation/DataReplacementTest.java index 6d0586cc2b1..0ba59dd8aa1 100644 --- a/java/src/test/java/org/lance/operation/DataReplacementTest.java +++ b/java/src/test/java/org/lance/operation/DataReplacementTest.java @@ -13,6 +13,7 @@ */ package 
org.lance.operation; +import org.lance.CommitBuilder; import org.lance.Dataset; import org.lance.Fragment; import org.lance.FragmentMetadata; @@ -71,83 +72,86 @@ void testDataReplacement(@TempDir Path tempDir) throws Exception { List fragmentMetas = Fragment.create(datasetPath, allocator, idRoot, new WriteParams.Builder().build()); - Transaction appendTxn = - dataset - .newTransactionBuilder() + try (Transaction appendTxn = + new Transaction.Builder() + .readVersion(dataset.version()) .operation(Append.builder().fragments(fragmentMetas).build()) - .build(); - - try (Dataset initDataset = appendTxn.commit()) { - assertEquals(2, initDataset.version()); - assertEquals(rowCount, initDataset.countRows()); - - // step 3. use dataset.addColumn to add a new column named as address with all null values - Field addressField = Field.nullable("address", new ArrowType.Utf8()); - Schema addressSchema = new Schema(Collections.singletonList(addressField), null); - initDataset.addColumns(addressSchema); - - try (LanceScanner scanner = initDataset.newScan()) { - try (ArrowReader resultReader = scanner.scanBatches()) { - assertTrue(resultReader.loadNextBatch()); - VectorSchemaRoot batch = resultReader.getVectorSchemaRoot(); - assertEquals(rowCount, initDataset.countRows()); - assertEquals(rowCount, batch.getRowCount()); - - // verify all null values - VarCharVector resultNameVector = (VarCharVector) batch.getVector("address"); - for (int i = 0; i < rowCount; i++) { - Assertions.assertTrue(resultNameVector.isNull(i)); + .build()) { + try (Dataset initDataset = new CommitBuilder(dataset).execute(appendTxn)) { + assertEquals(2, initDataset.version()); + assertEquals(rowCount, initDataset.countRows()); + + // step 3. 
use dataset.addColumn to add a new column named as address with all null + // values + Field addressField = Field.nullable("address", new ArrowType.Utf8()); + Schema addressSchema = new Schema(Collections.singletonList(addressField), null); + initDataset.addColumns(addressSchema); + + try (LanceScanner scanner = initDataset.newScan()) { + try (ArrowReader resultReader = scanner.scanBatches()) { + assertTrue(resultReader.loadNextBatch()); + VectorSchemaRoot batch = resultReader.getVectorSchemaRoot(); + assertEquals(rowCount, initDataset.countRows()); + assertEquals(rowCount, batch.getRowCount()); + + // verify all null values + VarCharVector resultNameVector = (VarCharVector) batch.getVector("address"); + for (int i = 0; i < rowCount; i++) { + Assertions.assertTrue(resultNameVector.isNull(i)); + } } } - } - // step 4. use DataReplacement transaction to replace null values - try (VectorSchemaRoot replaceVectorRoot = - VectorSchemaRoot.create(addressSchema, allocator)) { - replaceVectorRoot.allocateNew(); - VarCharVector addressVector = (VarCharVector) replaceVectorRoot.getVector("address"); + // step 4. 
use DataReplacement transaction to replace null values + try (VectorSchemaRoot replaceVectorRoot = + VectorSchemaRoot.create(addressSchema, allocator)) { + replaceVectorRoot.allocateNew(); + VarCharVector addressVector = (VarCharVector) replaceVectorRoot.getVector("address"); - for (int i = 0; i < rowCount; i++) { - String name = "District " + i; - addressVector.setSafe(i, name.getBytes(StandardCharsets.UTF_8)); - } - replaceVectorRoot.setRowCount(rowCount); - - DataFile datafile = - writeLanceDataFile( - dataset.allocator(), - datasetPath, - replaceVectorRoot, - new int[] {2}, - new int[] {0}); - List replacementGroups = - Collections.singletonList( - new DataReplacement.DataReplacementGroup( - fragmentMetas.get(0).getId(), datafile)); - Transaction replaceTxn = - initDataset - .newTransactionBuilder() - .operation(DataReplacement.builder().replacements(replacementGroups).build()) - .build(); - - try (Dataset datasetWithAddress = replaceTxn.commit()) { - assertEquals(4, datasetWithAddress.version()); - assertEquals(rowCount, datasetWithAddress.countRows()); - - try (LanceScanner scanner = datasetWithAddress.newScan()) { - try (ArrowReader resultReader = scanner.scanBatches()) { - assertTrue(resultReader.loadNextBatch()); - VectorSchemaRoot batch = resultReader.getVectorSchemaRoot(); + for (int i = 0; i < rowCount; i++) { + String name = "District " + i; + addressVector.setSafe(i, name.getBytes(StandardCharsets.UTF_8)); + } + replaceVectorRoot.setRowCount(rowCount); + + DataFile datafile = + writeLanceDataFile( + dataset.allocator(), + datasetPath, + replaceVectorRoot, + new int[] {2}, + new int[] {0}); + List replacementGroups = + Collections.singletonList( + new DataReplacement.DataReplacementGroup( + fragmentMetas.get(0).getId(), datafile)); + try (Transaction replaceTxn = + new Transaction.Builder() + .readVersion(initDataset.version()) + .operation(DataReplacement.builder().replacements(replacementGroups).build()) + .build()) { + try (Dataset 
datasetWithAddress = + new CommitBuilder(initDataset).execute(replaceTxn)) { + assertEquals(4, datasetWithAddress.version()); assertEquals(rowCount, datasetWithAddress.countRows()); - assertEquals(rowCount, batch.getRowCount()); - - // verify all address values not null - VarCharVector resultNameVector = (VarCharVector) batch.getVector("address"); - for (int i = 0; i < rowCount; i++) { - Assertions.assertFalse(resultNameVector.isNull(i)); - String expectedName = "District " + i; - String actualName = new String(resultNameVector.get(i), StandardCharsets.UTF_8); - assertEquals(expectedName, actualName); + + try (LanceScanner scanner = datasetWithAddress.newScan()) { + try (ArrowReader resultReader = scanner.scanBatches()) { + assertTrue(resultReader.loadNextBatch()); + VectorSchemaRoot batch = resultReader.getVectorSchemaRoot(); + assertEquals(rowCount, datasetWithAddress.countRows()); + assertEquals(rowCount, batch.getRowCount()); + + // verify all address values not null + VarCharVector resultNameVector = (VarCharVector) batch.getVector("address"); + for (int i = 0; i < rowCount; i++) { + Assertions.assertFalse(resultNameVector.isNull(i)); + String expectedName = "District " + i; + String actualName = + new String(resultNameVector.get(i), StandardCharsets.UTF_8); + assertEquals(expectedName, actualName); + } + } } } } diff --git a/java/src/test/java/org/lance/operation/DeleteTest.java b/java/src/test/java/org/lance/operation/DeleteTest.java index 2151b31f9de..4afa9ca976d 100644 --- a/java/src/test/java/org/lance/operation/DeleteTest.java +++ b/java/src/test/java/org/lance/operation/DeleteTest.java @@ -13,6 +13,7 @@ */ package org.lance.operation; +import org.lance.CommitBuilder; import org.lance.Dataset; import org.lance.FragmentMetadata; import org.lance.TestUtils; @@ -42,15 +43,16 @@ void testDelete(@TempDir Path tempDir) { int rowCount = 20; FragmentMetadata fragmentMeta0 = testDataset.createNewFragment(rowCount); FragmentMetadata fragmentMeta1 = 
testDataset.createNewFragment(rowCount); - Transaction transaction = - dataset - .newTransactionBuilder() + try (Transaction appendTxn = + new Transaction.Builder() + .readVersion(dataset.version()) .operation( Append.builder().fragments(Arrays.asList(fragmentMeta0, fragmentMeta1)).build()) - .build(); - try (Dataset dataset = transaction.commit()) { - assertEquals(2, dataset.version()); - assertEquals(2, dataset.latestVersion()); + .build()) { + try (Dataset dataset = new CommitBuilder(this.dataset).execute(appendTxn)) { + assertEquals(2, dataset.version()); + assertEquals(2, dataset.latestVersion()); + } } dataset = Dataset.open(datasetPath, allocator); @@ -60,17 +62,15 @@ void testDelete(@TempDir Path tempDir) { .map(t -> Long.valueOf(t.getId())) .collect(Collectors.toList()); - Transaction delete = - dataset - .newTransactionBuilder() + try (Transaction deleteTxn = + new Transaction.Builder() + .readVersion(dataset.version()) .operation( Delete.builder().deletedFragmentIds(deletedFragmentIds).predicate("1=1").build()) - .build(); - try (Dataset dataset = delete.commit()) { - Transaction txn = dataset.readTransaction().get(); - Delete execDelete = (Delete) txn.operation(); - assertEquals(delete.operation(), execDelete); - assertEquals(0, dataset.countRows()); + .build()) { + try (Dataset dataset = new CommitBuilder(this.dataset).execute(deleteTxn)) { + assertEquals(0, dataset.countRows()); + } } } } diff --git a/java/src/test/java/org/lance/operation/MergeTest.java b/java/src/test/java/org/lance/operation/MergeTest.java index 8268aa23071..243ea352450 100644 --- a/java/src/test/java/org/lance/operation/MergeTest.java +++ b/java/src/test/java/org/lance/operation/MergeTest.java @@ -13,6 +13,7 @@ */ package org.lance.operation; +import org.lance.CommitBuilder; import org.lance.Dataset; import org.lance.FragmentMetadata; import org.lance.TestUtils; @@ -90,36 +91,36 @@ void testMergeNewColumn(@TempDir Path tempDir) throws Exception { fragmentMeta.getDeletionFile(), 
fragmentMeta.getRowIdMeta()); - Transaction mergeTransaction = - initialDataset - .newTransactionBuilder() + try (Transaction mergeTxn = + new Transaction.Builder() + .readVersion(initialDataset.version()) .operation( Merge.builder() .fragments(Collections.singletonList(evolvedFragment)) .schema(evolvedSchema) .build()) - .build(); - - try (Dataset evolvedDataset = mergeTransaction.commit()) { - Assertions.assertEquals(3, evolvedDataset.version()); - Assertions.assertEquals(rowCount, evolvedDataset.countRows()); - Assertions.assertEquals(evolvedSchema, evolvedDataset.getSchema()); - Assertions.assertEquals(3, evolvedDataset.getSchema().getFields().size()); - // Verify merged data - try (LanceScanner scanner = evolvedDataset.newScan()) { - try (ArrowReader resultReader = scanner.scanBatches()) { - Assertions.assertTrue(resultReader.loadNextBatch()); - VectorSchemaRoot batch = resultReader.getVectorSchemaRoot(); - Assertions.assertEquals(rowCount, batch.getRowCount()); - Assertions.assertEquals(3, batch.getSchema().getFields().size()); - // Verify age column - IntVector ageResultVector = (IntVector) batch.getVector("age"); - for (int i = 0; i < rowCount; i++) { - Assertions.assertEquals(20 + i, ageResultVector.get(i)); - } - IntVector idResultVector = (IntVector) batch.getVector("id"); - for (int i = 0; i < rowCount; i++) { - Assertions.assertEquals(i, idResultVector.get(i)); + .build()) { + try (Dataset evolvedDataset = new CommitBuilder(initialDataset).execute(mergeTxn)) { + Assertions.assertEquals(3, evolvedDataset.version()); + Assertions.assertEquals(rowCount, evolvedDataset.countRows()); + Assertions.assertEquals(evolvedSchema, evolvedDataset.getSchema()); + Assertions.assertEquals(3, evolvedDataset.getSchema().getFields().size()); + // Verify merged data + try (LanceScanner scanner = evolvedDataset.newScan()) { + try (ArrowReader resultReader = scanner.scanBatches()) { + Assertions.assertTrue(resultReader.loadNextBatch()); + VectorSchemaRoot batch = 
resultReader.getVectorSchemaRoot(); + Assertions.assertEquals(rowCount, batch.getRowCount()); + Assertions.assertEquals(3, batch.getSchema().getFields().size()); + // Verify age column + IntVector ageResultVector = (IntVector) batch.getVector("age"); + for (int i = 0; i < rowCount; i++) { + Assertions.assertEquals(20 + i, ageResultVector.get(i)); + } + IntVector idResultVector = (IntVector) batch.getVector("id"); + for (int i = 0; i < rowCount; i++) { + Assertions.assertEquals(i, idResultVector.get(i)); + } } } } @@ -169,36 +170,36 @@ void testReplaceAsDiffColumns(@TempDir Path tempDir) throws Exception { fragmentMeta.getDeletionFile(), fragmentMeta.getRowIdMeta()); - Transaction mergeTransaction = - initialDataset - .newTransactionBuilder() + try (Transaction mergeTxn = + new Transaction.Builder() + .readVersion(initialDataset.version()) .operation( Merge.builder() .fragments(Collections.singletonList(evolvedFragment)) .schema(evolvedSchema) .build()) - .build(); - - try (Dataset evolvedDataset = mergeTransaction.commit()) { - Assertions.assertEquals(3, evolvedDataset.version()); - Assertions.assertEquals(rowCount, evolvedDataset.countRows()); - Assertions.assertEquals(evolvedSchema, evolvedDataset.getSchema()); - Assertions.assertEquals(2, evolvedDataset.getSchema().getFields().size()); - // Verify merged data - try (LanceScanner scanner = evolvedDataset.newScan()) { - try (ArrowReader resultReader = scanner.scanBatches()) { - Assertions.assertTrue(resultReader.loadNextBatch()); - VectorSchemaRoot batch = resultReader.getVectorSchemaRoot(); - Assertions.assertEquals(rowCount, batch.getRowCount()); - Assertions.assertEquals(2, batch.getSchema().getFields().size()); - // Verify age column - IntVector ageResultVector = (IntVector) batch.getVector("age"); - for (int i = 0; i < rowCount; i++) { - Assertions.assertEquals(20 + i, ageResultVector.get(i)); - } - IntVector idResultVector = (IntVector) batch.getVector("id"); - for (int i = 0; i < rowCount; i++) { - 
Assertions.assertEquals(i, idResultVector.get(i)); + .build()) { + try (Dataset evolvedDataset = new CommitBuilder(initialDataset).execute(mergeTxn)) { + Assertions.assertEquals(3, evolvedDataset.version()); + Assertions.assertEquals(rowCount, evolvedDataset.countRows()); + Assertions.assertEquals(evolvedSchema, evolvedDataset.getSchema()); + Assertions.assertEquals(2, evolvedDataset.getSchema().getFields().size()); + // Verify merged data + try (LanceScanner scanner = evolvedDataset.newScan()) { + try (ArrowReader resultReader = scanner.scanBatches()) { + Assertions.assertTrue(resultReader.loadNextBatch()); + VectorSchemaRoot batch = resultReader.getVectorSchemaRoot(); + Assertions.assertEquals(rowCount, batch.getRowCount()); + Assertions.assertEquals(2, batch.getSchema().getFields().size()); + // Verify age column + IntVector ageResultVector = (IntVector) batch.getVector("age"); + for (int i = 0; i < rowCount; i++) { + Assertions.assertEquals(20 + i, ageResultVector.get(i)); + } + IntVector idResultVector = (IntVector) batch.getVector("id"); + for (int i = 0; i < rowCount; i++) { + Assertions.assertEquals(i, idResultVector.get(i)); + } } } } @@ -254,31 +255,31 @@ void testMergeExistingColumn(@TempDir Path tempDir) throws Exception { fragmentMeta.getDeletionFile(), fragmentMeta.getRowIdMeta()); - Transaction mergeTransaction = - initialDataset - .newTransactionBuilder() + try (Transaction mergeTxn = + new Transaction.Builder() + .readVersion(initialDataset.version()) .operation( Merge.builder() .fragments(Collections.singletonList(evolvedFragment)) .schema(testDataset.getSchema()) .build()) - .build(); - - try (Dataset mergedDataset = mergeTransaction.commit()) { - Assertions.assertEquals(3, mergedDataset.version()); - Assertions.assertEquals(rowCount, mergedDataset.countRows()); + .build()) { + try (Dataset mergedDataset = new CommitBuilder(initialDataset).execute(mergeTxn)) { + Assertions.assertEquals(3, mergedDataset.version()); + 
Assertions.assertEquals(rowCount, mergedDataset.countRows()); - // Verify updated data - try (LanceScanner scanner = mergedDataset.newScan()) { - try (ArrowReader resultReader = scanner.scanBatches()) { - Assertions.assertTrue(resultReader.loadNextBatch()); - VectorSchemaRoot batch = resultReader.getVectorSchemaRoot(); + // Verify updated data + try (LanceScanner scanner = mergedDataset.newScan()) { + try (ArrowReader resultReader = scanner.scanBatches()) { + Assertions.assertTrue(resultReader.loadNextBatch()); + VectorSchemaRoot batch = resultReader.getVectorSchemaRoot(); - VarCharVector nameResultVector = (VarCharVector) batch.getVector("name"); - for (int i = 0; i < rowCount; i++) { - String expectedName = "UpdatedName_" + i; - String actualName = new String(nameResultVector.get(i), StandardCharsets.UTF_8); - Assertions.assertEquals(expectedName, actualName); + VarCharVector nameResultVector = (VarCharVector) batch.getVector("name"); + for (int i = 0; i < rowCount; i++) { + String expectedName = "UpdatedName_" + i; + String actualName = new String(nameResultVector.get(i), StandardCharsets.UTF_8); + Assertions.assertEquals(expectedName, actualName); + } } } } diff --git a/java/src/test/java/org/lance/operation/OperationTestBase.java b/java/src/test/java/org/lance/operation/OperationTestBase.java index f89aeb6e829..5f2c2a46d99 100644 --- a/java/src/test/java/org/lance/operation/OperationTestBase.java +++ b/java/src/test/java/org/lance/operation/OperationTestBase.java @@ -13,6 +13,7 @@ */ package org.lance.operation; +import org.lance.CommitBuilder; import org.lance.Dataset; import org.lance.FragmentMetadata; import org.lance.TestUtils; @@ -53,12 +54,13 @@ protected Dataset createAndAppendRows(TestUtils.SimpleTestDataset suite, int row dataset = suite.createEmptyDataset(); FragmentMetadata fragmentMeta = suite.createNewFragment(rowCount); - Transaction appendTxn = - dataset - .newTransactionBuilder() + try (Transaction txn = + new Transaction.Builder() + 
.readVersion(dataset.version()) .operation(Append.builder().fragments(Collections.singletonList(fragmentMeta)).build()) - .build(); - return appendTxn.commit(); + .build()) { + return new CommitBuilder(dataset).execute(txn); + } } /** diff --git a/java/src/test/java/org/lance/operation/OverwriteTest.java b/java/src/test/java/org/lance/operation/OverwriteTest.java index 5ecda106fe4..3af691c5009 100644 --- a/java/src/test/java/org/lance/operation/OverwriteTest.java +++ b/java/src/test/java/org/lance/operation/OverwriteTest.java @@ -13,6 +13,7 @@ */ package org.lance.operation; +import org.lance.CommitBuilder; import org.lance.Dataset; import org.lance.Fragment; import org.lance.FragmentMetadata; @@ -45,33 +46,34 @@ void testOverwrite(@TempDir Path tempDir) throws Exception { // Commit fragment int rowCount = 20; FragmentMetadata fragmentMeta = testDataset.createNewFragment(rowCount); - Transaction transaction = - dataset - .newTransactionBuilder() + try (Transaction txn = + new Transaction.Builder() + .readVersion(dataset.version()) .operation( Overwrite.builder() .fragments(Collections.singletonList(fragmentMeta)) .schema(testDataset.getSchema()) .build()) - .build(); - try (Dataset dataset = transaction.commit()) { - assertEquals(2, dataset.version()); - assertEquals(2, dataset.latestVersion()); - assertEquals(rowCount, dataset.countRows()); - Fragment fragment = dataset.getFragments().get(0); + .build()) { + try (Dataset dataset = new CommitBuilder(this.dataset).execute(txn)) { + assertEquals(2, dataset.version()); + assertEquals(2, dataset.latestVersion()); + assertEquals(rowCount, dataset.countRows()); + Fragment fragment = dataset.getFragments().get(0); - try (LanceScanner scanner = fragment.newScan()) { - Schema schemaRes = scanner.schema(); - assertEquals(testDataset.getSchema(), schemaRes); + try (LanceScanner scanner = fragment.newScan()) { + Schema schemaRes = scanner.schema(); + assertEquals(testDataset.getSchema(), schemaRes); + } } } // Try to commit 
from stale version (v1) - should fail with retryable error rowCount = 40; fragmentMeta = testDataset.createNewFragment(rowCount); - Transaction staleTxn = - dataset - .newTransactionBuilder() + try (Transaction staleTxn = + new Transaction.Builder() + .readVersion(dataset.version()) .operation( Overwrite.builder() .fragments(Collections.singletonList(fragmentMeta)) @@ -79,19 +81,22 @@ void testOverwrite(@TempDir Path tempDir) throws Exception { .configUpsertValues(Collections.singletonMap("config_key", "config_value")) .build()) .transactionProperties(Collections.singletonMap("key", "value")) - .build(); - assertEquals("value", staleTxn.transactionProperties().map(m -> m.get("key")).orElse(null)); + .build()) { + assertEquals("value", staleTxn.transactionProperties().map(m -> m.get("key")).orElse(null)); - RuntimeException ex = assertThrows(RuntimeException.class, () -> staleTxn.commit().close()); - assertTrue( - ex.getMessage().contains("Retryable commit conflict"), - "Expected retryable commit conflict error, got: " + ex.getMessage()); + RuntimeException ex = + assertThrows( + RuntimeException.class, () -> new CommitBuilder(dataset).execute(staleTxn).close()); + assertTrue( + ex.getMessage().contains("Retryable commit conflict"), + "Expected retryable commit conflict error, got: " + ex.getMessage()); + } // Checkout latest and retry - should succeed dataset.checkoutLatest(); - Transaction retryTxn = - dataset - .newTransactionBuilder() + try (Transaction retryTxn = + new Transaction.Builder() + .readVersion(dataset.version()) .operation( Overwrite.builder() .fragments(Collections.singletonList(fragmentMeta)) @@ -99,19 +104,20 @@ void testOverwrite(@TempDir Path tempDir) throws Exception { .configUpsertValues(Collections.singletonMap("config_key", "config_value")) .build()) .transactionProperties(Collections.singletonMap("key", "value")) - .build(); - try (Dataset dataset = retryTxn.commit()) { - assertEquals(3, dataset.version()); - assertEquals(3, 
dataset.latestVersion()); - assertEquals(rowCount, dataset.countRows()); - assertEquals("config_value", dataset.getConfig().get("config_key")); - Fragment fragment = dataset.getFragments().get(0); + .build()) { + try (Dataset dataset = new CommitBuilder(this.dataset).execute(retryTxn)) { + assertEquals(3, dataset.version()); + assertEquals(3, dataset.latestVersion()); + assertEquals(rowCount, dataset.countRows()); + assertEquals("config_value", dataset.getConfig().get("config_key")); + Fragment fragment = dataset.getFragments().get(0); - try (LanceScanner scanner = fragment.newScan()) { - Schema schemaRes = scanner.schema(); - assertEquals(testDataset.getSchema(), schemaRes); + try (LanceScanner scanner = fragment.newScan()) { + Schema schemaRes = scanner.schema(); + assertEquals(testDataset.getSchema(), schemaRes); + } + assertEquals(retryTxn, dataset.readTransaction().orElse(null)); } - assertEquals(retryTxn, dataset.readTransaction().orElse(null)); } } } diff --git a/java/src/test/java/org/lance/operation/ProjectTest.java b/java/src/test/java/org/lance/operation/ProjectTest.java index a8124a672ca..fa0c92cc15f 100644 --- a/java/src/test/java/org/lance/operation/ProjectTest.java +++ b/java/src/test/java/org/lance/operation/ProjectTest.java @@ -13,6 +13,7 @@ */ package org.lance.operation; +import org.lance.CommitBuilder; import org.lance.Dataset; import org.lance.TestUtils; import org.lance.Transaction; @@ -43,29 +44,27 @@ void testProjection(@TempDir Path tempDir) { assertEquals(testDataset.getSchema(), dataset.getSchema()); List fieldList = new ArrayList<>(testDataset.getSchema().getFields()); Collections.reverse(fieldList); - Transaction txn1 = - dataset - .newTransactionBuilder() + try (Transaction txn1 = + new Transaction.Builder() + .readVersion(dataset.version()) .operation(Project.builder().schema(new Schema(fieldList)).build()) - .build(); - try (Dataset committedDataset = txn1.commit()) { - assertEquals(1, txn1.readVersion()); - assertEquals(1, 
dataset.version()); - assertEquals(2, committedDataset.version()); - assertEquals(new Schema(fieldList), committedDataset.getSchema()); - fieldList.remove(1); - Transaction txn2 = - committedDataset - .newTransactionBuilder() - .operation(Project.builder().schema(new Schema(fieldList)).build()) - .build(); - try (Dataset committedDataset2 = txn2.commit()) { - assertEquals(2, txn2.readVersion()); + .build()) { + try (Dataset committedDataset = new CommitBuilder(dataset).execute(txn1)) { + assertEquals(1, dataset.version()); assertEquals(2, committedDataset.version()); - assertEquals(3, committedDataset2.version()); - assertEquals(new Schema(fieldList), committedDataset2.getSchema()); - assertEquals(txn1, committedDataset.readTransaction().orElse(null)); - assertEquals(txn2, committedDataset2.readTransaction().orElse(null)); + assertEquals(new Schema(fieldList), committedDataset.getSchema()); + fieldList.remove(1); + try (Transaction txn2 = + new Transaction.Builder() + .readVersion(committedDataset.version()) + .operation(Project.builder().schema(new Schema(fieldList)).build()) + .build()) { + try (Dataset committedDataset2 = new CommitBuilder(committedDataset).execute(txn2)) { + assertEquals(2, committedDataset.version()); + assertEquals(3, committedDataset2.version()); + assertEquals(new Schema(fieldList), committedDataset2.getSchema()); + } + } } } } diff --git a/java/src/test/java/org/lance/operation/ReserveFragmentsTest.java b/java/src/test/java/org/lance/operation/ReserveFragmentsTest.java index 14089482c08..9299fa7b1bd 100644 --- a/java/src/test/java/org/lance/operation/ReserveFragmentsTest.java +++ b/java/src/test/java/org/lance/operation/ReserveFragmentsTest.java @@ -13,6 +13,7 @@ */ package org.lance.operation; +import org.lance.CommitBuilder; import org.lance.Dataset; import org.lance.Fragment; import org.lance.FragmentMetadata; @@ -42,54 +43,55 @@ void testReserveFragments(@TempDir Path tempDir) throws Exception { // Create an initial fragment to 
establish a baseline fragment ID FragmentMetadata initialFragmentMeta = testDataset.createNewFragment(10); - Transaction appendTransaction = - dataset - .newTransactionBuilder() + try (Transaction appendTxn = + new Transaction.Builder() + .readVersion(dataset.version()) .operation( Append.builder() .fragments(Collections.singletonList(initialFragmentMeta)) .build()) - .build(); - try (Dataset datasetWithFragment = appendTransaction.commit()) { - // Reserve fragment IDs - int numFragmentsToReserve = 5; - Transaction reserveTransaction = - datasetWithFragment - .newTransactionBuilder() - .operation( - new ReserveFragments.Builder().numFragments(numFragmentsToReserve).build()) - .build(); - try (Dataset datasetWithReservedFragments = reserveTransaction.commit()) { - // Create a new fragment and verify its ID reflects the reservation - FragmentMetadata newFragmentMeta = testDataset.createNewFragment(10); - Transaction appendTransaction2 = - datasetWithReservedFragments - .newTransactionBuilder() + .build()) { + try (Dataset datasetWithFragment = new CommitBuilder(dataset).execute(appendTxn)) { + // Reserve fragment IDs + int numFragmentsToReserve = 5; + try (Transaction reserveTxn = + new Transaction.Builder() + .readVersion(datasetWithFragment.version()) .operation( - Append.builder() - .fragments(Collections.singletonList(newFragmentMeta)) - .build()) - .build(); - try (Dataset finalDataset = appendTransaction2.commit()) { - // Verify the fragment IDs were properly reserved - // The new fragment should have an ID that's at least numFragmentsToReserve higher - // than it would have been without the reservation - List fragments = finalDataset.getFragments(); - assertEquals(2, fragments.size()); + new ReserveFragments.Builder().numFragments(numFragmentsToReserve).build()) + .build()) { + try (Dataset datasetWithReservedFragments = + new CommitBuilder(datasetWithFragment).execute(reserveTxn)) { + // Create a new fragment and verify its ID reflects the reservation + 
FragmentMetadata newFragmentMeta = testDataset.createNewFragment(10); + try (Transaction appendTxn2 = + new Transaction.Builder() + .readVersion(datasetWithReservedFragments.version()) + .operation( + Append.builder() + .fragments(Collections.singletonList(newFragmentMeta)) + .build()) + .build()) { + try (Dataset finalDataset = + new CommitBuilder(datasetWithReservedFragments).execute(appendTxn2)) { + // Verify the fragment IDs were properly reserved + // The new fragment should have an ID that's at least numFragmentsToReserve + // higher than it would have been without the reservation + List fragments = finalDataset.getFragments(); + assertEquals(2, fragments.size()); - // The first fragment ID is typically 0, and the second would normally be 1 - // But after reserving 5 fragments, the second fragment ID should be at least 6 - Fragment firstFragment = fragments.get(0); - Fragment secondFragment = fragments.get(1); + // The first fragment ID is typically 0, and the second would normally be 1 + // But after reserving 5 fragments, the second fragment ID should be at least 6 + Fragment firstFragment = fragments.get(0); + Fragment secondFragment = fragments.get(1); - // Check that the second fragment has a significantly higher ID than the first - // This is an indirect way to verify that fragment IDs were reserved - Assertions.assertNotEquals( - firstFragment.metadata().getId() + 1, secondFragment.getId()); - - // Verify the transaction is recorded - assertEquals( - reserveTransaction, datasetWithReservedFragments.readTransaction().orElse(null)); + // Check that the second fragment has a significantly higher ID than the first + // This is an indirect way to verify that fragment IDs were reserved + Assertions.assertNotEquals( + firstFragment.metadata().getId() + 1, secondFragment.getId()); + } + } + } } } } diff --git a/java/src/test/java/org/lance/operation/RestoreTest.java b/java/src/test/java/org/lance/operation/RestoreTest.java index 751263f4c06..98e9d779ade 100644 
--- a/java/src/test/java/org/lance/operation/RestoreTest.java +++ b/java/src/test/java/org/lance/operation/RestoreTest.java @@ -13,6 +13,7 @@ */ package org.lance.operation; +import org.lance.CommitBuilder; import org.lance.Dataset; import org.lance.FragmentMetadata; import org.lance.TestUtils; @@ -43,30 +44,31 @@ void testRestore(@TempDir Path tempDir) { // Append data to create a new version int rowCount = 20; FragmentMetadata fragmentMeta = testDataset.createNewFragment(rowCount); - Transaction transaction = - dataset - .newTransactionBuilder() + try (Transaction appendTxn = + new Transaction.Builder() + .readVersion(dataset.version()) .operation( Append.builder().fragments(Collections.singletonList(fragmentMeta)).build()) - .build(); - try (Dataset modifiedDataset = transaction.commit()) { - // Verify the dataset was modified - long newVersion = modifiedDataset.version(); - assertEquals(initialVersion + 1, newVersion); - assertEquals(rowCount, modifiedDataset.countRows()); + .build()) { + try (Dataset modifiedDataset = new CommitBuilder(dataset).execute(appendTxn)) { + // Verify the dataset was modified + long newVersion = modifiedDataset.version(); + assertEquals(initialVersion + 1, newVersion); + assertEquals(rowCount, modifiedDataset.countRows()); - // Restore to the initial version - Transaction restoreTransaction = - modifiedDataset - .newTransactionBuilder() - .operation(new Restore.Builder().version(initialVersion).build()) - .build(); - try (Dataset restoredDataset = restoreTransaction.commit()) { - // Verify the dataset was restored to the initial version, but the version increases - assertEquals(initialVersion + 2, restoredDataset.version()); - // Initial dataset had 0 rows - assertEquals(0, restoredDataset.countRows()); - assertEquals(restoreTransaction, restoredDataset.readTransaction().orElse(null)); + // Restore to the initial version + try (Transaction restoreTxn = + new Transaction.Builder() + .readVersion(modifiedDataset.version()) + 
.operation(new Restore.Builder().version(initialVersion).build()) + .build()) { + try (Dataset restoredDataset = new CommitBuilder(modifiedDataset).execute(restoreTxn)) { + // Verify the dataset was restored to the initial version, but the version increases + assertEquals(initialVersion + 2, restoredDataset.version()); + // Initial dataset had 0 rows + assertEquals(0, restoredDataset.countRows()); + } + } } } } diff --git a/java/src/test/java/org/lance/operation/RewriteTest.java b/java/src/test/java/org/lance/operation/RewriteTest.java index 4a73e2807e8..f2081ab8895 100644 --- a/java/src/test/java/org/lance/operation/RewriteTest.java +++ b/java/src/test/java/org/lance/operation/RewriteTest.java @@ -13,6 +13,7 @@ */ package org.lance.operation; +import org.lance.CommitBuilder; import org.lance.Dataset; import org.lance.FragmentMetadata; import org.lance.TestUtils; @@ -44,47 +45,45 @@ void testRewrite(@TempDir Path tempDir) { FragmentMetadata fragmentMeta1 = testDataset.createNewFragment(rowCount); FragmentMetadata fragmentMeta2 = testDataset.createNewFragment(rowCount); - Transaction appendTx = - dataset - .newTransactionBuilder() + try (Transaction appendTxn = + new Transaction.Builder() + .readVersion(dataset.version()) .operation( Append.builder().fragments(Arrays.asList(fragmentMeta1, fragmentMeta2)).build()) - .build(); - - try (Dataset datasetWithData = appendTx.commit()) { - assertEquals(2, datasetWithData.version()); - assertEquals(rowCount * 2, datasetWithData.countRows()); - - // Now create a rewrite operation - List groups = new ArrayList<>(); - - // Create a rewrite group with old fragments and new fragments - List oldFragments = new ArrayList<>(); - oldFragments.add(fragmentMeta1); - - List newFragments = new ArrayList<>(); - FragmentMetadata newFragmentMeta = testDataset.createNewFragment(rowCount); - newFragments.add(newFragmentMeta); - - RewriteGroup group = - RewriteGroup.builder().oldFragments(oldFragments).newFragments(newFragments).build(); - - 
groups.add(group); - - // Create and commit the rewrite transaction - Transaction rewriteTx = - datasetWithData - .newTransactionBuilder() - .operation(Rewrite.builder().groups(groups).build()) - .build(); - - try (Dataset rewrittenDataset = rewriteTx.commit()) { - assertEquals(3, rewrittenDataset.version()); - // The row count should remain the same since we're just rewriting - assertEquals(rowCount * 2, rewrittenDataset.countRows()); - - // Verify that the transaction was recorded - assertEquals(rewriteTx, rewrittenDataset.readTransaction().orElse(null)); + .build()) { + try (Dataset datasetWithData = new CommitBuilder(dataset).execute(appendTxn)) { + assertEquals(2, datasetWithData.version()); + assertEquals(rowCount * 2, datasetWithData.countRows()); + + // Now create a rewrite operation + List groups = new ArrayList<>(); + + // Create a rewrite group with old fragments and new fragments + List oldFragments = new ArrayList<>(); + oldFragments.add(fragmentMeta1); + + List newFragments = new ArrayList<>(); + FragmentMetadata newFragmentMeta = testDataset.createNewFragment(rowCount); + newFragments.add(newFragmentMeta); + + RewriteGroup group = + RewriteGroup.builder().oldFragments(oldFragments).newFragments(newFragments).build(); + + groups.add(group); + + // Create and commit the rewrite transaction + try (Transaction rewriteTxn = + new Transaction.Builder() + .readVersion(datasetWithData.version()) + .operation(Rewrite.builder().groups(groups).build()) + .build()) { + try (Dataset rewrittenDataset = + new CommitBuilder(datasetWithData).execute(rewriteTxn)) { + assertEquals(3, rewrittenDataset.version()); + // The row count should remain the same since we're just rewriting + assertEquals(rowCount * 2, rewrittenDataset.countRows()); + } + } } } } diff --git a/java/src/test/java/org/lance/operation/TruncateTest.java b/java/src/test/java/org/lance/operation/TruncateTest.java index 93f5b689e8c..e316c969362 100644 --- 
a/java/src/test/java/org/lance/operation/TruncateTest.java +++ b/java/src/test/java/org/lance/operation/TruncateTest.java @@ -13,6 +13,7 @@ */ package org.lance.operation; +import org.lance.CommitBuilder; import org.lance.Dataset; import org.lance.FragmentMetadata; import org.lance.TestUtils; @@ -40,15 +41,15 @@ void testTruncateTable(@TempDir Path tempDir) throws Exception { // Append some data int rowCount = 20; FragmentMetadata fragmentMeta = testDataset.createNewFragment(rowCount); - Transaction transaction = - dataset - .newTransactionBuilder() - .operation( - Append.builder() - .fragments(java.util.Collections.singletonList(fragmentMeta)) - .build()) - .build(); - try (Dataset ds1 = transaction.commit()) { + try (Transaction txn = + new Transaction.Builder() + .readVersion(dataset.version()) + .operation( + Append.builder() + .fragments(java.util.Collections.singletonList(fragmentMeta)) + .build()) + .build(); + Dataset ds1 = new CommitBuilder(dataset).execute(txn)) { assertEquals(rowCount, ds1.countRows()); // Truncate to empty while preserving schema diff --git a/java/src/test/java/org/lance/operation/UpdateConfigTest.java b/java/src/test/java/org/lance/operation/UpdateConfigTest.java index 14c3ea444f0..aba003846c8 100644 --- a/java/src/test/java/org/lance/operation/UpdateConfigTest.java +++ b/java/src/test/java/org/lance/operation/UpdateConfigTest.java @@ -13,6 +13,7 @@ */ package org.lance.operation; +import org.lance.CommitBuilder; import org.lance.Dataset; import org.lance.TestUtils; import org.lance.Transaction; @@ -45,90 +46,101 @@ void testUpdateConfig(@TempDir Path tempDir) { UpdateMap configUpdates = UpdateMap.builder().updates(configValues).replace(false).build(); - Transaction transaction = - dataset - .newTransactionBuilder() + try (Transaction txn = + new Transaction.Builder() + .readVersion(dataset.version()) .operation(UpdateConfig.builder().configUpdates(configUpdates).build()) - .build(); - try (Dataset updatedDataset = 
transaction.commit()) { - assertEquals(2, updatedDataset.version()); - assertEquals("value1", updatedDataset.getConfig().get("key1")); - assertEquals("value2", updatedDataset.getConfig().get("key2")); - - // Test 2: Delete configuration key using configUpdates with null value - Map deleteUpdates = new HashMap<>(); - deleteUpdates.put("key1", null); // null value means delete - - UpdateMap configDeleteUpdates = - UpdateMap.builder().updates(deleteUpdates).replace(false).build(); - - transaction = - updatedDataset - .newTransactionBuilder() - .operation(UpdateConfig.builder().configUpdates(configDeleteUpdates).build()) - .build(); - try (Dataset updatedDataset2 = transaction.commit()) { - assertEquals(3, updatedDataset2.version()); - assertNull(updatedDataset2.getConfig().get("key1")); - assertEquals("value2", updatedDataset2.getConfig().get("key2")); - - // Test 3: Update schema metadata using schemaMetadataUpdates - Map schemaMetadataMap = new HashMap<>(); - schemaMetadataMap.put("schema_key1", "schema_value1"); - schemaMetadataMap.put("schema_key2", "schema_value2"); - - UpdateMap schemaMetadataUpdates = - UpdateMap.builder().updates(schemaMetadataMap).replace(false).build(); - - transaction = - updatedDataset2 - .newTransactionBuilder() - .operation( - UpdateConfig.builder().schemaMetadataUpdates(schemaMetadataUpdates).build()) - .build(); - try (Dataset updatedDataset3 = transaction.commit()) { - assertEquals(4, updatedDataset3.version()); - assertEquals( - "schema_value1", updatedDataset3.getLanceSchema().metadata().get("schema_key1")); - assertEquals( - "schema_value2", updatedDataset3.getLanceSchema().metadata().get("schema_key2")); - - // Test 4: Update field metadata using fieldMetadataUpdates - Map fieldMetadataUpdates = new HashMap<>(); - - Map field0Updates = new HashMap<>(); - field0Updates.put("field0_key1", "field0_value1"); - UpdateMap field0UpdateMap = - UpdateMap.builder().updates(field0Updates).replace(false).build(); - - Map field1Updates = new 
HashMap<>(); - field1Updates.put("field1_key1", "field1_value1"); - field1Updates.put("field1_key2", "field1_value2"); - UpdateMap field1UpdateMap = - UpdateMap.builder().updates(field1Updates).replace(false).build(); - - fieldMetadataUpdates.put(0, field0UpdateMap); - fieldMetadataUpdates.put(1, field1UpdateMap); - - transaction = - updatedDataset3 - .newTransactionBuilder() - .operation( - UpdateConfig.builder().fieldMetadataUpdates(fieldMetadataUpdates).build()) - .build(); - try (Dataset updatedDataset4 = transaction.commit()) { - assertEquals(5, updatedDataset4.version()); - - // Verify field metadata for field 0 - Map fieldMetadata0 = - updatedDataset4.getLanceSchema().fields().get(0).getMetadata(); - assertEquals("field0_value1", fieldMetadata0.get("field0_key1")); - - // Verify field metadata for field 1 - Map field1Result = - updatedDataset4.getLanceSchema().fields().get(1).getMetadata(); - assertEquals("field1_value1", field1Result.get("field1_key1")); - assertEquals("field1_value2", field1Result.get("field1_key2")); + .build()) { + try (Dataset updatedDataset = new CommitBuilder(dataset).execute(txn)) { + assertEquals(2, updatedDataset.version()); + assertEquals("value1", updatedDataset.getConfig().get("key1")); + assertEquals("value2", updatedDataset.getConfig().get("key2")); + + // Test 2: Delete configuration key using configUpdates with null value + Map deleteUpdates = new HashMap<>(); + deleteUpdates.put("key1", null); // null value means delete + + UpdateMap configDeleteUpdates = + UpdateMap.builder().updates(deleteUpdates).replace(false).build(); + + try (Transaction txn2 = + new Transaction.Builder() + .readVersion(updatedDataset.version()) + .operation(UpdateConfig.builder().configUpdates(configDeleteUpdates).build()) + .build()) { + try (Dataset updatedDataset2 = new CommitBuilder(updatedDataset).execute(txn2)) { + assertEquals(3, updatedDataset2.version()); + assertNull(updatedDataset2.getConfig().get("key1")); + assertEquals("value2", 
updatedDataset2.getConfig().get("key2")); + + // Test 3: Update schema metadata using schemaMetadataUpdates + Map schemaMetadataMap = new HashMap<>(); + schemaMetadataMap.put("schema_key1", "schema_value1"); + schemaMetadataMap.put("schema_key2", "schema_value2"); + + UpdateMap schemaMetadataUpdates = + UpdateMap.builder().updates(schemaMetadataMap).replace(false).build(); + + try (Transaction txn3 = + new Transaction.Builder() + .readVersion(updatedDataset2.version()) + .operation( + UpdateConfig.builder() + .schemaMetadataUpdates(schemaMetadataUpdates) + .build()) + .build()) { + try (Dataset updatedDataset3 = new CommitBuilder(updatedDataset2).execute(txn3)) { + assertEquals(4, updatedDataset3.version()); + assertEquals( + "schema_value1", + updatedDataset3.getLanceSchema().metadata().get("schema_key1")); + assertEquals( + "schema_value2", + updatedDataset3.getLanceSchema().metadata().get("schema_key2")); + + // Test 4: Update field metadata using fieldMetadataUpdates + Map fieldMetadataUpdates = new HashMap<>(); + + Map field0Updates = new HashMap<>(); + field0Updates.put("field0_key1", "field0_value1"); + UpdateMap field0UpdateMap = + UpdateMap.builder().updates(field0Updates).replace(false).build(); + + Map field1Updates = new HashMap<>(); + field1Updates.put("field1_key1", "field1_value1"); + field1Updates.put("field1_key2", "field1_value2"); + UpdateMap field1UpdateMap = + UpdateMap.builder().updates(field1Updates).replace(false).build(); + + fieldMetadataUpdates.put(0, field0UpdateMap); + fieldMetadataUpdates.put(1, field1UpdateMap); + + try (Transaction txn4 = + new Transaction.Builder() + .readVersion(updatedDataset3.version()) + .operation( + UpdateConfig.builder() + .fieldMetadataUpdates(fieldMetadataUpdates) + .build()) + .build()) { + try (Dataset updatedDataset4 = + new CommitBuilder(updatedDataset3).execute(txn4)) { + assertEquals(5, updatedDataset4.version()); + + // Verify field metadata for field 0 + Map fieldMetadata0 = + 
updatedDataset4.getLanceSchema().fields().get(0).getMetadata(); + assertEquals("field0_value1", fieldMetadata0.get("field0_key1")); + + // Verify field metadata for field 1 + Map field1Result = + updatedDataset4.getLanceSchema().fields().get(1).getMetadata(); + assertEquals("field1_value1", field1Result.get("field1_key1")); + assertEquals("field1_value2", field1Result.get("field1_key2")); + } + } + } + } } } } diff --git a/java/src/test/java/org/lance/operation/UpdateTest.java b/java/src/test/java/org/lance/operation/UpdateTest.java index 0e3e2276204..bb39a5f4d12 100644 --- a/java/src/test/java/org/lance/operation/UpdateTest.java +++ b/java/src/test/java/org/lance/operation/UpdateTest.java @@ -13,6 +13,7 @@ */ package org.lance.operation; +import org.lance.CommitBuilder; import org.lance.Dataset; import org.lance.Fragment; import org.lance.FragmentMetadata; @@ -54,35 +55,37 @@ void testUpdate(@TempDir Path tempDir) throws Exception { // Commit fragment int rowCount = 20; FragmentMetadata fragmentMeta = testDataset.createNewFragment(rowCount); - Transaction transaction = - dataset - .newTransactionBuilder() + try (Transaction appendTxn = + new Transaction.Builder() + .readVersion(dataset.version()) .operation( Append.builder().fragments(Collections.singletonList(fragmentMeta)).build()) - .build(); - - try (Dataset dataset = transaction.commit()) { - assertEquals(2, dataset.version()); - assertEquals(2, dataset.latestVersion()); - assertEquals(rowCount, dataset.countRows()); - assertThrows( - IllegalArgumentException.class, - () -> - dataset - .newTransactionBuilder() - .operation(Append.builder().fragments(new ArrayList<>()).build()) - .build() - .commit() - .close()); + .build()) { + try (Dataset dataset = new CommitBuilder(this.dataset).execute(appendTxn)) { + assertEquals(2, dataset.version()); + assertEquals(2, dataset.latestVersion()); + assertEquals(rowCount, dataset.countRows()); + assertThrows( + IllegalArgumentException.class, + () -> { + try (Transaction 
txn = + new Transaction.Builder() + .readVersion(dataset.version()) + .operation(Append.builder().fragments(new ArrayList<>()).build()) + .build()) { + new CommitBuilder(dataset).execute(txn).close(); + } + }); + } } dataset = Dataset.open(datasetPath, allocator); // Update fragments rowCount = 40; FragmentMetadata newFragment = testDataset.createNewFragment(rowCount); - transaction = - dataset - .newTransactionBuilder() + try (Transaction updateTxn = + new Transaction.Builder() + .readVersion(dataset.version()) .operation( Update.builder() .removedFragmentIds( @@ -91,15 +94,12 @@ void testUpdate(@TempDir Path tempDir) throws Exception { .newFragments(Collections.singletonList(newFragment)) .updateMode(Optional.of(UpdateMode.RewriteRows)) .build()) - .build(); - - try (Dataset dataset = transaction.commit()) { - assertEquals(3, dataset.version()); - assertEquals(3, dataset.latestVersion()); - assertEquals(rowCount, dataset.countRows()); - - Transaction txn = dataset.readTransaction().orElse(null); - assertEquals(transaction, txn); + .build()) { + try (Dataset dataset = new CommitBuilder(this.dataset).execute(updateTxn)) { + assertEquals(3, dataset.version()); + assertEquals(3, dataset.latestVersion()); + assertEquals(rowCount, dataset.countRows()); + } } } } @@ -122,16 +122,17 @@ void testUpdateColumns(@TempDir Path tempDir) throws Exception { */ int rowCount = 6; FragmentMetadata fragmentMeta = testDataset.createNewFragment(rowCount); - Transaction appendTransaction = - dataset - .newTransactionBuilder() + try (Transaction appendTxn = + new Transaction.Builder() + .readVersion(dataset.version()) .operation( Append.builder().fragments(Collections.singletonList(fragmentMeta)).build()) - .build(); - try (Dataset dataset = appendTransaction.commit()) { - assertEquals(2, dataset.version()); - assertEquals(2, dataset.latestVersion()); - assertEquals(rowCount, dataset.countRows()); + .build()) { + try (Dataset dataset = new CommitBuilder(this.dataset).execute(appendTxn)) 
{ + assertEquals(2, dataset.version()); + assertEquals(2, dataset.latestVersion()); + assertEquals(rowCount, dataset.countRows()); + } } dataset = Dataset.open(datasetPath, allocator); @@ -145,59 +146,60 @@ void testUpdateColumns(@TempDir Path tempDir) throws Exception { * 3: | null | null | */ FragmentUpdateResult updateResult = testDataset.updateColumn(targetFragment, updateRowCount); - Transaction updateTransaction = - dataset - .newTransactionBuilder() + try (Transaction updateTxn = + new Transaction.Builder() + .readVersion(dataset.version()) .operation( Update.builder() .updatedFragments( Collections.singletonList(updateResult.getUpdatedFragment())) .fieldsModified(updateResult.getFieldsModified()) .build()) - .build(); - try (Dataset dataset = updateTransaction.commit()) { - assertEquals(3, dataset.version()); - assertEquals(3, dataset.latestVersion()); - Fragment fragment = dataset.getFragments().get(0); - try (LanceScanner scanner = fragment.newScan(rowCount)) { - List actualIds = new ArrayList<>(rowCount); - List actualNames = new ArrayList<>(rowCount); - List actualTimeStamps = new ArrayList<>(rowCount); - try (ArrowReader reader = scanner.scanBatches()) { - while (reader.loadNextBatch()) { - VectorSchemaRoot root = reader.getVectorSchemaRoot(); - IntVector idVector = (IntVector) root.getVector("id"); - for (int i = 0; i < idVector.getValueCount(); i++) { - actualIds.add(idVector.isNull(i) ? null : idVector.getObject(i)); - } - VarCharVector nameVector = (VarCharVector) root.getVector("name"); - for (int i = 0; i < nameVector.getValueCount(); i++) { - actualNames.add(nameVector.isNull(i) ? null : nameVector.getObject(i).toString()); - } - TimeStampSecTZVector timeStampVector = - (TimeStampSecTZVector) root.getVector("timeStamp"); - for (int i = 0; i < timeStampVector.getValueCount(); i++) { - actualTimeStamps.add( - timeStampVector.isNull(i) ? 
null : timeStampVector.getObject(i)); + .build()) { + try (Dataset dataset = new CommitBuilder(this.dataset).execute(updateTxn)) { + assertEquals(3, dataset.version()); + assertEquals(3, dataset.latestVersion()); + Fragment fragment = dataset.getFragments().get(0); + try (LanceScanner scanner = fragment.newScan(rowCount)) { + List actualIds = new ArrayList<>(rowCount); + List actualNames = new ArrayList<>(rowCount); + List actualTimeStamps = new ArrayList<>(rowCount); + try (ArrowReader reader = scanner.scanBatches()) { + while (reader.loadNextBatch()) { + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + IntVector idVector = (IntVector) root.getVector("id"); + for (int i = 0; i < idVector.getValueCount(); i++) { + actualIds.add(idVector.isNull(i) ? null : idVector.getObject(i)); + } + VarCharVector nameVector = (VarCharVector) root.getVector("name"); + for (int i = 0; i < nameVector.getValueCount(); i++) { + actualNames.add(nameVector.isNull(i) ? null : nameVector.getObject(i).toString()); + } + TimeStampSecTZVector timeStampVector = + (TimeStampSecTZVector) root.getVector("timeStamp"); + for (int i = 0; i < timeStampVector.getValueCount(); i++) { + actualTimeStamps.add( + timeStampVector.isNull(i) ? 
null : timeStampVector.getObject(i)); + } } } + /* result dataset content + * _rowid | id | name | timeStamp | + * 0: | 100 | "Update 0" | 0 | + * 1: | null | null | null | + * 2: | 2 | "Update 2" | 2 | + * 3: | null | null | null | + * 4: | 4 | "Person 4" | 4 | + * 5: | null | null | null | + */ + List expectIds = Arrays.asList(100, null, 2, null, 4, null); + List expectNames = + Arrays.asList("Update 0", null, "Update 2", null, "Person 4", null); + List expectTimeStamps = Arrays.asList(0L, null, 2L, null, 4L, null); + assertEquals(expectIds, actualIds); + assertEquals(expectNames, actualNames); + assertEquals(expectTimeStamps, actualTimeStamps); } - /* result dataset content - * _rowid | id | name | timeStamp | - * 0: | 100 | "Update 0" | 0 | - * 1: | null | null | null | - * 2: | 2 | "Update 2" | 2 | - * 3: | null | null | null | - * 4: | 4 | "Person 4" | 4 | - * 5: | null | null | null | - */ - List expectIds = Arrays.asList(100, null, 2, null, 4, null); - List expectNames = - Arrays.asList("Update 0", null, "Update 2", null, "Person 4", null); - List expectTimeStamps = Arrays.asList(0L, null, 2L, null, 4L, null); - assertEquals(expectIds, actualIds); - assertEquals(expectNames, actualNames); - assertEquals(expectTimeStamps, actualTimeStamps); } } }