Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 59 additions & 19 deletions java/src/test/java/com/lancedb/lance/DatasetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ClosedChannelException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -371,7 +372,7 @@ void testOpenNonExist(@TempDir Path tempDir) throws IOException, URISyntaxExcept
}

@Test
void testOpenSerializedManifest(@TempDir Path tempDir) throws IOException, URISyntaxException {
void testOpenSerializedManifest(@TempDir Path tempDir) throws IOException {
Path datasetPath = tempDir.resolve("serialized_manifest");
try (BufferAllocator allocator = new RootAllocator()) {
TestUtils.SimpleTestDataset testDataset =
Expand All @@ -380,24 +381,18 @@ void testOpenSerializedManifest(@TempDir Path tempDir) throws IOException, URISy
try (Dataset dataset1 = testDataset.createEmptyDataset()) {
assertEquals(1, dataset1.version());
Path manifestPath = datasetPath.resolve("_versions");
Stream<Path> fileStream = Files.list(manifestPath);
assertEquals(1, fileStream.count());
Path filePath = manifestPath.resolve("1.manifest");
byte[] manifestBytes = Files.readAllBytes(filePath);
// Need to trim the magic number at end and message length at beginning
// https://github.com/lancedb/lance/blob/main/rust/lance-table/src/io/manifest.rs#L95-L96
byte[] trimmedManifest = Arrays.copyOfRange(manifestBytes, 4, manifestBytes.length - 16);
ByteBuffer manifestBuffer = ByteBuffer.allocateDirect(trimmedManifest.length);
manifestBuffer.put(trimmedManifest);
manifestBuffer.flip();
try (Dataset dataset2 = testDataset.write(1, 5)) {
assertEquals(2, dataset2.version());
assertEquals(2, dataset2.latestVersion());
// When reading from the serialized manifest, it shouldn't know about the second dataset
ReadOptions readOptions =
new ReadOptions.Builder().setSerializedManifest(manifestBuffer).build();
Dataset dataset1Manifest = Dataset.open(allocator, datasetPath.toString(), readOptions);
assertEquals(1, dataset1Manifest.version());
try (Stream<Path> fileStream = Files.list(manifestPath)) {
assertEquals(1, fileStream.count());
ByteBuffer manifestBuffer = readManifest(manifestPath.resolve("1.manifest"));
try (Dataset dataset2 = testDataset.write(1, 5)) {
assertEquals(2, dataset2.version());
assertEquals(2, dataset2.latestVersion());
// When reading from the serialized manifest, it shouldn't know about the second dataset
ReadOptions readOptions =
new ReadOptions.Builder().setSerializedManifest(manifestBuffer).build();
Dataset dataset1Manifest = Dataset.open(allocator, datasetPath.toString(), readOptions);
assertEquals(1, dataset1Manifest.version());
}
}
}
}
Expand Down Expand Up @@ -1408,6 +1403,51 @@ void testCompactWithAllOptions(@TempDir Path tempDir) {
}
}

/**
 * Reads a manifest file from disk and extracts the serialized manifest payload.
 *
 * <p>The tail of the file is interpreted as {@code [4-byte length][protobuf data][8-byte
 * manifest_pos][8-byte magic]}. The little-endian {@code manifest_pos} stored in the footer
 * locates the 4-byte length prefix; everything between that prefix and the 16-byte footer is
 * returned. The magic bytes and the recorded length are intentionally not validated.
 *
 * <p>This method must be aligned with the implementation in <a
 * href="https://github.com/lancedb/lance/blob/main/rust/lance-table/src/io/manifest.rs#L95-L96">...</a>
 *
 * @param filePath path of the manifest file to read
 * @return a direct buffer, flipped for reading, containing the bytes that follow the length
 *     prefix and precede the footer
 * @throws IOException if the file cannot be read
 * @throws IllegalArgumentException if the file is shorter than the footer, or the recorded
 *     manifest position is negative, too large, or leaves no room for the length prefix
 */
public ByteBuffer readManifest(Path filePath) throws IOException {
  // Footer layout: [manifest_pos (8 bytes)][magic (8 bytes)]
  final int footerSize = 16;
  final int lengthPrefixSize = 4;

  byte[] fileBytes = Files.readAllBytes(filePath);
  int fileSize = fileBytes.length;

  // Basic file size validation
  if (fileSize < footerSize) {
    throw new IllegalArgumentException("File too small");
  }

  // Only the manifest start position is read from the footer; the magic bytes that
  // follow it are deliberately left unchecked.
  long manifestPos =
      ByteBuffer.wrap(fileBytes, fileSize - footerSize, Long.BYTES)
          .order(ByteOrder.LITTLE_ENDIAN)
          .getLong();

  // The file may hold other sections (index / transaction metadata) before the manifest,
  // so the position is not required to be zero; it only has to be a sane array offset.
  if (manifestPos < 0 || manifestPos >= Integer.MAX_VALUE) {
    throw new IllegalArgumentException("Invalid manifest position: " + manifestPos);
  }

  // The payload ends where the footer begins. The comparison is done in long arithmetic so
  // that a position close to Integer.MAX_VALUE cannot wrap around and bypass the check.
  int payloadEnd = fileSize - footerSize;
  if (manifestPos + lengthPrefixSize > payloadEnd) {
    throw new IllegalArgumentException("Manifest position beyond file bounds");
  }

  // Skip the 4-byte length prefix and copy the protobuf bytes straight into a direct buffer.
  int payloadStart = (int) manifestPos + lengthPrefixSize;
  int payloadLength = payloadEnd - payloadStart;
  ByteBuffer manifestBuffer = ByteBuffer.allocateDirect(payloadLength);
  manifestBuffer.put(fileBytes, payloadStart, payloadLength);
  manifestBuffer.flip();
  return manifestBuffer;
}

@Test
void testShallowClone(@TempDir Path tempDir) {
String srcPath = tempDir.resolve("shallow_clone_version_src").toString();
Expand Down
5 changes: 5 additions & 0 deletions protos/table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ message Manifest {
// and {uuid} is a hyphen-separated UUID.
string transaction_file = 12;

// The file position of the transaction content. None (unset) if the transaction is empty.
// The transaction content begins with the transaction message length as a u32.
// If the transaction proto message has a length of `len`, the message ends `len` + 4
// bytes after this position.
optional uint64 transaction_section = 21;

// The next unused row id. If zero, then the table does not have any rows.
//
// This is only used if the "stable_row_ids" feature flag is set.
Expand Down
19 changes: 15 additions & 4 deletions rust/lance-table/src/feature_flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@ pub const FLAG_USE_V2_FORMAT_DEPRECATED: u64 = 4;
pub const FLAG_TABLE_CONFIG: u64 = 8;
/// Dataset uses multiple base paths (for shallow clones or multi-base datasets)
pub const FLAG_BASE_PATHS: u64 = 16;
/// Disable writing the transaction file under _transaction/. This flag is set when we only want to write the inline transaction in the manifest.
pub const FLAG_DISABLE_TRANSACTION_FILE: u64 = 32;
/// The first bit that is unknown as a feature flag
pub const FLAG_UNKNOWN: u64 = 32;
pub const FLAG_UNKNOWN: u64 = 64;

/// Set the reader and writer feature flags in the manifest based on the contents of the manifest.
pub fn apply_feature_flags(manifest: &mut Manifest, enable_stable_row_id: bool) -> Result<()> {
pub fn apply_feature_flags(
manifest: &mut Manifest,
enable_stable_row_id: bool,
disable_transaction_file: bool,
) -> Result<()> {
// Reset flags
manifest.reader_feature_flags = 0;
manifest.writer_feature_flags = 0;
Expand Down Expand Up @@ -70,6 +76,9 @@ pub fn apply_feature_flags(manifest: &mut Manifest, enable_stable_row_id: bool)
manifest.writer_feature_flags |= FLAG_BASE_PATHS;
}

if disable_transaction_file {
manifest.writer_feature_flags |= FLAG_DISABLE_TRANSACTION_FILE;
}
Ok(())
}

Expand Down Expand Up @@ -98,6 +107,7 @@ mod tests {
assert!(can_read_dataset(super::FLAG_USE_V2_FORMAT_DEPRECATED));
assert!(can_read_dataset(super::FLAG_TABLE_CONFIG));
assert!(can_read_dataset(super::FLAG_BASE_PATHS));
assert!(can_read_dataset(super::FLAG_DISABLE_TRANSACTION_FILE));
assert!(can_read_dataset(
super::FLAG_DELETION_FILES
| super::FLAG_STABLE_ROW_IDS
Expand All @@ -114,6 +124,7 @@ mod tests {
assert!(can_write_dataset(super::FLAG_USE_V2_FORMAT_DEPRECATED));
assert!(can_write_dataset(super::FLAG_TABLE_CONFIG));
assert!(can_write_dataset(super::FLAG_BASE_PATHS));
assert!(can_write_dataset(super::FLAG_DISABLE_TRANSACTION_FILE));
assert!(can_write_dataset(
super::FLAG_DELETION_FILES
| super::FLAG_STABLE_ROW_IDS
Expand Down Expand Up @@ -146,7 +157,7 @@ mod tests {
None,
HashMap::new(), // Empty base_paths
);
apply_feature_flags(&mut normal_manifest, false).unwrap();
apply_feature_flags(&mut normal_manifest, false, false).unwrap();
assert_eq!(normal_manifest.reader_feature_flags & FLAG_BASE_PATHS, 0);
assert_eq!(normal_manifest.writer_feature_flags & FLAG_BASE_PATHS, 0);
// Test 2: Dataset with base_paths (shallow clone or multi-base) should have FLAG_BASE_PATHS
Expand All @@ -167,7 +178,7 @@ mod tests {
None,
base_paths,
);
apply_feature_flags(&mut multi_base_manifest, false).unwrap();
apply_feature_flags(&mut multi_base_manifest, false, false).unwrap();
assert_ne!(
multi_base_manifest.reader_feature_flags & FLAG_BASE_PATHS,
0
Expand Down
2 changes: 2 additions & 0 deletions rust/lance-table/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use uuid::Uuid;
mod fragment;
mod index;
mod manifest;
mod transaction;

pub use crate::rowids::version::{
RowDatasetVersionMeta, RowDatasetVersionRun, RowDatasetVersionSequence,
Expand All @@ -19,6 +20,7 @@ pub use manifest::{
is_detached_version, BasePath, DataStorageFormat, Manifest, SelfDescribingFileReader,
WriterVersion, DETACHED_VERSION_MASK,
};
pub use transaction::Transaction;

use lance_core::{Error, Result};

Expand Down
8 changes: 8 additions & 0 deletions rust/lance-table/src/format/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ pub struct Manifest {
/// The path to the transaction file, relative to the root of the dataset
pub transaction_file: Option<String>,

/// The file position of the inline transaction content inside the manifest
pub transaction_section: Option<usize>,

/// Precomputed logic offset of each fragment
/// accelerating the fragment search using offset ranges.
fragment_offsets: Vec<usize>,
Expand Down Expand Up @@ -195,6 +198,7 @@ impl Manifest {
writer_feature_flags: 0,
max_fragment_id: None,
transaction_file: None,
transaction_section: None,
fragment_offsets,
next_row_id: 0,
data_storage_format,
Expand Down Expand Up @@ -231,6 +235,7 @@ impl Manifest {
writer_feature_flags: 0, // These will be set on commit
max_fragment_id: previous.max_fragment_id,
transaction_file: None,
transaction_section: None,
fragment_offsets,
next_row_id: previous.next_row_id,
data_storage_format: previous.data_storage_format.clone(),
Expand Down Expand Up @@ -289,6 +294,7 @@ impl Manifest {
writer_feature_flags: 0, // These will be set on commit
max_fragment_id: self.max_fragment_id,
transaction_file: Some(transaction_file),
transaction_section: None,
fragment_offsets: self.fragment_offsets.clone(),
next_row_id: self.next_row_id,
data_storage_format: self.data_storage_format.clone(),
Expand Down Expand Up @@ -874,6 +880,7 @@ impl TryFrom<pb::Manifest> for Manifest {
} else {
Some(p.transaction_file)
},
transaction_section: p.transaction_section.map(|i| i as usize),
fragment_offsets,
next_row_id: p.next_row_id,
data_storage_format,
Expand Down Expand Up @@ -950,6 +957,7 @@ impl From<&Manifest> for pb::Manifest {
path: base_path.path.clone(),
})
.collect(),
transaction_section: m.transaction_section.map(|i| i as u64),
}
}
}
Expand Down
42 changes: 42 additions & 0 deletions rust/lance-table/src/format/transaction.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Transaction struct for lance-table format layer.
//!
//! This struct is introduced to provide a Struct-first API for passing transaction
//! information within the lance-table crate. It mirrors the protobuf Transaction
//! message at a semantic level while remaining crate-local, so lance-table does
//! not depend on higher layers (e.g., lance crate).
//!
//! Conversion to protobuf occurs at the write boundary. See the From<Transaction>
//! implementation below.

use crate::format::pb;

/// Crate-local transaction value used for passing transaction information
/// within lance-table. It holds the protobuf `Transaction` message in its
/// single public field.
#[derive(Clone, Debug, PartialEq)]
pub struct Transaction {
/// The backing representation: the protobuf Transaction message.
/// Keeping this simple avoids circular ("ring") dependencies while still
/// enabling Struct-first parameter passing in lance-table.
pub inner: pb::Transaction,
}

impl Transaction {
    /// Borrows the wrapped protobuf message, e.g. for tests or internal inspection.
    pub fn as_pb(&self) -> &pb::Transaction {
        let Self { inner } = self;
        inner
    }
}

/// Write-boundary conversion: serialize using protobuf at the last step.
impl From<Transaction> for pb::Transaction {
fn from(tx: Transaction) -> Self {
tx.inner
}
}

impl From<pb::Transaction> for Transaction {
fn from(pb_tx: pb::Transaction) -> Self {
Self { inner: pb_tx }
}
}
Loading