From 18625cef89474db2baccb766e0e972d713ceaca7 Mon Sep 17 00:00:00 2001 From: majin1102 Date: Tue, 16 Sep 2025 19:14:17 +0800 Subject: [PATCH 1/2] feat: support inline transaction --- .../java/com/lancedb/lance/DatasetTest.java | 78 ++++-- protos/table.proto | 5 + rust/lance-table/src/feature_flags.rs | 19 +- rust/lance-table/src/format.rs | 2 + rust/lance-table/src/format/manifest.rs | 8 + rust/lance-table/src/format/transaction.rs | 42 +++ rust/lance-table/src/io/commit.rs | 60 ++++- .../src/io/commit/external_manifest.rs | 10 +- rust/lance-table/src/io/manifest.rs | 18 +- rust/lance/src/dataset.rs | 246 +++++++++++++++--- rust/lance/src/dataset/fragment.rs | 6 +- rust/lance/src/dataset/transaction.rs | 15 +- rust/lance/src/io/commit.rs | 24 +- 13 files changed, 447 insertions(+), 86 deletions(-) create mode 100755 rust/lance-table/src/format/transaction.rs diff --git a/java/src/test/java/com/lancedb/lance/DatasetTest.java b/java/src/test/java/com/lancedb/lance/DatasetTest.java index 5af4fca4368..304556c25ae 100644 --- a/java/src/test/java/com/lancedb/lance/DatasetTest.java +++ b/java/src/test/java/com/lancedb/lance/DatasetTest.java @@ -45,6 +45,7 @@ import java.io.IOException; import java.net.URISyntaxException; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.channels.ClosedChannelException; import java.nio.file.Files; import java.nio.file.Path; @@ -356,7 +357,7 @@ void testOpenNonExist(@TempDir Path tempDir) throws IOException, URISyntaxExcept } @Test - void testOpenSerializedManifest(@TempDir Path tempDir) throws IOException, URISyntaxException { + void testOpenSerializedManifest(@TempDir Path tempDir) throws IOException { Path datasetPath = tempDir.resolve("serialized_manifest"); try (BufferAllocator allocator = new RootAllocator()) { TestUtils.SimpleTestDataset testDataset = @@ -365,24 +366,18 @@ void testOpenSerializedManifest(@TempDir Path tempDir) throws IOException, URISy try (Dataset dataset1 = testDataset.createEmptyDataset()) { assertEquals(1, dataset1.version()); Path manifestPath = datasetPath.resolve("_versions"); - Stream fileStream = Files.list(manifestPath); - assertEquals(1, fileStream.count()); - Path filePath = manifestPath.resolve("1.manifest"); - byte[] manifestBytes = Files.readAllBytes(filePath); - // Need to trim the magic number at end and message length at beginning - // https://github.com/lancedb/lance/blob/main/rust/lance-table/src/io/manifest.rs#L95-L96 - byte[] trimmedManifest = Arrays.copyOfRange(manifestBytes, 4, manifestBytes.length - 16); - ByteBuffer manifestBuffer = ByteBuffer.allocateDirect(trimmedManifest.length); - manifestBuffer.put(trimmedManifest); - manifestBuffer.flip(); - try (Dataset dataset2 = testDataset.write(1, 5)) { - assertEquals(2, dataset2.version()); - assertEquals(2, dataset2.latestVersion()); - // When reading from the serialized manifest, it shouldn't know about the second dataset - ReadOptions readOptions = - new ReadOptions.Builder().setSerializedManifest(manifestBuffer).build(); - Dataset dataset1Manifest = Dataset.open(allocator, datasetPath.toString(), readOptions); - assertEquals(1, dataset1Manifest.version()); + try (Stream fileStream = Files.list(manifestPath)) { + assertEquals(1, fileStream.count()); + ByteBuffer manifestBuffer = readManifest(manifestPath.resolve("1.manifest")); + try (Dataset dataset2 = testDataset.write(1, 5)) { + assertEquals(2, dataset2.version()); + assertEquals(2, dataset2.latestVersion()); + // When reading from the serialized manifest, it shouldn't know about the second dataset + ReadOptions readOptions = + new ReadOptions.Builder().setSerializedManifest(manifestBuffer).build(); + Dataset dataset1Manifest = Dataset.open(allocator, datasetPath.toString(), readOptions); + assertEquals(1, dataset1Manifest.version()); + } } } } @@ -1393,6 +1388,51 @@ void testCompactWithAllOptions(@TempDir Path tempDir) { } } + /** + * This method must be aligned with the implementation in ... + */ + public ByteBuffer readManifest(Path filePath) throws IOException { + byte[] fileBytes = Files.readAllBytes(filePath); + int fileSize = fileBytes.length; + + // Basic file size validation + if (fileSize < 16) { + throw new IllegalArgumentException("File too small"); + } + + // Read the last 16 bytes of the file to get metadata + // Structure: [manifest_pos (8 bytes)][magic (8 bytes)] + ByteBuffer tailBuffer = ByteBuffer.wrap(fileBytes, fileSize - 16, 16); + tailBuffer.order(ByteOrder.LITTLE_ENDIAN); + long manifestPos = tailBuffer.getLong(); // Read manifest start position + // Magic number bytes are read but not validated, we simply skip over them + tailBuffer.getLong(); // This reads and skips the 8-byte magic number + + // Remove strict validation since file_size can be larger than manifest_size + // due to index and transaction metadata at the beginning of the file + // Only ensure manifestPos is not negative and doesn't cause overflow + if (manifestPos < 0 || manifestPos >= Integer.MAX_VALUE) { + throw new IllegalArgumentException("Invalid manifest position: " + manifestPos); + } + + int manifestStart = (int) manifestPos; + + // Verify we have enough data for the length field + if (manifestStart + 4 > fileSize) { + throw new IllegalArgumentException("Manifest position beyond file bounds"); + } + + // Calculate the actual length of the protobuf data + // The structure is: [4-byte length][protobuf data][8-byte manifest_pos][8-byte magic] + byte[] trimmedManifest = + Arrays.copyOfRange(fileBytes, manifestStart + 4, fileBytes.length - 16); + ByteBuffer manifestBuffer = ByteBuffer.allocateDirect(trimmedManifest.length); + manifestBuffer.put(trimmedManifest); + manifestBuffer.flip(); + return manifestBuffer; + } + @Test void testShallowClone(@TempDir Path tempDir) { String srcPath = tempDir.resolve("shallow_clone_version_src").toString(); diff --git a/protos/table.proto b/protos/table.proto index d8f637a5d04..79c6ba25b5e 100644 --- a/protos/table.proto +++ b/protos/table.proto @@ -118,6 +118,11 @@ message Manifest { // and {uuid} is a hyphen-separated UUID. string transaction_file = 12; + // The file position of the transaction content. None if transaction is empty + // This transaction content begins with the transaction content length as u32 + // If the transaction proto message has a length of `len`, the message ends at `len` + 4 + optional uint64 transaction_section = 21; + // The next unused row id. If zero, then the table does not have any rows. // // This is only used if the "stable_row_ids" feature flag is set. diff --git a/rust/lance-table/src/feature_flags.rs b/rust/lance-table/src/feature_flags.rs index f06e50799a2..16239c59a02 100644 --- a/rust/lance-table/src/feature_flags.rs +++ b/rust/lance-table/src/feature_flags.rs @@ -20,11 +20,17 @@ pub const FLAG_USE_V2_FORMAT_DEPRECATED: u64 = 4; pub const FLAG_TABLE_CONFIG: u64 = 8; /// Dataset uses multiple base paths (for shallow clones or multi-base datasets) pub const FLAG_BASE_PATHS: u64 = 16; +/// Disable writing transaction file under _transaction/, this flag is set when we only want to write inline transaction in manifest +pub const FLAG_DISABLE_TRANSACTION_FILE: u64 = 32; /// The first bit that is unknown as a feature flag -pub const FLAG_UNKNOWN: u64 = 32; +pub const FLAG_UNKNOWN: u64 = 64; /// Set the reader and writer feature flags in the manifest based on the contents of the manifest. -pub fn apply_feature_flags(manifest: &mut Manifest, enable_stable_row_id: bool) -> Result<()> { +pub fn apply_feature_flags( + manifest: &mut Manifest, + enable_stable_row_id: bool, + disable_transaction_file: bool, +) -> Result<()> { // Reset flags manifest.reader_feature_flags = 0; manifest.writer_feature_flags = 0; @@ -70,6 +76,9 @@ pub fn apply_feature_flags(manifest: &mut Manifest, enable_stable_row_id: bool) manifest.writer_feature_flags |= FLAG_BASE_PATHS; } + if disable_transaction_file { + manifest.writer_feature_flags |= FLAG_DISABLE_TRANSACTION_FILE; + } Ok(()) } @@ -98,6 +107,7 @@ mod tests { assert!(can_read_dataset(super::FLAG_USE_V2_FORMAT_DEPRECATED)); assert!(can_read_dataset(super::FLAG_TABLE_CONFIG)); assert!(can_read_dataset(super::FLAG_BASE_PATHS)); + assert!(can_read_dataset(super::FLAG_DISABLE_TRANSACTION_FILE)); assert!(can_read_dataset( super::FLAG_DELETION_FILES | super::FLAG_STABLE_ROW_IDS @@ -114,6 +124,7 @@ mod tests { assert!(can_write_dataset(super::FLAG_USE_V2_FORMAT_DEPRECATED)); assert!(can_write_dataset(super::FLAG_TABLE_CONFIG)); assert!(can_write_dataset(super::FLAG_BASE_PATHS)); + assert!(can_write_dataset(super::FLAG_DISABLE_TRANSACTION_FILE)); assert!(can_write_dataset( super::FLAG_DELETION_FILES | super::FLAG_STABLE_ROW_IDS @@ -146,7 +157,7 @@ mod tests { None, HashMap::new(), // Empty base_paths ); - apply_feature_flags(&mut normal_manifest, false).unwrap(); + apply_feature_flags(&mut normal_manifest, false, false).unwrap(); assert_eq!(normal_manifest.reader_feature_flags & FLAG_BASE_PATHS, 0); assert_eq!(normal_manifest.writer_feature_flags & FLAG_BASE_PATHS, 0); // Test 2: Dataset with base_paths (shallow clone or multi-base) should have FLAG_BASE_PATHS @@ -167,7 +178,7 @@ mod tests { None, base_paths, ); - apply_feature_flags(&mut multi_base_manifest, false).unwrap(); + apply_feature_flags(&mut multi_base_manifest, false, false).unwrap(); assert_ne!( multi_base_manifest.reader_feature_flags & FLAG_BASE_PATHS, 0 diff --git a/rust/lance-table/src/format.rs b/rust/lance-table/src/format.rs index 58ed05f37ff..2e504a5aa28 100644 --- a/rust/lance-table/src/format.rs +++ b/rust/lance-table/src/format.rs @@ -8,6 +8,7 @@ use uuid::Uuid; mod fragment; mod index; mod manifest; +mod transaction; pub use crate::rowids::version::{ RowDatasetVersionMeta, RowDatasetVersionRun, RowDatasetVersionSequence, @@ -19,6 +20,7 @@ pub use manifest::{ is_detached_version, BasePath, DataStorageFormat, Manifest, SelfDescribingFileReader, WriterVersion, DETACHED_VERSION_MASK, }; +pub use transaction::Transaction; use lance_core::{Error, Result}; diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs index 11512065c7c..9dd6134318f 100644 --- a/rust/lance-table/src/format/manifest.rs +++ b/rust/lance-table/src/format/manifest.rs @@ -79,6 +79,9 @@ pub struct Manifest { /// The path to the transaction file, relative to the root of the dataset pub transaction_file: Option, + /// The file position of the inline transaction content inside the manifest + pub transaction_section: Option, + /// Precomputed logic offset of each fragment /// accelerating the fragment search using offset ranges. fragment_offsets: Vec, @@ -195,6 +198,7 @@ impl Manifest { writer_feature_flags: 0, max_fragment_id: None, transaction_file: None, + transaction_section: None, fragment_offsets, next_row_id: 0, data_storage_format, @@ -231,6 +235,7 @@ impl Manifest { writer_feature_flags: 0, // These will be set on commit max_fragment_id: previous.max_fragment_id, transaction_file: None, + transaction_section: None, fragment_offsets, next_row_id: previous.next_row_id, data_storage_format: previous.data_storage_format.clone(), @@ -289,6 +294,7 @@ impl Manifest { writer_feature_flags: 0, // These will be set on commit max_fragment_id: self.max_fragment_id, transaction_file: Some(transaction_file), + transaction_section: None, fragment_offsets: self.fragment_offsets.clone(), next_row_id: self.next_row_id, data_storage_format: self.data_storage_format.clone(), @@ -874,6 +880,7 @@ impl TryFrom for Manifest { } else { Some(p.transaction_file) }, + transaction_section: p.transaction_section.map(|i| i as usize), fragment_offsets, next_row_id: p.next_row_id, data_storage_format, @@ -950,6 +957,7 @@ impl From<&Manifest> for pb::Manifest { path: base_path.path.clone(), }) .collect(), + transaction_section: m.transaction_section.map(|i| i as u64), } } } diff --git a/rust/lance-table/src/format/transaction.rs b/rust/lance-table/src/format/transaction.rs new file mode 100755 index 00000000000..09157014f7d --- /dev/null +++ b/rust/lance-table/src/format/transaction.rs @@ -0,0 +1,42 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Transaction struct for lance-table format layer. +//! +//! This struct is introduced to provide a Struct-first API for passing transaction +//! information within the lance-table crate. It mirrors the protobuf Transaction +//! message at a semantic level while remaining crate-local, so lance-table does +//! not depend on higher layers (e.g., lance crate). +//! +//! Conversion to protobuf occurs at the write boundary. See the From +//! implementation below. + +use crate::format::pb; + +#[derive(Clone, Debug, PartialEq)] +pub struct Transaction { + /// Crate-local representation backing: protobuf Transaction. + /// Keeping this simple avoids ring dependencies while still enabling + /// Struct-first parameter passing in lance-table. + pub inner: pb::Transaction, +} + +impl Transaction { + /// Accessor for testing or internal inspection if needed. + pub fn as_pb(&self) -> &pb::Transaction { + &self.inner + } +} + +/// Write-boundary conversion: serialize using protobuf at the last step. +impl From for pb::Transaction { + fn from(tx: Transaction) -> Self { + tx.inner + } +} + +impl From for Transaction { + fn from(pb_tx: pb::Transaction) -> Self { + Self { inner: pb_tx } + } +} diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 156247bd32e..6dbbb5879a2 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -28,6 +28,7 @@ use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::{fmt::Debug, fs::DirEntry}; +use super::manifest::write_manifest; use futures::future::Either; use futures::Stream; use futures::{ @@ -35,11 +36,13 @@ use futures::{ stream::BoxStream, StreamExt, TryStreamExt, }; -use lance_io::object_writer::WriteResult; +use lance_file::format::{MAGIC, MAJOR_VERSION, MINOR_VERSION}; +use lance_io::object_writer::{ObjectWriter, WriteResult}; use log::warn; use object_store::PutOptions; use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore}; use snafu::location; +use tracing::info; use url::Url; #[cfg(feature = "dynamodb")] @@ -48,7 +51,10 @@ pub mod external_manifest; use lance_core::{Error, Result}; use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams}; +use lance_io::traits::WriteExt; +use crate::format::{is_detached_version, IndexMetadata, Manifest, Transaction}; +use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT}; #[cfg(feature = "dynamodb")] use { self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore}, @@ -61,8 +67,6 @@ use { std::time::{Duration, SystemTime}, }; -use crate::format::{is_detached_version, IndexMetadata, Manifest}; - const VERSIONS_DIR: &str = "_versions"; const MANIFEST_EXTENSION: &str = "manifest"; const DETACHED_VERSION_PREFIX: &str = "d"; @@ -181,8 +185,31 @@ pub type ManifestWriter = for<'a> fn( manifest: &'a mut Manifest, indices: Option>, path: &'a Path, + transaction: Option, ) -> BoxFuture<'a, Result>; +/// Canonical manifest writer; its function item type exactly matches `ManifestWriter`. +/// Rationale: keep a crate-local writer implementation so call sites can pass this function +/// directly without non-primitive casts or lifetime coercions. +pub fn write_manifest_file_to_path<'a>( + object_store: &'a ObjectStore, + manifest: &'a mut Manifest, + indices: Option>, + path: &'a Path, + transaction: Option, +) -> BoxFuture<'a, Result> { + Box::pin(async move { + let mut object_writer = ObjectWriter::new(object_store, path).await?; + let pos = write_manifest(&mut object_writer, manifest, indices, transaction).await?; + object_writer + .write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC) + .await?; + let res = object_writer.shutdown().await?; + info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = path.to_string()); + Ok(res) + }) +} + #[derive(Debug, Clone)] pub struct ManifestLocation { /// The version the manifest corresponds to. @@ -461,6 +488,7 @@ const DDB_URL_QUERY_KEY: &str = "ddbTableName"; /// // TODO: pub(crate) #[async_trait::async_trait] +#[allow(clippy::too_many_arguments)] pub trait CommitHandler: Debug + Send + Sync { async fn resolve_latest_location( &self, @@ -552,6 +580,7 @@ pub trait CommitHandler: Debug + Send + Sync { object_store: &ObjectStore, manifest_writer: ManifestWriter, naming_scheme: ManifestNamingScheme, + transaction: Option, ) -> std::result::Result; /// Delete the recorded manifest information for a dataset at the base_path @@ -803,6 +832,7 @@ static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false); pub struct UnsafeCommitHandler; #[async_trait::async_trait] +#[allow(clippy::too_many_arguments)] impl CommitHandler for UnsafeCommitHandler { async fn commit( &self, @@ -812,6 +842,7 @@ impl CommitHandler for UnsafeCommitHandler { object_store: &ObjectStore, manifest_writer: ManifestWriter, naming_scheme: ManifestNamingScheme, + transaction: Option, ) -> std::result::Result { // Log a one-time warning if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) { @@ -823,8 +854,8 @@ impl CommitHandler for UnsafeCommitHandler { } let version_path = naming_scheme.manifest_path(base_path, manifest.version); - // Write the manifest naively - let res = manifest_writer(object_store, manifest, indices, &version_path).await?; + let res = + manifest_writer(object_store, manifest, indices, &version_path, transaction).await?; Ok(ManifestLocation { version: manifest.version, @@ -878,6 +909,7 @@ impl CommitHandler for T { object_store: &ObjectStore, manifest_writer: ManifestWriter, naming_scheme: ManifestNamingScheme, + transaction: Option, ) -> std::result::Result { let path = naming_scheme.manifest_path(base_path, manifest.version); // NOTE: once we have the lease we cannot use ? to return errors, since @@ -902,7 +934,7 @@ impl CommitHandler for T { return Err(CommitError::OtherError(e.into())); } } - let res = manifest_writer(object_store, manifest, indices, &path).await; + let res = manifest_writer(object_store, manifest, indices, &path, transaction).await; // Release the lock lease.release(res.is_ok()).await?; @@ -928,6 +960,7 @@ impl CommitHandler for Arc { object_store: &ObjectStore, manifest_writer: ManifestWriter, naming_scheme: ManifestNamingScheme, + transaction: Option, ) -> std::result::Result { self.as_ref() .commit( @@ -937,6 +970,7 @@ impl CommitHandler for Arc { object_store, manifest_writer, naming_scheme, + transaction, ) .await } @@ -957,6 +991,7 @@ impl CommitHandler for RenameCommitHandler { object_store: &ObjectStore, manifest_writer: ManifestWriter, naming_scheme: ManifestNamingScheme, + transaction: Option, ) -> std::result::Result { // Create a temporary object, then use `rename_if_not_exists` to commit. // If failed, clean up the temporary object. @@ -964,8 +999,7 @@ impl CommitHandler for RenameCommitHandler { let path = naming_scheme.manifest_path(base_path, manifest.version); let tmp_path = make_staging_manifest_path(&path)?; - // Write the manifest to the temporary path - let res = manifest_writer(object_store, manifest, indices, &tmp_path).await?; + let res = manifest_writer(object_store, manifest, indices, &tmp_path, transaction).await?; match object_store .inner @@ -1015,12 +1049,20 @@ impl CommitHandler for ConditionalPutCommitHandler { object_store: &ObjectStore, manifest_writer: ManifestWriter, naming_scheme: ManifestNamingScheme, + transaction: Option, ) -> std::result::Result { let path = naming_scheme.manifest_path(base_path, manifest.version); let memory_store = ObjectStore::memory(); let dummy_path = "dummy"; - manifest_writer(&memory_store, manifest, indices, &dummy_path.into()).await?; + manifest_writer( + &memory_store, + manifest, + indices, + &dummy_path.into(), + transaction, + ) + .await?; let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?; let size = dummy_data.len() as u64; let res = object_store diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index ed05a90896e..c46fc7f67c3 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -23,8 +23,8 @@ use super::{ current_manifest_path, default_resolve_version, make_staging_manifest_path, ManifestLocation, ManifestNamingScheme, MANIFEST_EXTENSION, }; -use crate::format::{IndexMetadata, Manifest}; -use crate::io::commit::{CommitError, CommitHandler, ManifestWriter}; +use crate::format::{IndexMetadata, Manifest, Transaction}; +use crate::io::commit::{CommitError, CommitHandler}; /// External manifest store /// @@ -376,8 +376,9 @@ impl CommitHandler for ExternalManifestCommitHandler { indices: Option>, base_path: &Path, object_store: &ObjectStore, - manifest_writer: ManifestWriter, + manifest_writer: super::ManifestWriter, naming_scheme: ManifestNamingScheme, + transaction: Option, ) -> std::result::Result { // path we get here is the path to the manifest we want to write // use object_store.base_path.as_ref() for getting the root of the dataset @@ -385,7 +386,8 @@ impl CommitHandler for ExternalManifestCommitHandler { // step 1: Write the manifest we want to commit to object store with a temporary name let path = naming_scheme.manifest_path(base_path, manifest.version); let staging_path = make_staging_manifest_path(&path)?; - let write_res = manifest_writer(object_store, manifest, indices, &staging_path).await?; + let write_res = + manifest_writer(object_store, manifest, indices, &staging_path, transaction).await?; // step 2 & 3: Try to commit this version to external store, return err on failure let res = self diff --git a/rust/lance-table/src/io/manifest.rs b/rust/lance-table/src/io/manifest.rs index 6ef313a4230..55a163a7578 100644 --- a/rust/lance-table/src/io/manifest.rs +++ b/rust/lance-table/src/io/manifest.rs @@ -22,7 +22,7 @@ use lance_io::{ utils::read_message, }; -use crate::format::{pb, DataStorageFormat, IndexMetadata, Manifest, MAGIC}; +use crate::format::{pb, DataStorageFormat, IndexMetadata, Manifest, Transaction, MAGIC}; use super::commit::ManifestLocation; @@ -141,6 +141,7 @@ async fn do_write_manifest( writer: &mut dyn Writer, manifest: &mut Manifest, indices: Option>, + mut transaction: Option, ) -> Result { // Write indices if presented. if let Some(indices) = indices.as_ref() { @@ -151,6 +152,14 @@ async fn do_write_manifest( manifest.index_section = Some(pos); } + // Write inline transaction if presented. + if let Some(tx) = transaction.take() { + // Convert to protobuf at the write boundary to persist inline + let pb_tx: pb::Transaction = tx.into(); + let pos = writer.write_protobuf(&pb_tx).await?; + manifest.transaction_section = Some(pos); + } + writer.write_struct(manifest).await } @@ -159,6 +168,7 @@ pub async fn write_manifest( writer: &mut dyn Writer, manifest: &mut Manifest, indices: Option>, + transaction: Option, ) -> Result { // Write dictionary values. let max_field_id = manifest.schema.max_field_id().unwrap_or(-1); @@ -209,7 +219,7 @@ pub async fn write_manifest( } } - do_write_manifest(writer, manifest, indices).await + do_write_manifest(writer, manifest, indices, transaction).await } /// Implementation of ManifestProvider that describes a Lance file by writing @@ -229,7 +239,7 @@ impl ManifestProvider for ManifestDescribing { /*blob_dataset_version= */ None, HashMap::new(), ); - let pos = do_write_manifest(object_writer, &mut manifest, None).await?; + let pos = do_write_manifest(object_writer, &mut manifest, None, None).await?; Ok(Some(pos)) } } @@ -281,7 +291,7 @@ mod test { /*blob_dataset_version= */ None, HashMap::new(), ); - let pos = write_manifest(&mut writer, &mut manifest, None) + let pos = write_manifest(&mut writer, &mut manifest, None, None) .await .unwrap(); writer diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index bf890eebebc..01a55f9ebfa 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -22,8 +22,8 @@ use lance_core::datatypes::{Field, OnMissing, OnTypeMismatch, Projectable, Proje use lance_core::traits::DatasetTakeRows; use lance_core::utils::address::RowAddress; use lance_core::utils::tracing::{ - AUDIT_MODE_CREATE, AUDIT_TYPE_MANIFEST, DATASET_CLEANING_EVENT, DATASET_DELETING_EVENT, - DATASET_DROPPING_COLUMN_EVENT, TRACE_DATASET_EVENTS, TRACE_FILE_AUDIT, + DATASET_CLEANING_EVENT, DATASET_DELETING_EVENT, DATASET_DROPPING_COLUMN_EVENT, + TRACE_DATASET_EVENTS, }; use lance_core::{ROW_ADDR, ROW_ADDR_FIELD, ROW_ID_FIELD}; use lance_datafusion::projection::ProjectionPlan; @@ -32,18 +32,15 @@ use lance_file::v2::reader::FileReaderOptions; use lance_file::version::LanceFileVersion; use lance_index::DatasetIndexExt; use lance_io::object_store::{ObjectStore, ObjectStoreParams}; -use lance_io::object_writer::{ObjectWriter, WriteResult}; -use lance_io::traits::WriteExt; -use lance_io::utils::{read_last_block, read_metadata_offset, read_struct}; +use lance_io::utils::{read_last_block, read_message, read_metadata_offset, read_struct}; use lance_table::format::{ - DataFile, DataStorageFormat, DeletionFile, Fragment, IndexMetadata, Manifest, MAGIC, - MAJOR_VERSION, MINOR_VERSION, + pb, DataFile, DataStorageFormat, DeletionFile, Fragment, IndexMetadata, Manifest, }; use lance_table::io::commit::{ - migrate_scheme_to_v2, CommitConfig, CommitError, CommitHandler, CommitLock, ManifestLocation, - ManifestNamingScheme, + migrate_scheme_to_v2, write_manifest_file_to_path, CommitConfig, CommitError, CommitHandler, + CommitLock, ManifestLocation, ManifestNamingScheme, }; -use lance_table::io::manifest::{read_manifest, write_manifest}; +use lance_table::io::manifest::read_manifest; use object_store::path::Path; use prost::Message; use roaring::RoaringBitmap; @@ -678,7 +675,7 @@ impl Dataset { }); } - // If indices were also the last block, we can take the opportunity to + // If indices were also in the last block, we can take the opportunity to // decode them now and cache them. if let Some(index_offset) = manifest.index_section { if manifest_size - index_offset <= last_block.len() { @@ -705,6 +702,29 @@ impl Dataset { } } + // If transaction is also in the last block, we can take the opportunity to + // decode them now and cache them. + if let Some(transaction_offset) = manifest.transaction_section { + if manifest_size - transaction_offset <= last_block.len() { + let offset_in_block = last_block.len() - (manifest_size - transaction_offset); + let message_len = + LittleEndian::read_u32(&last_block[offset_in_block..offset_in_block + 4]) + as usize; + let message_data = + &last_block[offset_in_block + 4..offset_in_block + 4 + message_len]; + let transaction: Transaction = + lance_table::format::pb::Transaction::decode(message_data)?.try_into()?; + + let metadata_cache = session.metadata_cache.for_dataset(uri); + let metadata_key = TransactionKey { + version: manifest_location.version, + }; + metadata_cache + .insert_with_key(&metadata_key, Arc::new(transaction)) + .await; + } + } + if manifest.should_use_legacy_format() { populate_schema_dictionary(&mut manifest.schema, object_reader.as_ref()).await?; } @@ -914,7 +934,11 @@ impl Dataset { }; populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?; } - Ok((Arc::new(manifest), location)) + let manifest_arc = Arc::new(manifest); + self.metadata_cache + .insert_with_key(&manifest_key, manifest_arc.clone()) + .await; + Ok((manifest_arc, location)) } /// Read the transaction file for this version of the dataset. @@ -922,13 +946,36 @@ impl Dataset { /// If there was no transaction file written for this version of the dataset /// then this will return None. pub async fn read_transaction(&self) -> Result> { - let path = match &self.manifest.transaction_file { - Some(path) => self.base.child("_transactions").child(path.as_str()), - None => return Ok(None), + let transaction_key = TransactionKey { + version: self.manifest.version, }; - let data = self.object_store.inner.get(&path).await?.bytes().await?; - let transaction = lance_table::format::pb::Transaction::decode(data)?; - Transaction::try_from(transaction).map(Some) + if let Some(transaction) = self.metadata_cache.get_with_key(&transaction_key).await { + return Ok(Some((*transaction).clone())); + } + // Prefer inline transaction from manifest when available + if let Some(pos) = self.manifest.transaction_section { + let reader = if let Some(size) = self.manifest_location.size { + self.object_store + .open_with_size(&self.manifest_location.path, size as usize) + .await? + } else { + self.object_store.open(&self.manifest_location.path).await? + }; + + let tx: pb::Transaction = read_message(reader.as_ref(), pos).await?; + return Transaction::try_from(tx).map(Some); + // If any of the checks above fail, we will fall through to external file + } + + // Fallback: read external transaction file if present + if let Some(path) = &self.manifest.transaction_file { + let path = self.base.child("_transactions").child(path.as_str()); + let data = self.object_store.inner.get(&path).await?.bytes().await?; + let transaction = lance_table::format::pb::Transaction::decode(data)?; + return Transaction::try_from(transaction).map(Some); + } + + Ok(None) } /// Read the transaction file for this version of the dataset. @@ -2532,6 +2579,7 @@ pub(crate) struct ManifestWriteConfig { use_stable_row_ids: bool, // default false use_legacy_format: Option, // default None storage_format: Option, // default None + disable_transaction_file: bool, // default false } impl Default for ManifestWriteConfig { @@ -2540,13 +2588,21 @@ impl Default for ManifestWriteConfig { auto_set_feature_flags: true, timestamp: None, use_stable_row_ids: false, + disable_transaction_file: false, use_legacy_format: None, storage_format: None, } } } +impl ManifestWriteConfig { + pub fn disable_transaction_file(&self) -> bool { + self.disable_transaction_file + } +} + /// Commit a manifest file and create a copy at the latest manifest path. +#[allow(clippy::too_many_arguments)] pub(crate) async fn write_manifest_file( object_store: &ObjectStore, commit_handler: &dyn CommitHandler, @@ -2555,9 +2611,14 @@ pub(crate) async fn write_manifest_file( indices: Option>, config: &ManifestWriteConfig, naming_scheme: ManifestNamingScheme, + mut transaction: Option<&Transaction>, ) -> std::result::Result { if config.auto_set_feature_flags { - apply_feature_flags(manifest, config.use_stable_row_ids)?; + apply_feature_flags( + manifest, + config.use_stable_row_ids, + config.disable_transaction_file, + )?; } manifest.set_timestamp(timestamp_to_nanos(config.timestamp)); @@ -2572,28 +2633,11 @@ pub(crate) async fn write_manifest_file( object_store, write_manifest_file_to_path, naming_scheme, + transaction.take().map(|tx| tx.into()), ) .await } -fn write_manifest_file_to_path<'a>( - object_store: &'a ObjectStore, - manifest: &'a mut Manifest, - indices: Option>, - path: &'a Path, -) -> BoxFuture<'a, Result> { - Box::pin(async { - let mut object_writer = ObjectWriter::new(object_store, path).await?; - let pos = write_manifest(&mut object_writer, manifest, indices).await?; - object_writer - .write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC) - .await?; - let res = object_writer.shutdown().await?; - info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = path.to_string()); - Ok(res) - }) -} - impl Projectable for Dataset { fn schema(&self) -> &Schema { self.schema() @@ -3104,8 +3148,10 @@ mod tests { use_stable_row_ids: false, use_legacy_format: None, storage_format: None, + disable_transaction_file: false, }, dataset.manifest_location.naming_scheme, + None, ) .await .unwrap(); @@ -8684,6 +8730,132 @@ mod tests { ) } + // Test coverage: + // Case 1: delete external transaction file → read_transaction should prioritize inline and succeed. + // Case 2: reading small manifest caches transaction data, eliminating transaction reading IO. + // Case 3: manifest does not contain inline → read_transaction should fall back to external transaction file and succeed. + #[tokio::test] + async fn test_inline_transaction() { + use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator}; + use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; + use std::sync::Arc; + + async fn create_dataset(rows: i32) -> Arc { + let dir = TempDir::default(); + let uri = dir.path_str(); + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::Int32, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..rows))], + ) + .unwrap(); + let ds = Dataset::write( + RecordBatchIterator::new(vec![Ok(batch)], schema), + uri.as_str(), + None, + ) + .await + .unwrap(); + Arc::new(ds) + } + + fn make_tx(read_version: u64) -> Transaction { + Transaction::new( + read_version, + Operation::Append { fragments: vec![] }, + None, + None, + ) + } + + async fn delete_external_tx_file(ds: &Dataset) { + if let Some(tx_file) = ds.manifest.transaction_file.as_ref() { + let tx_path = ds.base.child("_transactions").child(tx_file.as_str()); + let _ = ds.object_store.inner.delete(&tx_path).await; // ignore errors + } + } + + let session = Arc::new(Session::default()); + let io_tracker = Arc::new(IOTracker::default()); + + // Case 1: Default write_flag=true, delete external transaction file, read should use inline transaction + let ds = create_dataset(5).await; + let read_version = ds.manifest().version; + let tx = make_tx(read_version); + let ds2 = CommitBuilder::new(ds.clone()) + .execute(tx.clone()) + .await + .unwrap(); + delete_external_tx_file(&ds2).await; + let read_tx = ds2.read_transaction().await.unwrap().unwrap(); + assert_eq!(read_tx, tx.clone()); + + // Case 2: reading small manifest caches transaction data, eliminating transaction reading IO. + let read_ds2 = DatasetBuilder::from_uri(ds2.uri.clone()) + .with_session(session.clone()) + .with_read_params(ReadParams { + store_options: Some(ObjectStoreParams { + object_store_wrapper: Some(io_tracker.clone()), + ..Default::default() + }), + session: Some(session.clone()), + ..Default::default() + }) + .load() + .await + .unwrap(); + let stats = io_tracker.incremental_stats(); // Reset + assert!(stats.read_bytes < 64 * 1024); + // Because the manifest is so small, we should have opportunistically + // cached the transaction in memory already. + let inline_tx = read_ds2.read_transaction().await.unwrap().unwrap(); + let stats = io_tracker.incremental_stats(); + assert_eq!(stats.read_iops, 0); + assert_eq!(stats.read_bytes, 0); + assert_eq!(inline_tx, tx); + + // Case 3: manifest does not contain inline transaction, read should fall back to external transaction file + let ds = create_dataset(2).await; + let tx = make_tx(ds.manifest().version); + let tx_file = crate::io::commit::write_transaction_file(ds.object_store(), &ds.base, &tx) + .await + .unwrap(); + let (mut manifest, indices) = tx + .build_manifest( + Some(ds.manifest.as_ref()), + ds.load_indices().await.unwrap().as_ref().clone(), + &tx_file, + &ManifestWriteConfig::default(), + None, + ) + .unwrap(); + let location = write_manifest_file( + ds.object_store(), + ds.commit_handler.as_ref(), + &ds.base, + &mut manifest, + if indices.is_empty() { + None + } else { + Some(indices.clone()) + }, + &ManifestWriteConfig::default(), + ds.manifest_location.naming_scheme, + None, + ) + .await + .unwrap(); + let ds_new = ds.checkout_version(location.version).await.unwrap(); + assert!(ds_new.manifest.transaction_section.is_none()); + assert!(ds_new.manifest.transaction_file.is_some()); + let read_tx = ds_new.read_transaction().await.unwrap().unwrap(); + assert_eq!(read_tx, tx); + } + #[tokio::test] async fn test_limit_pushdown_in_physical_plan() -> Result<()> { use tempfile::tempdir; diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 1544bc3583a..5566a806dd7 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -3905,17 +3905,17 @@ mod tests { .unwrap(); let fragment = dataset.get_fragments().pop().unwrap(); - // Assert file is small (< 4kb) + // Assert file is small (< 4300 bytes) { let stats = io_stats.incremental_stats(); assert_io_eq!(stats, write_iops, 3); - assert_io_lt!(stats, write_bytes, 4096); + assert_io_lt!(stats, write_bytes, 4300); } // Measure IOPS needed to scan all data first time. let projection = Schema::try_from(schema.as_ref()) .unwrap() - .project_by_ids(&[0, 1, 2, 3, 4, 6, 7], true); + .project_by_ids(&[0, 1, 2, 3, 4, 6, 7, 8, 9], true); let reader = fragment .open(&projection, Default::default()) .await diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index f22c7426b19..956062c81d2 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -334,6 +334,15 @@ impl std::fmt::Display for Operation { } } +impl From<&Transaction> for lance_table::format::Transaction { + fn from(value: &Transaction) -> Self { + let pb_transaction: pb::Transaction = value.into(); + Self { + inner: pb_transaction, + } + } +} + impl PartialEq for Operation { fn eq(&self, other: &Self) -> bool { // Many of the operations contain `Vec` where the order of the @@ -2223,7 +2232,11 @@ impl Transaction { manifest.tag.clone_from(&self.tag); if config.auto_set_feature_flags { - apply_feature_flags(&mut manifest, config.use_stable_row_ids)?; + apply_feature_flags( + &mut manifest, + config.use_stable_row_ids, + config.disable_transaction_file, + )?; } manifest.set_timestamp(timestamp_to_nanos(config.timestamp)); diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 37a0fead258..970c6615c9c 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -84,7 +84,7 @@ pub(crate) async fn read_transaction_file( } /// Write a transaction to a file and return the relative path. -async fn write_transaction_file( +pub(crate) async fn write_transaction_file( object_store: &ObjectStore, base_path: &Path, transaction: &Transaction, @@ -111,7 +111,11 @@ async fn do_commit_new_dataset( metadata_cache: &DSMetadataCache, store_registry: Arc, ) -> Result<(Manifest, ManifestLocation)> { - let transaction_file = write_transaction_file(object_store, base_path, transaction).await?; + let transaction_file = if !write_config.disable_transaction_file() { + write_transaction_file(object_store, base_path, transaction).await? + } else { + String::new() + }; let (mut manifest, indices) = if let Operation::Clone { ref_name, @@ -190,6 +194,7 @@ async fn do_commit_new_dataset( }, write_config, manifest_naming_scheme, + Some(transaction), ) .await; @@ -641,7 +646,11 @@ pub(crate) async fn do_commit_detached_transaction( ) -> Result<(Manifest, ManifestLocation)> { // We don't strictly need a transaction file but we go ahead and create one for // record-keeping if nothing else. - let transaction_file = write_transaction_file(object_store, &dataset.base, transaction).await?; + let transaction_file = if !write_config.disable_transaction_file() { + write_transaction_file(object_store, &dataset.base, transaction).await? + } else { + String::new() + }; // We still do a loop since we may have conflicts in the random version we pick let mut backoff = Backoff::default(); @@ -693,6 +702,7 @@ pub(crate) async fn do_commit_detached_transaction( }, write_config, ManifestNamingScheme::V2, + Some(transaction), ) .await; @@ -866,8 +876,11 @@ pub(crate) async fn commit_transaction( transaction = rebase.finish(&dataset).await?; } - let transaction_file = - write_transaction_file(object_store, &dataset.base, &transaction).await?; + let transaction_file = if !write_config.disable_transaction_file() { + write_transaction_file(object_store, &dataset.base, &transaction).await? + } else { + String::new() + }; target_version = dataset.manifest.version + 1; if is_detached_version(target_version) { @@ -925,6 +938,7 @@ pub(crate) async fn commit_transaction( }, write_config, manifest_naming_scheme, + Some(&transaction), ) .await; From c419c4eaff2e222b9aa72522d1b0643ca5bbf313 Mon Sep 17 00:00:00 2001 From: majin1102 Date: Fri, 31 Oct 2025 19:49:33 +0800 Subject: [PATCH 2/2] load_new_transactions() adapt inline transaction --- rust/lance/src/dataset.rs | 49 +++++++++++++++++++------------------ rust/lance/src/io/commit.rs | 1 + 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 01a55f9ebfa..3023ffdae9c 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -95,7 +95,7 @@ use crate::datatypes::Schema; use crate::index::retain_supported_indices; use crate::io::commit::{ commit_detached_transaction, commit_new_dataset, commit_transaction, - detect_overlapping_fragments, read_transaction_file, + detect_overlapping_fragments, }; use crate::session::Session; use crate::utils::temporal::{timestamp_to_nanos, utc_now, SystemTime}; @@ -952,8 +952,9 @@ impl Dataset { if let Some(transaction) = self.metadata_cache.get_with_key(&transaction_key).await { return Ok(Some((*transaction).clone())); } + // Prefer inline transaction from manifest when available - if let Some(pos) = self.manifest.transaction_section { + let transaction = if let Some(pos) = self.manifest.transaction_section { let reader = if let Some(size) = self.manifest_location.size { self.object_store .open_with_size(&self.manifest_location.path, size as usize) @@ -963,19 +964,23 @@ impl Dataset { }; let tx: pb::Transaction = read_message(reader.as_ref(), pos).await?; - return Transaction::try_from(tx).map(Some); - // If any of the checks above fail, we will fall through to external file - } - - // Fallback: read external transaction file if present - if let Some(path) = &self.manifest.transaction_file { + Transaction::try_from(tx).map(Some)? + } else if let Some(path) = &self.manifest.transaction_file { + // Fallback: read external transaction file if present let path = self.base.child("_transactions").child(path.as_str()); let data = self.object_store.inner.get(&path).await?.bytes().await?; let transaction = lance_table::format::pb::Transaction::decode(data)?; - return Transaction::try_from(transaction).map(Some); - } + Transaction::try_from(transaction).map(Some)? + } else { + None + }; - Ok(None) + if let Some(tx) = transaction.as_ref() { + self.metadata_cache + .insert_with_key(&transaction_key, Arc::new(tx.clone())) + .await; + } + Ok(transaction) } /// Read the transaction file for this version of the dataset. @@ -2143,20 +2148,16 @@ pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<' dataset.file_reader_options.clone(), dataset.store_params.as_deref().cloned(), )?; - let object_store = dataset_version.object_store(); - let path = dataset_version - .manifest - .transaction_file - .as_ref() - .ok_or_else(|| Error::Internal { - message: format!( - "Dataset version {} does not have a transaction file", - manifest_copy.version - ), - location: location!(), - })?; let loaded = - Arc::new(read_transaction_file(object_store, &dataset.base, path).await?); + Arc::new(dataset_version.read_transaction().await?.ok_or_else(|| { + Error::Internal { + message: format!( + "Dataset version {} does not have a transaction file", + manifest_copy.version + ), + location: location!(), + } + })?); dataset .metadata_cache .insert_with_key(&tx_key, loaded.clone()) diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 970c6615c9c..09010d119b7 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -71,6 +71,7 @@ mod external_manifest; mod s3_test; /// Read the transaction data from a transaction file. +#[allow(dead_code)] pub(crate) async fn read_transaction_file( object_store: &ObjectStore, base_path: &Path,